diff --git a/build_tag_stats_model.py b/build_tag_stats_model.py
index dc547f2..6f00cbb 100644
--- a/build_tag_stats_model.py
+++ b/build_tag_stats_model.py
@@ -22,14 +22,16 @@ class BuildTagStatsModel(StatsModel):
         for post_key in self.original_post_map:
             post = self.original_post_map[post_key]
             post_list.append({
-                'id_string': post['id_string'],
                 'post_url': post['post_url'],
                 'tags': post['tags'],
                 'note_count': post['note_count']
             })
+        # https://stackoverflow.com/a/73050
         sorted_list = sorted(post_list, key=itemgetter('note_count'),
                              reverse=True)
+
+        # https://stackoverflow.com/a/522578
         for i, post in enumerate(sorted_list):
             post['rank'] = i + 1
diff --git a/stats_model.py b/stats_model.py
index d1a8603..64dfdea 100644
--- a/stats_model.py
+++ b/stats_model.py
@@ -59,13 +59,16 @@ class StatsModel:
         return total

     def calculate_total_original_post_notes_by_month_and_year(self) -> Dict[str, int]:
+        # https://docs.python.org/3/library/collections.html#defaultdict-objects
         date_map: Dict[str, Any] = {}
         date_map = defaultdict(lambda: {'note_count': 0, 'post_count': 0},
                                date_map)
+
+        # Gathering the results.
         for post_key in self.original_post_map:
             post = self.original_post_map[post_key]
-            # Format is like 2025-12-28 20:00:34 GMT
+            # Format is like '2025-12-28 20:00:34 GMT'
             post_date: datetime = datetime.strptime(
                 post['date'], '%Y-%m-%d %H:%M:%S %Z')
             post_date_key = f"{post_date.year}-{post_date.month:02}"
@@ -74,6 +77,7 @@ class StatsModel:
             sts['post_count'] += 1
             sts['note_count'] += post['note_count']

+        # Results postprocessing.
         for date in date_map:
             sts = date_map[date]
             post_count = sts['post_count']
@@ -83,10 +87,13 @@ class StatsModel:
         return date_map

     def determine_most_popular_tags(self) -> List[Dict[str, Any]]:
+        # https://docs.python.org/3/library/collections.html#defaultdict-objects
         tag_dict: Dict[str, Any] = {}
         tag_dict = defaultdict(lambda: {'note_count': 0, 'post_count': 0},
                                tag_dict)
+
+        # Gathering the results.
         for post_key in self.original_post_map:
             post = self.original_post_map[post_key]
             tags = post['tags']
@@ -96,12 +103,13 @@ class StatsModel:
                 sts['post_count'] += 1
                 sts['note_count'] += post['note_count']

+        # Results postprocessing.
         for tag in tag_dict:
             sts = tag_dict[tag]
             post_count = sts['post_count']
             note_count = sts['note_count']
             sts['notes_to_posts_ratio'] = note_count / post_count

-        tag_list = sorted(list(tag_dict.values()), key=itemgetter('note_count'),
-                          reverse=True)
-        return tag_list
+        # https://stackoverflow.com/a/73050
+        return sorted(list(tag_dict.values()), key=itemgetter('note_count'),
+                      reverse=True)
diff --git a/tumblr_stats.py b/tumblr_stats.py
index deed34e..6c5986d 100644
--- a/tumblr_stats.py
+++ b/tumblr_stats.py
@@ -21,7 +21,7 @@ def get_args() -> Dict[str, Any]:
         description='Use pytumblr to calculate stats after setting these enviroment variables: ' +
                     '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
         epilog='— Be gay and do crime')
-    parser.add_argument('operation', type=str, nargs = '+',
+    parser.add_argument('operation', type=str, nargs='+',
                         metavar='OPERATION', choices=['build_tag_stats'],
                         help="operation used to calculate stats")
     parser.add_argument('-b', '--blog', type=str, required=True,
@@ -43,11 +43,11 @@ def init_client() -> pytumblr.TumblrRestClient:
     oauth_token = os.getenv('TUMBLR_OAUTH_TOKEN')
     oauth_secret = os.getenv('TUMBLR_OAUTH_SECRET')

-    missing_vars = [name for name,
-                    val in [('$TUMBLR_CONSUMER_KEY', consumer_key),
-                            ('$TUMBLR_CONSUMER_SECRET', consumer_secret),
-                            ('$TUMBLR_OAUTH_TOKEN', oauth_token),
-                            ('$TUMBLR_OAUTH_SECRET', oauth_secret)] if val is None]
+    missing_vars: List[str] = [name for name,
+                               val in [('$TUMBLR_CONSUMER_KEY', consumer_key),
+                                       ('$TUMBLR_CONSUMER_SECRET', consumer_secret),
+                                       ('$TUMBLR_OAUTH_TOKEN', oauth_token),
+                                       ('$TUMBLR_OAUTH_SECRET', oauth_secret)] if val is None]

     if missing_vars:
         print("Missing important environment variables:", missing_vars)
@@ -61,10 +61,11 @@ def init_client() -> pytumblr.TumblrRestClient:
     )


-def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
-    post_map: Dict[str, Any] = {}
-    dumpster: Dict[str, Any] = {}
-    blog_name = args['blog']
+def build_post_maps(client: pytumblr.TumblrRestClient,
+                    args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
+    og_post_map: Dict[str, Any] = {}
+    un_og_post_map: Dict[str, Any] = {}
+    blog_name: str = args['blog']

     # We populate params, starting with any tags for filtering.
     params = {}
@@ -79,9 +80,9 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
     # after: datetime = args['after']
     # params.update({'after': str(int(after.timestamp()))})

-    total = 0
-    offset = 0
-    limit = 20
+    total: int = 0
+    offset: int = 0
+    limit: int = 20

     # The request loop that pulls all data from the APIs.
     while offset <= total:
@@ -98,41 +99,24 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
             break

         # Total init check for the first iteration, but always checked for sanity.
-        if total == 0:
+        if not total:
            # Let's see what's in there,
             total_posts = data['total_posts']
-
-            # Something was there, so we're good.
             print(f"I'm working with {total_posts} total posts...")
             total = total_posts

-        # This block populates the local post_map from the raw response data.
-        curr_posts = data['posts']
-        local_post_map: Dict[str, Any] = {}
-        for curr_post in curr_posts:
-            curr_key = curr_post['id_string']
-            if curr_key not in local_post_map:
-                local_post_map[curr_key] = curr_post
+        # This block populates the local post_maps from the raw response data.
+        curr_posts: List[Dict[str, Any]] = data['posts']
+        local_og_post_map: Dict[str, Any] = {
+            item['id_string']: item for item in curr_posts if 'parent_post_url' not in item
+        }
+        local_un_og_post_map: Dict[str, Any] = {
+            item['id_string']: item for item in curr_posts if 'parent_post_url' in item
+        }

-        # This block populates the local dumpster from the raw response data.
-        local_dumpster = {}
-        filtered_local_post_map = {}
-        for local_key in local_post_map:
-            local_post = local_post_map[local_key]
-            # Determines whether this is an OG post.
-            if 'parent_post_url' not in local_post:
-                filtered_local_post_map[local_key] = local_post
-            else:  # If it's not an OG post, into the local dumpster.
-                local_dumpster[local_key] = local_post
-
-        # The sacred "should we add, and if we should, DO ADD" conditional statements.
-        has_og_posts = any(
-            post not in post_map for post in filtered_local_post_map)
-        has_not_og_posts = any(post not in dumpster for post in local_dumpster)
-        if has_og_posts:
-            post_map.update(filtered_local_post_map)
-        if has_not_og_posts:
-            dumpster.update(local_dumpster)
+        # Update the maps with what we found.
+        og_post_map.update(local_og_post_map)
+        un_og_post_map.update(local_un_og_post_map)

         # The increment and status printing. Should always end the loop!
         offset += limit
@@ -140,54 +124,40 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
     # End LOOP

     # Return (og_posts, not_og_posts).
-    return (post_map, dumpster)
-
-
-def build_tag_stats_model(post_map: Dict[str, Any],
-                          dumpster: Dict[str, Any],
-                          args: Dict[str, Any]) -> BuildTagStatsModel:
-    stats_model: BuildTagStatsModel = BuildTagStatsModel(blog_name=args['blog'],
-                                                         original_post_map=post_map,
-                                                         unoriginal_post_map=dumpster)
-    stats_model.tags = args['tags']
-    return stats_model
-
-
-def build_total_stats_model(post_map: Dict[str, Any],
-                            dumpster: Dict[str, Any],
-                            args: Dict[str, Any]) -> BuildTotalStatsModel:
-    stats_model: BuildTotalStatsModel = BuildTotalStatsModel(blog_name=args['blog'],
-                                                             original_post_map=post_map,
-                                                             unoriginal_post_map=dumpster)
-    return stats_model
+    return (og_post_map, un_og_post_map)


 def main() -> None:
     args = get_args()
     client = init_client()

-    # Get the post_map (original posts) and dumpster (not original posts).
-    post_map, dumpster = build_post_map_and_dumpster(args=args, client=client)
+    # Get the og_post_map (original posts) and un_og_post_map (not original posts).
+    og_post_map, un_og_post_map = build_post_maps(args=args, client=client)

     # Pick a stats model, which will determine output.
     stats_model: StatsModel
     if 'build_tag_stats' in args['operation']:
-        stats_model = build_tag_stats_model(post_map, dumpster, args)
+        stats_model = BuildTagStatsModel(blog_name=args['blog'],
+                                         original_post_map=og_post_map,
+                                         unoriginal_post_map=un_og_post_map)
+        stats_model.tags = args['tags']

     if 'build_total_stats' in args['operation']:
         if 'before' not in args:  # or 'after' not in args:
             print('You must specify a time range for build_total stats. ' +
                   'You\'ll otherwise request TOO MUCH DATA!')
-            sys.exit()
-        stats_model = build_total_stats_model(post_map, dumpster, args)
+            sys.exit(1)
+        stats_model = BuildTotalStatsModel(blog_name=args['blog'],
+                                           original_post_map=og_post_map,
+                                           unoriginal_post_map=un_og_post_map)

     # Write the chosen model as JSON output.
     with open('./tumblr_stats.json', 'w') as f:
         json.dump(asdict(stats_model), f, indent=1, default=str)

     # If there were original posts, create a CSV for them.
-    if post_map:
+    if og_post_map:
         with open('./tumblr_original_posts.csv', 'w', newline='') as f:
-            post_list: List[Dict[str, Any]] = list(post_map.values())
+            post_list: List[Dict[str, Any]] = list(og_post_map.values())
             wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
                                 fieldnames=post_list[0].keys())
             wr.writeheader()
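
A note on the stats_model.py hunks: both aggregation methods lean on the collections.defaultdict pattern the newly linked docs describe, so unseen keys start at zero and the loops need no existence checks. A minimal, self-contained sketch of that pattern with made-up posts (the field names mirror the diff; the data is not from the project):

```python
from collections import defaultdict
from operator import itemgetter
from typing import Any, Dict, List

# Made-up posts standing in for self.original_post_map.values().
posts: List[Dict[str, Any]] = [
    {'tags': ['art', 'wip'], 'note_count': 12},
    {'tags': ['art'], 'note_count': 3},
]

# Unknown tags are created on first access with zeroed counters.
tag_dict: Dict[str, Any] = defaultdict(
    lambda: {'note_count': 0, 'post_count': 0})

for post in posts:
    for tag in post['tags']:
        sts = tag_dict[tag]
        sts['post_count'] += 1
        sts['note_count'] += post['note_count']

# Postprocessing: derive the ratio, then rank by raw note count.
for sts in tag_dict.values():
    sts['notes_to_posts_ratio'] = sts['note_count'] / sts['post_count']
print(sorted(tag_dict.values(), key=itemgetter('note_count'), reverse=True))
```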
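A note on the build_post_maps() rewrite: the two dict comprehensions replace the old filter loop and the "sacred" update guards by partitioning each page of results on the presence of 'parent_post_url'. A small sketch with placeholder posts (the URL is illustrative, not real data):

```python
from typing import Any, Dict, List

# Placeholder page of posts standing in for data['posts'].
curr_posts: List[Dict[str, Any]] = [
    {'id_string': '101', 'note_count': 5},
    {'id_string': '102', 'note_count': 9,
     'parent_post_url': 'https://example.tumblr.com/post/42'},
]

# Original posts carry no 'parent_post_url'; reblogs do.
local_og_post_map: Dict[str, Any] = {
    p['id_string']: p for p in curr_posts if 'parent_post_url' not in p
}
local_un_og_post_map: Dict[str, Any] = {
    p['id_string']: p for p in curr_posts if 'parent_post_url' in p
}

assert set(local_og_post_map) == {'101'}
assert set(local_un_og_post_map) == {'102'}
```

Because dict.update() with an empty mapping is a no-op, the old has_og_posts/has_not_og_posts checks add nothing and can be dropped, as the diff does.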
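A note on the paging loop in build_post_maps(): the request call itself sits in context lines this diff does not show, so the sketch below is an assumption. It uses pytumblr's client.posts() and a hypothetical helper name, fetch_all_posts(), purely to illustrate the offset/limit/total pattern the hunk keeps (total is learned from the first response, and the loop bails out if a response carries no 'posts' key):

```python
from typing import Any, Dict

import pytumblr


def fetch_all_posts(client: pytumblr.TumblrRestClient,
                    blog_name: str,
                    params: Dict[str, Any]) -> Dict[str, Any]:
    """Page through a blog 20 posts at a time, keyed by id_string."""
    posts_by_id: Dict[str, Any] = {}
    total = 0
    offset = 0
    limit = 20

    while offset <= total:
        # Assumed call; any tag/before filters ride along in params.
        data = client.posts(blog_name, limit=limit, offset=offset, **params)
        if 'posts' not in data:
            break
        if not total:
            total = data['total_posts']
        posts_by_id.update({p['id_string']: p for p in data['posts']})
        offset += limit

    return posts_by_id
```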