#!/usr/bin/env python
"""
Generate an HTML movie list from a plain-text file.

1. Import a movie_list txt file
2. Query IMDb for each entry, retrieving actual movie name, rating and genres
3. Generate an HTML table from the IMDb data
4. Store the HTML in index.html

Each input line is expected to look like ``Title <kind> (year) [status]``;
``<kind>`` defaults to ``movie`` when omitted.
"""
import os
import re
import sys
import threading
import time
from html import escape
from pathlib import Path

import progressbar
from imdb import IMDb
from imdb._exceptions import IMDbParserError

# NOTE(review): the HTML markup in this file was reconstructed -- the reviewed
# copy had its tags stripped.  Only the row layout matters to the code (one
# row per line, index in the first <td>); confirm the rest against the
# original template before releasing.
_HTML_HEAD = """<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>My Movie List</title>
</head>
<body>
    <table>
        <tr><th>Index</th><th>Title</th><th>Year</th><th>IMDb Rating</th><th>Genre</th><th>Status</th></tr>"""

_KIND_RE = re.compile(r'<([^<>]*)>')
_YEAR_RE = re.compile(r'\(([^()]*)\)')
# The status is the last bracketed group on the line, so a title that itself
# contains brackets is not mistaken for it.
_STATUS_RE = re.compile(r'\[([^\[\]]*)\]\s*$')


def _cell(value):
    """Return *value* as HTML-safe text for use inside a table cell."""
    return escape(str(value), quote=False)


class MovieList:
    """
    Class to generate a movie list HTML table

    Typical use: ``MovieList(src, dst).gen()`` followed by ``write()``.
    """

    MAX_THREADS = 10          # upper bound on concurrently running lookups
    MAX_SEARCH_ATTEMPTS = 5   # search attempts before an entry is given up on
    RETRY_DELAY = 10          # seconds to wait between failed searches
    START_DELAY = 1           # seconds between thread starts
    POLL_DELAY = 1            # seconds between scheduler passes

    def __init__(self, src=None, dst=None):
        """
        Args:
            src: path of the input list; ``gen`` falls back to ``movie_list``
                next to the script when omitted.
            dst: path of the HTML output; defaults to ``index.html`` next to
                the script.  An existing file is read as a row cache.
        """
        self.prev_html = []
        self.html = _HTML_HEAD
        self.src = src
        self.dst = Path(dst) if dst else Path(os.path.dirname(sys.argv[0])) / 'index.html'
        self.movie_list = {}
        self.threads = []
        self.html_table = None
        self.read_prev_output()

    @staticmethod
    def _parse_line(raw_line):
        """
        Split one input line into its title/kind/year/status fields.

        Returns ``None`` for blank lines and lines without a title.  Missing
        optional fields yield ``'movie'`` for the kind and ``''`` otherwise.
        """
        line = raw_line.strip()
        if not line:
            return None

        status = _STATUS_RE.search(line)
        # The title runs up to the first kind/year marker, or up to the
        # status when neither marker is present.
        title_end = next((i for i, ch in enumerate(line) if ch in '<('), None)
        if title_end is None:
            title_end = status.start() if status else len(line)
        title = line[:title_end].strip()
        if not title:
            return None

        kind = _KIND_RE.search(line)
        year = _YEAR_RE.search(line)
        return {
            'title': title,
            'kind': (kind.group(1).strip() if kind else '') or 'movie',
            'year': year.group(1).strip() if year else '',
            'status': status.group(1).strip() if status else '',
        }

    def _search(self, imdb, arg):
        """
        Run the IMDb title search for *arg*, retrying transient failures.

        Returns the search results, or an empty list when IMDb's response
        cannot be parsed or every attempt failed.
        """
        term = f'{arg["title"]} {arg["year"]}'.strip()
        for attempt in range(1, self.MAX_SEARCH_ATTEMPTS + 1):
            try:
                return imdb.search_movie(term)
            except IMDbParserError:
                return []
            except Exception as exc:  # unknown library/network error: retry
                if attempt == self.MAX_SEARCH_ATTEMPTS:
                    sys.stderr.write(
                        f'error: giving up on "{arg["title"]}" - '
                        f'{exc.__class__.__name__}: {exc}\n')
                    break
                time.sleep(self.RETRY_DELAY)
        return []

    def _lookup(self, arg):
        """Return the first search result that fits *arg*, or ``None``."""
        imdb = IMDb()
        for entry in self._search(imdb, arg):
            imdb.update(entry)
            keys = entry.keys()
            # in case any of these keys is missing in the query, continue
            if not all(key in keys for key in ('kind', 'year', 'title')):
                continue
            if arg['status'] == 'DONE' and 'rating' not in keys:
                continue
            # Try to eliminate episode results:
            # must not have "episode" in the object keys, and must not have
            # "episode" in the result title unless it is in the search title
            if any('episode' in key.lower() for key in keys):
                continue
            if ('episode' in entry['title'].lower()
                    and 'episode' not in arg['title'].lower()):
                continue
            if entry['kind'].lower() == arg['kind'].lower():
                return entry
        return None

    @staticmethod
    def _render_row(index, arg, movie=None):
        """
        Build the HTML table row for entry *index*.

        Without a *movie* the row carries the input title (no hyperlink) and
        ``N/A`` for rating and genres, which makes ``gen`` retry it next run.
        """
        if movie is None:
            title_td = _cell(arg['title'])
            year, rating, genres = arg['year'], 'N/A', ['N/A']
        else:
            keys = movie.keys()
            # NOTE(review): link target reconstructed; assumes the IMDb
            # object exposes its numeric id as ``movieID`` -- confirm.
            title_td = (
                f'<a href="https://www.imdb.com/title/tt{escape(str(movie.movieID))}/">'
                f'{_cell(movie["title"])}</a>')
            year = movie['year']
            rating = movie['rating'] if 'rating' in keys else 'N/A'
            genres = movie['genres'] if 'genres' in keys else ['N/A']
        return (
            f'\n{" " * 8}<tr><td>{index + 1}</td>'
            f'<td>{title_td}</td>'
            f'<td>{_cell(year)}</td>'
            f'<td>{_cell(rating)}</td>'
            f'<td>{_cell(", ".join(genres))}</td>'
            f'<td>{_cell(arg["status"])}</td></tr>'
        )

    def _worker(self, arg, index):
        """
        Scan IMDb for a given movie and store its row in ``html_table``.

        This collects rating, genres, official name and a hyperlink.  Runs in
        a worker thread, so any failure is reported and replaced by a
        placeholder row instead of leaving the slot empty.
        """
        try:
            row = self._render_row(index, arg, self._lookup(arg))
        except Exception as exc:  # thread boundary: always produce a row
            sys.stderr.write(
                f'error: lookup failed for "{arg["title"]}" - '
                f'{exc.__class__.__name__}: {exc}\n')
            row = self._render_row(index, arg)
        self.html_table[index] = row

    def gen(self):
        """
        Generate an HTML list based on input, using a threaded worker

        Rows found in the previous output are reused; everything else is
        looked up on IMDb.  Returns ``False`` (after reporting on stderr)
        when the input file does not exist, ``True`` otherwise.
        """
        if not self.src:
            self.src = Path(os.path.dirname(sys.argv[0])) / 'movie_list'
        else:
            self.src = Path(self.src)

        if not self.src.exists():
            sys.stderr.write(f'error: input does not exist - {self.src}\n')
            return False

        # Open the movie list & split the columns
        with open(self.src, 'r', encoding='utf-8') as fp_handle:
            parsed = (self._parse_line(line) for line in fp_handle.read().splitlines())
            self.movie_list = dict(enumerate(entry for entry in parsed if entry))

        self.html_table = [None] * len(self.movie_list)
        pbar = progressbar.ProgressBar(max_value=len(self.movie_list))

        # Only complete data rows are worth reusing; rows holding "N/A" are
        # looked up again.
        reusable = [row for row in self.prev_html
                    if '<td' in row and 'N/A' not in row]

        for idx, movie in self.movie_list.items():
            # Match the whole cell text so "Up" does not pick "Upgrade".
            needle = f'>{_cell(movie["title"])}<'
            cached = next((row for row in reusable if needle in row), None)
            if cached is None:
                self.threads.append(
                    threading.Thread(target=self._worker, args=(movie, idx)))
                continue
            # Update movies as DONE in case of change
            cached = cached.replace('>*<', f'>{_cell(movie["status"])}<')
            # Reuse the old row, replacing only its index cell
            self.html_table[idx] = (
                f'\n{" " * 8}<tr><td>{idx + 1}</td>'
                f'{cached[cached.find("</td>") + 5:]}')
            pbar.increment()

        while self.threads:
            before = len(self.threads)
            self.delete_finished_threads()
            for _ in range(before - len(self.threads)):
                pbar.increment()

            free_slots = self.MAX_THREADS - len(self.get_alive_threads())
            waiting = [thread for thread in self.threads if thread.ident is None]
            for thread in waiting[:max(free_slots, 0)]:
                thread.start()
                # presumably paces the requests sent to IMDb
                time.sleep(self.START_DELAY)

            if self.threads:
                time.sleep(self.POLL_DELAY)

        self.html += ''.join(row for row in self.html_table if row)
        return True

    def delete_finished_threads(self):
        """Join and drop every thread that was started and has finished."""
        remaining = []
        for thread in self.threads:
            if thread.ident is not None and not thread.is_alive():
                thread.join()
            else:
                remaining.append(thread)
        self.threads = remaining

    def get_alive_threads(self):
        """
        Return the threads that have been started.

        This includes threads that already finished but were not yet removed
        by ``delete_finished_threads``.
        """
        return [thread for thread in self.threads if thread.ident is not None]

    def write(self, dst=None):
        """
        Write the HTML list to index.html

        Args:
            dst: output path; defaults to the path given to the constructor.
        """
        out_path = Path(dst) if dst else self.dst

        # Just a fancy scrollbar for the html
        # NOTE(review): the scrollbar markup was missing from the reviewed
        # copy and could not be reconstructed; restore it from history.
        scroll = ''

        self.html += ('\n\t</table>\n'
                      + '\n<p>Generated on: '
                      + time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
                      + ' by ' + _cell(sys.argv[0]) + '</p>'
                      + scroll
                      + '\n</body>\n</html>\n')

        # Written as bytes so line endings are not translated on any platform
        out_path.write_bytes(self.html.encode('utf8'))

    def read_prev_output(self):
        """ Import a previous HTML table """
        if self.dst.exists():
            self.prev_html = self.dst.read_bytes().decode('utf8').split('\n')


def main():
    """
    Default run

    Usage: ``script [movie_list [index.html]]``.  Exits with status 1 on bad
    arguments or when the input file is missing.
    """
    args = sys.argv[1:]
    if len(args) > 2:
        sys.stderr.write(f'error: max 2 variables, {len(args)} given!\n')
        sys.exit(1)

    src = args[0] if args else None
    dst = args[1] if len(args) == 2 else None

    mlist = MovieList(src=src, dst=dst)
    if not mlist.gen():
        sys.exit(1)
    mlist.write(dst=dst)


if __name__ == "__main__":
    main()