#!/usr/bin/env python
"""
Build an HTML overview of a personal movie list.

1. Import a movie_list txt file
2. Query IMDb for each entry, retrieving actual movie name, rating and genres
3. Generate an HTML table from the IMDb data
4. Store the HTML in index.html

Each non-empty line of the movie list is parsed as a title followed by
optional markers in any order: ``(year)``, ``<kind>`` (defaults to ``movie``)
and ``[status]`` (``DONE`` or ``*``), e.g. ``The Matrix (1999) [DONE]``.
"""
import os
import re
import sys
import time
import threading
from html import escape
from pathlib import Path

import progressbar
from imdb import IMDb
from imdb._exceptions import IMDbParserError, IMDbDataAccessError

# Upper bound on concurrently running IMDb lookups.
MAX_THREADS = 10
# Seconds to wait between starting two lookups, to avoid bursts of requests.
THREAD_START_DELAY = 2
# Seconds between two checks for finished lookups.
THREAD_POLL_INTERVAL = 2
# A failing search is retried this many times before giving up on the entry.
MAX_SEARCH_ATTEMPTS = 5
# Seconds to wait before retrying a failed search.
SEARCH_RETRY_DELAY = 10

# Placeholder shown for data IMDb did not provide. Rows containing such a
# cell are never reused from a previous run, so they get queried again.
NOT_AVAILABLE = 'N/A'
_NOT_AVAILABLE_CELL = f'<td>{NOT_AVAILABLE}</td>'

# Marks the start of the table body inside the page template.
_TBODY = '<tbody>'

_YEAR_RE = re.compile(r'\(([^()]*)\)')
_KIND_RE = re.compile(r'<([^<>]*)>')
_STATUS_RE = re.compile(r'\[([^\[\]]*)\]')

# One generated table row: the escaped source title lives in ``data-title``,
# the first cell is the running index and the last cell is the status.
_ROW_RE = re.compile(
    r'^\s*<tr data-title="(?P<title>[^"]*)">'
    r'<td>\d+</td>(?P<cells>.*)<td>[^<]*</td></tr>\s*$'
)
_TITLE_ATTR_RE = re.compile(r'<tr data-title="([^"]*)">')


def _parse_line(raw_line):
    """
    Parse one movie list line into a dict with title, kind, year and status.

    Returns None for lines that are empty or carry no title. Missing markers
    fall back to ``''`` (year, status) or ``'movie'`` (kind) instead of
    producing garbage slices.
    """
    line = raw_line.strip()
    if not line:
        return None

    # The title is everything in front of the first marker.
    marker_positions = [pos for pos in map(line.find, '<([') if pos != -1]
    title = line[:min(marker_positions, default=len(line))].strip()
    if not title:
        return None

    def first_group(pattern, default=''):
        found = pattern.search(line)
        return (found.group(1).strip() if found else '') or default

    return {
        'title': title,
        'kind': first_group(_KIND_RE, 'movie'),
        'year': first_group(_YEAR_RE),
        'status': first_group(_STATUS_RE),
    }


def _format_row(index, source_title, title_html, year, rating, genres, status):
    """
    Render one table row.

    ``title_html`` is inserted verbatim (it may contain a link) and must
    already be escaped; every other value is escaped here. ``source_title``
    is the title as written in the movie list and is stored in the
    ``data-title`` attribute so later runs can find the row again.
    """
    cells = (
        str(index + 1),
        title_html,
        escape(str(year)),
        escape(str(rating)),
        escape(', '.join(str(genre) for genre in genres)),
        escape(str(status)),
    )
    body = ''.join(f'<td>{cell}</td>' for cell in cells)
    return f'\n <tr data-title="{escape(source_title)}">{body}</tr>'


class MovieList:
    """
    Class to generate a movie list HTML table

    Attributes:
        src: path of the movie list text file (resolved in gen()).
        dst: path of the HTML output; also read as cache of a previous run.
        html: page template up to and including the opening ``<tbody>``.
        html_table: one rendered row per movie, filled by gen().
        movie_list: parsed entries keyed by position, filled by gen().
        prev_html: lines of the previous output file, if any.
        threads: lookup threads that are pending or still running.
    """

    def __init__(self, src=None, dst=None):
        self.prev_html = []
        self.html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>My Movie List</title>
</head>
<body>
<h1>🎬 My Movie Collection</h1>
<table>
<thead>
<tr><th>#</th><th>Title</th><th>Year</th><th>Rating</th><th>Genre</th><th>Status</th></tr>
</thead>
{_TBODY}
"""
        self.src = src
        self.dst = Path(dst) if dst else \
            Path(os.path.dirname(sys.argv[0])) / 'index.html'
        # Replaced by a dict (index -> parsed entry) once gen() runs.
        self.movie_list = []
        self.threads = []
        self.read_prev_output()
        self.html_table = None

    @staticmethod
    def _search(imdb, arg):
        """
        Search IMDb for ``arg`` and return the result list.

        Parser errors are not retried. Any other failure is reported on
        stderr and retried up to MAX_SEARCH_ATTEMPTS times; an empty list is
        returned when all attempts fail.
        """
        query = f'{arg["title"]} {arg["year"]}'.strip()
        for attempt in range(1, MAX_SEARCH_ATTEMPTS + 1):
            try:
                return imdb.search_movie(query)
            except IMDbParserError:
                return []
            except IMDbDataAccessError as exc:
                reason = f'data access error: {exc}'
            except Exception as exc:  # keep the worker alive on any failure
                reason = f'unexpected error: {exc!r}'

            if attempt == MAX_SEARCH_ATTEMPTS:
                sys.stderr.write(
                    f'error: giving up on "{query}" - {reason}\n')
                break
            sys.stderr.write(
                f'warning: search for "{query}" failed ({reason}), '
                f'retrying...\n')
            time.sleep(SEARCH_RETRY_DELAY)
        return []

    @staticmethod
    def _pick_result(imdb, results, arg):
        """
        Return the first search result that fits ``arg``, or None.

        A result fits when it has kind, year and title, has a rating if the
        entry is marked DONE, does not look like an episode and is of the
        requested kind.
        """
        for entry in results:
            try:
                imdb.update(entry)
            except Exception as exc:
                # Best effort: the bare search result may still be usable.
                sys.stderr.write(
                    f'warning: could not load details for '
                    f'"{arg["title"]}": {exc}\n')

            fields = entry.keys()
            if not all(key in fields for key in ('kind', 'year', 'title')):
                continue
            if arg['status'] == 'DONE' and 'rating' not in fields:
                continue
            # Try to eliminate episode results
            if any('episode' in str(key).lower() for key in fields):
                continue
            if 'episode' in entry['title'].lower() and \
                    'episode' not in arg['title'].lower():
                continue
            if entry['kind'].lower() == arg['kind'].lower():
                return entry
        return None

    def _worker(self, arg, index):
        """
        Look up one movie on IMDb and store its table row.

        Collects rating, genres, official name and a hyperlink. When no
        suitable result is found, a placeholder row built from the movie
        list entry is stored instead.
        """
        imdb = IMDb()
        movie = self._pick_result(imdb, self._search(imdb, arg), arg)

        if movie is None:
            title_html = escape(arg['title'])
            year = arg['year']
            rating = NOT_AVAILABLE
            genres = [NOT_AVAILABLE]
        else:
            fields = movie.keys()
            movie_id = escape(str(movie.movieID))
            name = escape(str(movie['title']))
            title_html = (
                f'<a href="https://www.imdb.com/title/tt{movie_id}/">'
                f'{name}</a>'
            )
            year = movie['year']
            rating = movie['rating'] if 'rating' in fields else NOT_AVAILABLE
            genres = movie['genres'] if 'genres' in fields \
                else [NOT_AVAILABLE]

        # Every worker writes to its own slot, so no locking is needed.
        self.html_table[index] = _format_row(
            index, arg['title'], title_html, year, rating, genres,
            arg['status'])

    def _parse_movie_list(self, text):
        """ Parse the movie list text, dropping repeated titles """
        entries = {}
        seen_titles = set()
        for raw_line in text.splitlines():
            entry = _parse_line(raw_line)
            if entry is None or entry['title'] in seen_titles:
                continue
            seen_titles.add(entry['title'])
            entries[len(entries)] = entry
        return entries

    def _cached_rows(self):
        """
        Index the complete rows of the previous output by escaped title.

        Rows with an N/A cell are left out so they are looked up again.
        """
        cached = {}
        for line in self.prev_html:
            found = _ROW_RE.match(line)
            if found and _NOT_AVAILABLE_CELL not in line:
                cached.setdefault(found.group('title'), found)
        return cached

    def _run_threads(self, pbar):
        """
        Run all pending lookup threads, at most MAX_THREADS at a time.

        The progress bar advances once per finished lookup.
        """
        pending = [thread for thread in self.threads if thread.ident is None]
        next_up = 0
        while self.threads:
            running = sum(1 for thread in self.threads if thread.is_alive())
            while next_up < len(pending) and running < MAX_THREADS:
                pending[next_up].start()
                next_up += 1
                running += 1
                time.sleep(THREAD_START_DELAY)

            time.sleep(THREAD_POLL_INTERVAL)
            tracked = len(self.threads)
            self.delete_finished_threads()
            for _ in range(tracked - len(self.threads)):
                pbar.increment()

    def gen(self):
        """
        Generate an HTML list based on input, using a threaded worker

        Rows found in the previous output are reused (with refreshed index
        and status); everything else is looked up on IMDb.

        Returns False if the movie list file does not exist, True otherwise.
        """
        if not self.src:
            self.src = Path(os.path.dirname(sys.argv[0])) / 'movie_list'
        else:
            self.src = Path(self.src)

        if not self.src.exists():
            sys.stderr.write(f'error: input does not exist - {self.src}\n')
            return False

        with open(self.src, 'r', encoding='utf-8') as fp_handle:
            self.movie_list = self._parse_movie_list(fp_handle.read())
        self.html_table = [None] * len(self.movie_list)

        cached = self._cached_rows()
        pbar = progressbar.ProgressBar(max_value=len(self.movie_list))

        for idx, movie in self.movie_list.items():
            hit = cached.get(escape(movie['title']))
            if hit is None:
                self.threads.append(threading.Thread(
                    target=self._worker, args=(movie, idx)))
                continue
            # Only the index and status cells change between runs; the
            # cells in between are copied untouched.
            self.html_table[idx] = (
                f'\n <tr data-title="{hit.group("title")}">'
                f'<td>{idx + 1}</td>'
                f'{hit.group("cells")}'
                f'<td>{escape(movie["status"])}</td></tr>'
            )
            pbar.increment()

        self._run_threads(pbar)
        return True

    def delete_finished_threads(self):
        """ Join and forget every thread that was started and has ended """
        still_tracked = []
        for thread in self.threads:
            # ident is None until start() has been called.
            if thread.ident is not None and not thread.is_alive():
                thread.join()
            else:
                still_tracked.append(thread)
        self.threads = still_tracked

    def get_alive_threads(self):
        """ Return the tracked threads that have already been started """
        return [thread for thread in self.threads if thread.ident is not None]

    def write(self, dst=None):
        """
        Write the HTML list to index.html

        The page is rebuilt from the template head, the current rows and a
        footer carrying the generation time in UTC.
        """
        out_path = Path(dst) if dst else self.dst
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())

        rows = self.html_table or []
        final_html = ''.join((
            self.html.split(_TBODY)[0] + _TBODY,
            ''.join(row for row in rows if row is not None),
            f'\n</tbody>\n</table>\n'
            f'<p>Generated {timestamp} UTC</p>\n'
            f'</body>\n</html>\n',
        ))

        # Written as bytes so line endings stay '\n' on every platform,
        # which is what read_prev_output() splits on.
        out_path.write_bytes(final_html.encode('utf8'))

    def read_prev_output(self):
        """ Import a previous HTML table """
        if self.dst.is_file():
            content = self.dst.read_bytes().decode('utf8', errors='replace')
            self.prev_html = content.split('\n')

    def deduplicate_html(self):
        """
        Remove duplicate entries from html_table based on movie titles

        The first row of each title is kept. Rows without a title marker are
        always kept. Returns the number of remaining rows.
        """
        seen_titles = set()
        deduplicated = []
        for row in self.html_table or []:
            if row is None:
                continue
            found = _TITLE_ATTR_RE.search(row)
            if found is not None:
                if found.group(1) in seen_titles:
                    continue
                seen_titles.add(found.group(1))
            deduplicated.append(row)

        self.html_table = deduplicated
        return len(self.html_table)


def main():
    """
    Default run

    Usage: script [movie_list [output_html]]. Exits with status 1 on wrong
    usage or when the movie list cannot be read.
    """
    args = sys.argv[1:]
    if len(args) > 2:
        sys.stderr.write(f'error: max 2 variables, {len(args)} given!\n')
        sys.exit(1)

    src = args[0] if args else None
    dst = args[1] if len(args) == 2 else None

    mlist = MovieList(src=src, dst=dst)
    if not mlist.gen():
        sys.exit(1)
    mlist.write(dst=dst)


if __name__ == "__main__":
    main()