From 9a8144af68e98cbcd3d9841d1e8d8bfd15c65b9d Mon Sep 17 00:00:00 2001 From: Amber Date: Tue, 30 Dec 2025 18:40:24 -0500 Subject: [PATCH] Added a popular tags feature, greatly cleaned up code, commented spots --- README.md | 5 +-- build_total_stats_model.py | 23 +---------- stats_model.py | 30 ++++++++++++++ tumblr_stats.py | 83 +++++++++++++++++++++++--------------- 4 files changed, 83 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index c62e3f5..558b194 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,9 @@ # tumblr-stats ## Usage ``` -usage: tumblr_stats.py [-h] -b BLOG [-t TAGS [TAGS ...]] OPERATION +usage: tumblr_stats.py [-h] -b BLOG [-t TAGS [TAGS ...]] OPERATION [OPERATION ...] -Use pytumblr to calculate stats after setting these enviroment variables: $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, -$TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET +Use pytumblr to calculate stats after setting these enviroment variables: $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET positional arguments: OPERATION operation used to calculate stats diff --git a/build_total_stats_model.py b/build_total_stats_model.py index 3d57f18..a22e7ce 100644 --- a/build_total_stats_model.py +++ b/build_total_stats_model.py @@ -16,26 +16,5 @@ class BuildTotalStatsModel(StatsModel): # Posts ranked from most popular to least popular by notes within each month and year. top_post_urls_by_month_and_year: Dict[str, List[str]] = field(init=False) - # Tags ranked from most popular to least popular by notes. - most_popular_tags: List[Dict[str, Any]] = field(default_factory=list) - def __post_init__(self): - super().__post_init__() - self.most_popular_tags = self.determine_most_popular_tags() - - def determine_most_popular_tags(self) -> List[Dict[str, Any]]: - tag_dict: Dict[str, Any] = {} - for post_key in self.original_post_map: - post = self.original_post_map[post_key] - tags = post['tags'] - for tag in tags: - if tag in tag_dict: - tag_dict[tag] = { - 'tag': tag, 'note_count': tag_dict[tag] + post['note_count']} - else: - tag_dict[tag] = {'tag': tag, - 'note_count': post['note_count']} - - tag_list = sorted(list(tag_dict.values()), - key=itemgetter('note_count'), reverse=True) - return tag_list + super().__post_init__() \ No newline at end of file diff --git a/stats_model.py b/stats_model.py index 11643f7..db09861 100644 --- a/stats_model.py +++ b/stats_model.py @@ -1,5 +1,7 @@ +from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime +from operator import itemgetter from typing import Any, Dict, List @@ -34,11 +36,15 @@ class StatsModel: total_original_post_notes_by_month_and_year: Dict[str, int] = field( init=False) + # Tags ranked from most popular to least popular by notes. + most_popular_tags: List[Dict[str, Any]] = field(init=False) + def __post_init__(self): self.total_posts = self.calculate_total_posts() self.total_original_posts = self.calculate_total_original_posts() self.total_original_post_notes = self.calculate_total_original_post_notes() self.total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year() + self.most_popular_tags = self.determine_most_popular_tags() def calculate_total_posts(self) -> int: return len(self.original_post_map) + len(self.unoriginal_post_map) @@ -65,3 +71,27 @@ class StatsModel: else: date_map[post_date_key] = post['note_count'] return date_map + + def determine_most_popular_tags(self) -> List[Dict[str, Any]]: + tag_dict: Dict[str, Any] = {} + tag_dict = defaultdict(lambda : {'note_count': 0, + 'post_count': 0}, + tag_dict) + for post_key in self.original_post_map: + post = self.original_post_map[post_key] + tags = post['tags'] + for tag in tags: + sts = tag_dict[tag] + sts['tag'] = tag + sts['post_count'] += 1 + sts['note_count'] += post['note_count'] + + for tag in tag_dict: + sts = tag_dict[tag] + post_count = sts['post_count'] + note_count = sts['note_count'] + sts['notes_to_posts_ratio'] = note_count / post_count + + tag_list = sorted(list(tag_dict.values()), key=itemgetter('note_count'), + reverse=True) + return tag_list diff --git a/tumblr_stats.py b/tumblr_stats.py index 31c3b70..41dbf6f 100644 --- a/tumblr_stats.py +++ b/tumblr_stats.py @@ -2,7 +2,6 @@ import argparse import csv from dataclasses import asdict -from datetime import datetime import json import os import sys @@ -22,14 +21,17 @@ def get_args() -> Dict[str, Any]: description='Use pytumblr to calculate stats after setting these enviroment variables: ' + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET', epilog='— Be gay and do crime') - parser.add_argument('operation', type=str, metavar='OPERATION', choices=['build_tag_stats'], + parser.add_argument('operation', type=str, nargs = '+', + metavar='OPERATION', choices=['build_tag_stats'], help="operation used to calculate stats") parser.add_argument('-b', '--blog', type=str, required=True, help='blog name for which to calculate stats') parser.add_argument('-t', '--tags', type=str, nargs='+', help='tag(s) to focus on in status (if applicable)') + # TODO: Make 'before' work, but it actually depends on https://github.com/tumblr/pytumblr/issues/174. # parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'), # help='only gather posts before YYYY-MM-DD') + # TODO: Make 'after' work if they add it to pytumblr. # parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'), # help='only gather posts after YYYY-MM-DD') return vars(parser.parse_args()) @@ -64,12 +66,15 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st dumpster: Dict[str, Any] = {} blog_name = args['blog'] + # We populate params, starting with any tags for filtering. params = {} if args['tags']: params.update({'tag': ','.join(args['tags'])}) + # TODO: Make 'before' work. # if args['before']: # before: datetime = args['before'] # params.update({'before': int(before.timestamp())}) + # TODO: Make 'after' work. # if args['after']: # after: datetime = args['after'] # params.update({'after': str(int(after.timestamp()))}) @@ -78,20 +83,21 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st offset = 0 limit = 20 + # The request loop that pulls all data from the APIs. while offset <= total: # Begin LOOP - # Get me some posts! 😈🍪🍪🍪 + # Get me some posts via REST! 😈🍪🍪🍪 data = client.posts(f"{blog_name}.tumblr.com", offset=offset, limit=limit, **params) - - # Sh**t it in the head if we found no posts. + + # Stop the presses if we found no posts. if not data['posts']: print('Stopping, as no posts were found.') break - # Total check for the first good iteration, but always checked for sanity. + # Total init check for the first iteration, but always checked for sanity. if total == 0: # Let's see what's in there, total_posts = data['total_posts'] @@ -100,6 +106,7 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st print(f"I'm working with {total_posts} total posts...") total = total_posts + # This block populates the local post_map from the raw response data. curr_posts = data['posts'] local_post_map: Dict[str, Any] = {} for curr_post in curr_posts: @@ -107,47 +114,50 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st if curr_key not in local_post_map: local_post_map[curr_key] = curr_post + # This block populates the local dumpster from the raw response data. local_dumpster = {} filtered_local_post_map = {} for local_key in local_post_map: local_post = local_post_map[local_key] + # Determines whether this is an OG post. if 'parent_post_url' not in local_post: filtered_local_post_map[local_key] = local_post - else: + else: # If it's not an OG post, into the local dumpster. local_dumpster[local_key] = local_post - # The sacred should we add, and if we should, DO ADD, if statement. - has_og_posts = any(post not in post_map for post in filtered_local_post_map) + # The sacred "should we add, and if we should, DO ADD" conditional statements. + has_og_posts = any( + post not in post_map for post in filtered_local_post_map) has_not_og_posts = any(post not in dumpster for post in local_dumpster) if has_og_posts: post_map.update(filtered_local_post_map) if has_not_og_posts: dumpster.update(local_dumpster) - + # The increment and status printing. Should always end the loop! offset += limit - if offset == limit: - print('Processed first batch...') - elif offset < total: - print(f"Processed batch {offset // limit} of {total // 20}...") - else: - print(f"Processed all {total} posts") + print(f"Processed batch {offset // limit} of {(total // 20) + 1}...") # End LOOP + # Return (og_posts, not_og_posts). return (post_map, dumpster) -def build_tag_stats_model(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> BuildTagStatsModel: - post_map, dumpster = build_post_map_and_dumpster(client, args) - stats_model: BuildTagStatsModel = BuildTagStatsModel(blog_name=args['blog'], original_post_map=post_map, +def build_tag_stats_model(post_map: Dict[str, Any], + dumpster: Dict[str, Any], + args: Dict[str, Any]) -> BuildTagStatsModel: + stats_model: BuildTagStatsModel = BuildTagStatsModel(blog_name=args['blog'], + original_post_map=post_map, unoriginal_post_map=dumpster) stats_model.tags = args['tags'] return stats_model -def build_total_stats_model(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> BuildTotalStatsModel: - post_map, dumpster = build_post_map_and_dumpster(client, args) - stats_model: BuildTotalStatsModel = BuildTotalStatsModel(blog_name=args['blog'], original_post_map=post_map, +def build_total_stats_model(post_map: Dict[str, Any], + dumpster: Dict[str, Any], + args: Dict[str, Any]) -> BuildTotalStatsModel: + stats_model: BuildTotalStatsModel = BuildTotalStatsModel(blog_name=args['blog'], + original_post_map=post_map, unoriginal_post_map=dumpster) return stats_model @@ -156,32 +166,39 @@ def main() -> None: args = get_args() client = init_client() - stats_model = StatsModel(blog_name=args['blog'], operation='undefined', - original_post_map={}, unoriginal_post_map={}) + # Get the post_map (original posts) and dumpster (not original posts). + post_map, dumpster = build_post_map_and_dumpster(args=args, client=client) - if args['operation'] == 'build_tag_stats': - stats_model = build_tag_stats_model(client, args) - elif args['operation'] == 'build_total_stats': + # Pick a stats model, which will determine output. + stats_model: StatsModel + if 'build_tag_stats' in args['operation']: + stats_model = build_tag_stats_model(post_map, dumpster, args) + if 'build_total_stats' in args['operation']: if 'before' not in args: # or 'after' not in args: print('You must specify a time range for build_total stats. ' + 'You\'ll otherwise request TOO MUCH DATA!') sys.exit() - stats_model = build_total_stats_model(client, args) + stats_model = build_total_stats_model(post_map, dumpster, args) + + # Write the chosen model as JSON output. with open('./tumblr_stats.json', 'w') as f: - json.dump(asdict(stats_model), f, indent=2, sort_keys=True) - if stats_model.original_post_map: + json.dump(asdict(stats_model), f, indent=1) + + # If there were original posts, create a CSV for them. + if post_map: with open('./tumblr_original_posts.csv', 'w', newline='') as f: - post_list: List[Dict[str, Any]] = list( - stats_model.original_post_map.values()) + post_list: List[Dict[str, Any]] = list(post_map.values()) wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore', fieldnames=post_list[0].keys()) wr.writeheader() wr.writerows(post_list) else: - print('No original posts were found, so no CSV of original posts was written.') + print('No original posts were found, so a CSV of original posts was not written.') return +# DO NOT DELETE. The main if statement. if __name__ == '__main__': main() + print('All done.') sys.exit(0)