From a2d49785bea46d6e0a15522ea64547b001c31e7e Mon Sep 17 00:00:00 2001 From: Amber Date: Sat, 4 Apr 2026 02:02:24 -0400 Subject: [PATCH] build_draft_stats, total_original_post_notes_by_qtr_and_year, other minor improvements --- .gitignore | 4 ++ build_draft_stats_model.py | 12 ++++++ stats_model.py | 36 +++++++++++++++++- tumblr_stats.py | 78 +++++++++++++++++++++++++------------- 4 files changed, 102 insertions(+), 28 deletions(-) create mode 100644 build_draft_stats_model.py diff --git a/.gitignore b/.gitignore index bbc6b87..d0a71bb 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,7 @@ __pycache__/ .pytest_cache/ .ruff_cache/ .pdm-python + +# Data files +*.json +*.csv \ No newline at end of file diff --git a/build_draft_stats_model.py b/build_draft_stats_model.py new file mode 100644 index 0000000..2396de1 --- /dev/null +++ b/build_draft_stats_model.py @@ -0,0 +1,12 @@ +from dataclasses import dataclass +from stats_model import StatsModel + + +@dataclass(kw_only=True) +class BuildDraftStatsModel(StatsModel): + """Stats model built around calculating stats from your currently drafted posts""" + operation: str = 'build_draft_stats' + + def __post_init__(self): + super().__post_init__() + self.most_popular_tags = self.determine_most_popular_tags('post_count') diff --git a/stats_model.py b/stats_model.py index f722a6d..4813e8d 100644 --- a/stats_model.py +++ b/stats_model.py @@ -33,7 +33,11 @@ class StatsModel: total_original_post_notes: int = field(init=False) # Total notes for original posts within each month and year. - total_original_post_notes_by_month_and_year: Dict[str, int] = field( + total_original_post_notes_by_month_and_year: Dict[str, Any] = field( + init=False) + + # Total notes for original posts within each quarter and year. + total_original_post_notes_by_qtr_and_year: Dict[str, Any] = field( init=False) # Tags ranked from most popular to least popular by notes. @@ -44,6 +48,7 @@ class StatsModel: self.total_original_posts = self.calculate_total_original_posts() self.total_original_post_notes = self.calculate_total_original_post_notes() self.total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year() + self.total_original_post_notes_by_qtr_and_year = self.calculate_total_original_post_notes_by_qtr_and_year() self.most_popular_tags = self.determine_most_popular_tags('note_count') def calculate_total_posts(self) -> int: @@ -58,7 +63,7 @@ class StatsModel: total += self.original_post_map[post_key]['note_count'] return total - def calculate_total_original_post_notes_by_month_and_year(self) -> Dict[str, int]: + def calculate_total_original_post_notes_by_month_and_year(self) -> Dict[str, Any]: # https://docs.python.org/3/library/collections.html#defaultdict-objects date_map: Dict[str, Any] = {} date_map = defaultdict(lambda: {'note_count': 0, @@ -113,3 +118,30 @@ class StatsModel: # https://stackoverflow.com/a/73050 return sorted(list(tag_dict.values()), key=itemgetter(sort_key), reverse=True) + + def calculate_total_original_post_notes_by_qtr_and_year(self) -> Dict[str, Any]: + total_original_post_notes_by_month_and_year: Dict[str, int] = self.total_original_post_notes_by_month_and_year + if not total_original_post_notes_by_month_and_year: + total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year() + self.total_original_post_notes_by_month_and_year = total_original_post_notes_by_month_and_year.copy() + + quarter_map: Dict[str, Any] = {} + quarter_map = defaultdict(lambda: {'note_count': 0, + 'post_count': 0}, + quarter_map) + for key in total_original_post_notes_by_month_and_year: + month_ent = total_original_post_notes_by_month_and_year[key] + month_year: datetime = datetime.strptime(key, '%Y-%m') + qtr = (month_year.month + 2) // 3 + new_key = f"{month_year.year}q{qtr}" + quarter_map[new_key]['note_count'] += month_ent['note_count'] + quarter_map[new_key]['post_count'] += month_ent['post_count'] + + # Results postprocessing. + for quarter in quarter_map: + sts = quarter_map[quarter] + post_count = sts['post_count'] + note_count = sts['note_count'] + sts['notes_to_posts_ratio'] = note_count / post_count + + return quarter_map \ No newline at end of file diff --git a/tumblr_stats.py b/tumblr_stats.py index a2af8bb..14a1a03 100644 --- a/tumblr_stats.py +++ b/tumblr_stats.py @@ -11,6 +11,7 @@ from typing import Any, Callable, Dict, List, Tuple import pytumblr +from build_draft_stats_model import BuildDraftStatsModel from build_tag_stats_model import BuildTagStatsModel from build_total_stats_model import BuildTotalStatsModel from build_queue_stats_model import BuildQueueStatsModel @@ -25,7 +26,7 @@ def get_args() -> Dict[str, Any]: + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET', epilog='— Be gay and do crime') parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION', - choices=['build_tag_stats', 'build_queue_stats'], + choices=['build_tag_stats', 'build_queue_stats', 'build_draft_stats'], help="operation used to calculate stats") parser.add_argument('-b', '--blog', type=str, required=True, help='blog name for which to calculate stats') @@ -76,7 +77,6 @@ def filter_posts_for_after(post_list: List[Dict[str, Any]], x['date'], '%Y-%m-%d %H:%M:%S %Z') > after return [post for post in post_list if after_check(post)] - def build_post_maps(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]: og_post_map: Dict[str, Any] = {} @@ -92,6 +92,9 @@ def build_post_maps(client: pytumblr.TumblrRestClient, # before: datetime = args['before'] # params.update({'before': int(before.timestamp())}) + draft_url = f"/v2/blog/{blog_name}/posts/draft" + is_draft_stats: bool = 'build_draft_stats' in args['operation'] + total: int = 0 offset: int = 0 limit: int = 20 @@ -101,12 +104,14 @@ def build_post_maps(client: pytumblr.TumblrRestClient, # Begin LOOP # Get me some posts via REST! 😈🍪🍪🍪 data: Dict[str, Any] - if 'build_queue_stats' in args['operation'] and len(args['operation']) == 1: + if 'build_queue_stats' in args['operation']: data = client.queue(f"{blog_name}.tumblr.com", offset=offset, limit=limit, **params) - else: # Above is for queued posts, below is for published posts. + elif is_draft_stats: + data = client.send_api_request("get", draft_url) + else: # Above is for queued + draft posts, below is for published posts. data = client.posts(f"{blog_name}.tumblr.com", offset=offset, limit=limit, @@ -119,7 +124,7 @@ def build_post_maps(client: pytumblr.TumblrRestClient, break next_off: int = 0 - if '_links' in data: + if '_links' in data and not is_draft_stats: links = data['_links'] if 'next' in links and 'query_params' in links['next']: next_off = int(links['next']['query_params']['offset']) @@ -135,7 +140,7 @@ def build_post_maps(client: pytumblr.TumblrRestClient, after: datetime = args['after'] curr_posts = filter_posts_for_after(curr_posts, after) if not curr_posts: - print(f"All posts after {after.year}-{after.month} processed.") + print(f"All posts after {after.year}-{after.month}-{after.day} processed.") return (og_post_map, un_og_post_map) # This block populates the local post_maps from the raw response data. @@ -150,6 +155,15 @@ def build_post_maps(client: pytumblr.TumblrRestClient, og_post_map.update(local_og_post_map) un_og_post_map.update(local_un_og_post_map) + # For build_draft_stats. + if is_draft_stats: + if '_links' in data: + draft_url = data['_links']['next']['href'] + continue + else: + print('All draft posts processed.') + break + # The increment and status printing. if next_off != 0 and next_off != offset: offset = next_off @@ -167,6 +181,13 @@ def build_post_maps(client: pytumblr.TumblrRestClient, def main() -> None: args: Dict[str, Any] = get_args() client: pytumblr.TumblrRestClient = init_client() + operation: List[str] = args['operation'] + + # Quick bail for bad use of build_queue_stats and build_draft_stats. + if (set(operation) & set(['build_queue_stats', 'build_draft_stats'])) and len(operation) > 1: + print(f"You can't mix operations {operation} together. Sorry.") + sys.exit(1) + pass # Handle JSON input (if you don't want to make API calls.) if 'input' in args and args['input']: @@ -193,28 +214,33 @@ def main() -> None: # Pick a stats model, which will determine output. stats_model: StatsModel - if 'build_queue_stats' in args['operation']: - if len(args['operation']) != 1: - print('You can\'t mix build_queue_stats with other operations. Sorry.') + match args: + case {'operation': ['build_queue_stats']}: + stats_model = BuildQueueStatsModel(blog_name=args['blog'], + original_post_map=og_post_map, + unoriginal_post_map=un_og_post_map) + case {'operation': ['build_draft_stats']}: + stats_model = BuildDraftStatsModel(blog_name=args['blog'], + original_post_map=og_post_map, + unoriginal_post_map=un_og_post_map) + case {'operation': op} if 'build_tag_stats' in operation: + stats_model = BuildTagStatsModel(blog_name=args['blog'], + original_post_map=og_post_map, + unoriginal_post_map=un_og_post_map) + stats_model.tags = args['tags'] + case {'operation': op} if 'build_total_stats' in operation: + if 'before' not in args: # or 'after' not in args: + print(f"You must specify a time range for {op}. " + + 'You\'ll otherwise request TOO MUCH DATA!') + sys.exit(1) + stats_model = BuildTotalStatsModel(blog_name=args['blog'], + original_post_map=og_post_map, + unoriginal_post_map=un_og_post_map) + case _: + print('Unsupported command. How did you even make it this far?!') sys.exit(1) - stats_model = BuildQueueStatsModel(blog_name=args['blog'], - original_post_map=og_post_map, - unoriginal_post_map=un_og_post_map) - if 'build_tag_stats' in args['operation']: - stats_model = BuildTagStatsModel(blog_name=args['blog'], - original_post_map=og_post_map, - unoriginal_post_map=un_og_post_map) - stats_model.tags = args['tags'] - if 'build_total_stats' in args['operation']: - if 'before' not in args: # or 'after' not in args: - print('You must specify a time range for build_total stats. ' + - 'You\'ll otherwise request TOO MUCH DATA!') - sys.exit(1) - stats_model = BuildTotalStatsModel(blog_name=args['blog'], - original_post_map=og_post_map, - unoriginal_post_map=un_og_post_map) - # Write the chosen model as JSON output. + # Write the selected model as JSON output. with open('./tumblr_stats.json', 'w') as f: json.dump(asdict(stats_model), f, indent=1, default=str)