import argparse
import csv
from dataclasses import asdict
import json
import os
import sys
from typing import Any, Dict, List, Tuple

import pytumblr

from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from stats_model import StatsModel


def get_args() -> Dict[str, Any]:
    """Pull arguments from the command line and turn them into a dictionary."""
    parser = argparse.ArgumentParser(
        prog='tumblr_stats.py',
        description='Use pytumblr to calculate stats after setting these environment variables: '
                    + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, '
                    + '$TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
        epilog='— Be gay and do crime')
    parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION',
                        choices=['build_tag_stats', 'build_total_stats'],
                        help='operation used to calculate stats')
    parser.add_argument('-b', '--blog', type=str, required=True,
                        help='blog name for which to calculate stats')
    parser.add_argument('-t', '--tags', type=str, nargs='+',
                        help='tag(s) to focus on in stats (if applicable)')
    # TODO: Make 'before' work, but it actually depends on
    # https://github.com/tumblr/pytumblr/issues/174.
    # parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                     help='only gather posts before YYYY-MM-DD')
    # TODO: Make 'after' work if they add it to pytumblr.
    # parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                     help='only gather posts after YYYY-MM-DD')
    return vars(parser.parse_args())


def init_client() -> pytumblr.TumblrRestClient:
    consumer_key = os.getenv('TUMBLR_CONSUMER_KEY')
    consumer_secret = os.getenv('TUMBLR_CONSUMER_SECRET')
    oauth_token = os.getenv('TUMBLR_OAUTH_TOKEN')
    oauth_secret = os.getenv('TUMBLR_OAUTH_SECRET')
    missing_vars = [name for name, val in
                    [('$TUMBLR_CONSUMER_KEY', consumer_key),
                     ('$TUMBLR_CONSUMER_SECRET', consumer_secret),
                     ('$TUMBLR_OAUTH_TOKEN', oauth_token),
                     ('$TUMBLR_OAUTH_SECRET', oauth_secret)]
                    if val is None]
    if missing_vars:
        print('Missing important environment variables:', missing_vars)
        sys.exit(1)
    return pytumblr.TumblrRestClient(
        consumer_key=consumer_key,  # type: ignore
        consumer_secret=consumer_secret,  # type: ignore
        oauth_token=oauth_token,  # type: ignore
        oauth_secret=oauth_secret,  # type: ignore
    )


def build_post_map_and_dumpster(
        client: pytumblr.TumblrRestClient,
        args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    post_map: Dict[str, Any] = {}
    dumpster: Dict[str, Any] = {}
    blog_name = args['blog']

    # We populate params, starting with any tags for filtering.
    params: Dict[str, Any] = {}
    if args['tags']:
        params.update({'tag': ','.join(args['tags'])})
    # TODO: Make 'before' work.
    # if args['before']:
    #     before: datetime = args['before']
    #     params.update({'before': int(before.timestamp())})
    # TODO: Make 'after' work.
    # if args['after']:
    #     after: datetime = args['after']
    #     params.update({'after': str(int(after.timestamp()))})

    total = 0
    offset = 0
    limit = 20

    # The request loop that pulls all data from the APIs.
    while offset <= total:  # Begin LOOP
        # Get me some posts via REST! 😈🍪🍪🍪
        data = client.posts(f"{blog_name}.tumblr.com", offset=offset, limit=limit, **params)

        # Stop the presses if we found no posts.
        if not data['posts']:
            print('Stopping, as no posts were found.')
            break

        # Total init check for the first iteration, but always checked for sanity.
        if total == 0:
            # Let's see what's in there,
            total_posts = data['total_posts']
            # Something was there, so we're good.
            print(f"I'm working with {total_posts} total posts...")
            total = total_posts

        # This block populates the local post_map from the raw response data.
        curr_posts = data['posts']
        local_post_map: Dict[str, Any] = {}
        for curr_post in curr_posts:
            curr_key = curr_post['id_string']
            if curr_key not in local_post_map:
                local_post_map[curr_key] = curr_post

        # This block populates the local dumpster from the raw response data.
        local_dumpster: Dict[str, Any] = {}
        filtered_local_post_map: Dict[str, Any] = {}
        for local_key, local_post in local_post_map.items():
            # Determines whether this is an OG post.
            if 'parent_post_url' not in local_post:
                filtered_local_post_map[local_key] = local_post
            else:
                # If it's not an OG post, into the local dumpster.
                local_dumpster[local_key] = local_post

        # The sacred "should we add, and if we should, DO ADD" conditional statements.
        has_og_posts = any(post not in post_map for post in filtered_local_post_map)
        has_not_og_posts = any(post not in dumpster for post in local_dumpster)
        if has_og_posts:
            post_map.update(filtered_local_post_map)
        if has_not_og_posts:
            dumpster.update(local_dumpster)

        # The increment and status printing. Should always end the loop!
        offset += limit
        print(f"Processed batch {offset // limit} of {(total + limit - 1) // limit}...")
        # End LOOP

    # Return (og_posts, not_og_posts).
    return (post_map, dumpster)


def build_tag_stats_model(post_map: Dict[str, Any], dumpster: Dict[str, Any],
                          args: Dict[str, Any]) -> BuildTagStatsModel:
    stats_model = BuildTagStatsModel(blog_name=args['blog'],
                                     original_post_map=post_map,
                                     unoriginal_post_map=dumpster)
    stats_model.tags = args['tags']
    return stats_model


def build_total_stats_model(post_map: Dict[str, Any], dumpster: Dict[str, Any],
                            args: Dict[str, Any]) -> BuildTotalStatsModel:
    stats_model = BuildTotalStatsModel(blog_name=args['blog'],
                                       original_post_map=post_map,
                                       unoriginal_post_map=dumpster)
    return stats_model


def main() -> None:
    args = get_args()
    client = init_client()

    # Get the post_map (original posts) and dumpster (not original posts).
    post_map, dumpster = build_post_map_and_dumpster(args=args, client=client)

    # Pick a stats model, which will determine output.
    stats_model: StatsModel
    if 'build_tag_stats' in args['operation']:
        stats_model = build_tag_stats_model(post_map, dumpster, args)
    if 'build_total_stats' in args['operation']:
        if 'before' not in args:  # or 'after' not in args:
            print('You must specify a time range for build_total_stats. '
                  + "You'll otherwise request TOO MUCH DATA!")
            sys.exit(1)
        stats_model = build_total_stats_model(post_map, dumpster, args)

    # Write the chosen model as JSON output.
    with open('./tumblr_stats.json', 'w') as f:
        json.dump(asdict(stats_model), f, indent=1, default=str)

    # If there were original posts, create a CSV for them.
    if post_map:
        with open('./tumblr_original_posts.csv', 'w', newline='') as f:
            post_list: List[Dict[str, Any]] = list(post_map.values())
            wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
                                fieldnames=post_list[0].keys())
            wr.writeheader()
            wr.writerows(post_list)
    else:
        print('No original posts were found, so a CSV of original posts was not written.')
    return


# DO NOT DELETE. The main if statement.
if __name__ == '__main__':
    main()
    print('All done.')
    sys.exit(0)
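# ---------------------------------------------------------------------------
# For reference, a minimal sketch of the sibling model modules this script
# imports. The field names below are assumptions inferred from the constructor
# calls above (blog_name, original_post_map, unoriginal_post_map) and from the
# `stats_model.tags` assignment; the real modules may define more fields and
# methods. Note that asdict() in main() requires these models to be
# dataclasses.
#
#   # stats_model.py (hypothetical sketch)
#   from dataclasses import dataclass, field
#   from typing import Any, Dict, List, Optional
#
#   @dataclass
#   class StatsModel:
#       blog_name: str
#       original_post_map: Dict[str, Any] = field(default_factory=dict)
#       unoriginal_post_map: Dict[str, Any] = field(default_factory=dict)
#
#   # build_tag_stats_model.py (hypothetical sketch)
#   @dataclass
#   class BuildTagStatsModel(StatsModel):
#       tags: Optional[List[str]] = None
#
# Example invocation (blog and tag values are placeholders):
#   export TUMBLR_CONSUMER_KEY=... TUMBLR_CONSUMER_SECRET=...
#   export TUMBLR_OAUTH_TOKEN=... TUMBLR_OAUTH_SECRET=...
#   python tumblr_stats.py build_tag_stats --blog staff --tags python
# ---------------------------------------------------------------------------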