diff --git a/build_queue_stats_model.py b/build_queue_stats_model.py new file mode 100644 index 0000000..9f2c6fa --- /dev/null +++ b/build_queue_stats_model.py @@ -0,0 +1,47 @@ +from dataclasses import dataclass, field +from datetime import datetime +from operator import itemgetter +from typing import Any, Dict, List + +from stats_model import StatsModel + + +@dataclass(kw_only=True) +class BuildQueueStatsModel(StatsModel): + """Stats model built around calculating stats from your currently queued posts""" + operation: str = 'build_queue_stats' + + # Queued posts (both original and not original), sorted in publish order. + ordered_queue: List[Dict[str, Any]] = field(init=False) + + def __post_init__(self): + super().__post_init__() + self.most_popular_tags = self.determine_most_popular_tags('post_count') + self.ordered_queue = self.determine_ordered_queue() + + def determine_ordered_queue(self) -> List[Dict[str, Any]]: + full_post_map = self.original_post_map | self.unoriginal_post_map + + post_list: List[Dict[str, Any]] = [] + for post_key in full_post_map: + post = full_post_map[post_key] + + if 'scheduled_publish_time' not in post or not post['scheduled_publish_time']: + print('WARNING: Queued post found without publish time. Huh?') + + queued_date_time: datetime = datetime.fromtimestamp( + post['scheduled_publish_time']) + post_list.append({ + 'post_url': post['post_url'], + 'tags': post['tags'], + 'publish_date_time': queued_date_time + }) + + # https://stackoverflow.com/a/73050 + sorted_list = sorted(post_list, key=itemgetter('publish_date_time')) + + # https://stackoverflow.com/a/522578 + for i, post in enumerate(sorted_list): + post['queue_order'] = i + 1 + + return sorted_list diff --git a/stats_model.py b/stats_model.py index 64dfdea..f722a6d 100644 --- a/stats_model.py +++ b/stats_model.py @@ -44,7 +44,7 @@ class StatsModel: self.total_original_posts = self.calculate_total_original_posts() self.total_original_post_notes = self.calculate_total_original_post_notes() self.total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year() - self.most_popular_tags = self.determine_most_popular_tags() + self.most_popular_tags = self.determine_most_popular_tags('note_count') def calculate_total_posts(self) -> int: return len(self.original_post_map) + len(self.unoriginal_post_map) @@ -86,7 +86,7 @@ class StatsModel: return date_map - def determine_most_popular_tags(self) -> List[Dict[str, Any]]: + def determine_most_popular_tags(self, sort_key: str) -> List[Dict[str, Any]]: # https://docs.python.org/3/library/collections.html#defaultdict-objects tag_dict: Dict[str, Any] = {} tag_dict = defaultdict(lambda: {'note_count': 0, @@ -111,5 +111,5 @@ class StatsModel: sts['notes_to_posts_ratio'] = note_count / post_count # https://stackoverflow.com/a/73050 - return sorted(list(tag_dict.values()), key=itemgetter('note_count'), + return sorted(list(tag_dict.values()), key=itemgetter(sort_key), reverse=True) diff --git a/tumblr_stats.py b/tumblr_stats.py index c139d7e..a2af8bb 100644 --- a/tumblr_stats.py +++ b/tumblr_stats.py @@ -13,6 +13,7 @@ import pytumblr from build_tag_stats_model import BuildTagStatsModel from build_total_stats_model import BuildTotalStatsModel +from build_queue_stats_model import BuildQueueStatsModel from stats_model import StatsModel @@ -23,8 +24,8 @@ def get_args() -> Dict[str, Any]: description='Use pytumblr to calculate stats after setting these enviroment variables: ' + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET', epilog='— Be gay and do crime') - parser.add_argument('operation', type=str, nargs='+', - metavar='OPERATION', choices=['build_tag_stats'], + parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION', + choices=['build_tag_stats', 'build_queue_stats'], help="operation used to calculate stats") parser.add_argument('-b', '--blog', type=str, required=True, help='blog name for which to calculate stats') @@ -96,23 +97,35 @@ def build_post_maps(client: pytumblr.TumblrRestClient, limit: int = 20 # The request loop that pulls all data from the APIs. - while offset <= total: + while True: # Begin LOOP # Get me some posts via REST! 😈🍪🍪🍪 - data = client.posts(f"{blog_name}.tumblr.com", - offset=offset, - limit=limit, - **params) + data: Dict[str, Any] + if 'build_queue_stats' in args['operation'] and len(args['operation']) == 1: + data = client.queue(f"{blog_name}.tumblr.com", + offset=offset, + limit=limit, + **params) + else: # Above is for queued posts, below is for published posts. + data = client.posts(f"{blog_name}.tumblr.com", + offset=offset, + limit=limit, + **params) # Stop the presses if we found no posts. curr_posts: List[Dict[str, Any]] = data['posts'] if not curr_posts or len(curr_posts) < 1: - print('Stopping, as no posts were found.') + print('Stopping, as no more posts were found.') break + next_off: int = 0 + if '_links' in data: + links = data['_links'] + if 'next' in links and 'query_params' in links['next']: + next_off = int(links['next']['query_params']['offset']) + # Total init check for the first iteration, but always checked for sanity. - if not total: - # Let's see what's in there, + if not total and 'total_posts' in data: total_posts = data['total_posts'] print(f"I'm working with {total_posts} total posts...") total = total_posts @@ -137,9 +150,12 @@ def build_post_maps(client: pytumblr.TumblrRestClient, og_post_map.update(local_og_post_map) un_og_post_map.update(local_un_og_post_map) - # The increment and status printing. Should always end the loop! - offset += limit - if not args['after']: + # The increment and status printing. + if next_off != 0 and next_off != offset: + offset = next_off + else: + offset += limit + if not args['after'] and total: print( f"Processed batch {offset // limit} of {(total // 20) + 1}...") # End LOOP @@ -177,6 +193,13 @@ def main() -> None: # Pick a stats model, which will determine output. stats_model: StatsModel + if 'build_queue_stats' in args['operation']: + if len(args['operation']) != 1: + print('You can\'t mix build_queue_stats with other operations. Sorry.') + sys.exit(1) + stats_model = BuildQueueStatsModel(blog_name=args['blog'], + original_post_map=og_post_map, + unoriginal_post_map=un_og_post_map) if 'build_tag_stats' in args['operation']: stats_model = BuildTagStatsModel(blog_name=args['blog'], original_post_map=og_post_map,