"""Calculate Tumblr blog statistics via pytumblr.

Requires $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN,
and $TUMBLR_OAUTH_SECRET to be set (unless running from a JSON input file).
"""
import argparse
import csv
import json
import os
import sys
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple

import pytumblr

from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from stats_model import StatsModel

# Tumblr post timestamps look like '2025-01-31 12:34:56 GMT'.
TUMBLR_DATE_FORMAT = '%Y-%m-%d %H:%M:%S %Z'


def get_args() -> Dict[str, Any]:
    """Pull arguments from the command line and return them as a dictionary.

    Returns:
        A dict with keys 'operation', 'blog', 'tags', 'input', and 'after'.
    """
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        prog='tumblr_stats.py',
        description='Use pytumblr to calculate stats after setting these enviroment variables: '
        + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
        epilog='— Be gay and do crime')
    # FIX: 'build_total_stats' is handled in main() but was missing from
    # choices, so argparse rejected it and that path was unreachable.
    parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION',
                        choices=['build_tag_stats', 'build_total_stats'],
                        help="operation used to calculate stats")
    parser.add_argument('-b', '--blog', type=str, required=True,
                        help='blog name for which to calculate stats')
    parser.add_argument('-t', '--tags', type=str, nargs='+',
                        help='tag(s) to focus on in status (if applicable)')
    parser.add_argument('-i', '--input', type=str,
                        help='Don\'t make API calls, just use a JSON input file')
    # TODO: Make 'before' work, but it depends on
    # https://github.com/tumblr/pytumblr/issues/174.
    # parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                     help='only gather posts before YYYY-MM-DD')
    parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
                        help='only gather posts after YYYY-MM-DD')
    return vars(parser.parse_args())


def init_client() -> pytumblr.TumblrRestClient:
    """Build a TumblrRestClient from the four $TUMBLR_* environment variables.

    Exits the process with status 1, listing the missing variables, if any of
    them are unset.
    """
    consumer_key = os.getenv('TUMBLR_CONSUMER_KEY')
    consumer_secret = os.getenv('TUMBLR_CONSUMER_SECRET')
    oauth_token = os.getenv('TUMBLR_OAUTH_TOKEN')
    oauth_secret = os.getenv('TUMBLR_OAUTH_SECRET')
    missing_vars: List[str] = [name for name, val in
                               [('$TUMBLR_CONSUMER_KEY', consumer_key),
                                ('$TUMBLR_CONSUMER_SECRET', consumer_secret),
                                ('$TUMBLR_OAUTH_TOKEN', oauth_token),
                                ('$TUMBLR_OAUTH_SECRET', oauth_secret)]
                               if val is None]
    if missing_vars:
        print("Missing important environment variables:", missing_vars)
        sys.exit(1)
    return pytumblr.TumblrRestClient(
        consumer_key=consumer_key,  # type: ignore
        consumer_secret=consumer_secret,  # type: ignore
        oauth_token=oauth_token,  # type: ignore
        oauth_secret=oauth_secret,  # type: ignore
    )


def filter_posts_for_after(post_list: List[Dict[str, Any]],
                           after: datetime) -> List[Dict[str, Any]]:
    """Return only the posts whose 'date' field is strictly after `after`.

    Args:
        post_list: Raw post dicts from the Tumblr API.
        after: Cut-off datetime; posts at or before it are dropped.
    """
    # Quick short circuit checks.
    if not post_list:
        return []
    # FIX: the original returned [] here too, which would have silently
    # discarded every post when no cut-off was given.
    if not after:
        return post_list
    after_check: Callable[[Dict[str, Any]], bool] = lambda post: datetime.strptime(
        post['date'], TUMBLR_DATE_FORMAT) > after
    return [post for post in post_list if after_check(post)]


def build_post_maps(client: pytumblr.TumblrRestClient,
                    args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Page through the blog's posts via the API and split them into two maps.

    Args:
        client: An authenticated pytumblr client.
        args: Parsed CLI arguments; reads 'blog', 'tags', and 'after'.

    Returns:
        (og_post_map, un_og_post_map), each keyed by the post's 'id_string'.
        Posts lacking a 'parent_post_url' key are treated as original.
    """
    og_post_map: Dict[str, Any] = {}
    un_og_post_map: Dict[str, Any] = {}
    blog_name: str = args['blog']

    # We populate params, starting with any tags for filtering.
    params: Dict[str, Any] = {}
    if args['tags']:
        params.update({'tag': ','.join(args['tags'])})
    # TODO: Make 'before' work.
    # if args['before']:
    #     before: datetime = args['before']
    #     params.update({'before': int(before.timestamp())})

    total: int = 0
    offset: int = 0
    limit: int = 20

    # The request loop that pulls all data from the APIs.
    while offset <= total:
        # Get me some posts via REST! 😈🍪🍪🍪
        data = client.posts(f"{blog_name}.tumblr.com",
                            offset=offset, limit=limit, **params)

        # Stop the presses if we found no posts.
        curr_posts: List[Dict[str, Any]] = data['posts']
        if not curr_posts:
            print('Stopping, as no posts were found.')
            break

        # Initialize `total` from the first response; harmless to re-check.
        if not total:
            total_posts = data['total_posts']
            print(f"I'm working with {total_posts} total posts...")
            total = total_posts

        # Behavior for 'after': once a whole batch filters to nothing, stop
        # paging entirely — presumably the API returns posts newest-first, so
        # everything later is older still. TODO(review): confirm ordering.
        if args['after']:
            after: datetime = args['after']
            curr_posts = filter_posts_for_after(curr_posts, after)
            if not curr_posts:
                print(f"All posts after {after.year}-{after.month} processed.")
                return (og_post_map, un_og_post_map)

        # Populate the maps: posts carrying 'parent_post_url' are reblogs.
        og_post_map.update({item['id_string']: item for item in curr_posts
                            if 'parent_post_url' not in item})
        un_og_post_map.update({item['id_string']: item for item in curr_posts
                               if 'parent_post_url' in item})

        # The increment and status printing. Should always end the loop!
        offset += limit
        if not args['after']:
            # FIX: the batch total used a literal 20 instead of `limit`.
            print(
                f"Processed batch {offset // limit} of {(total // limit) + 1}...")

    # Return (og_posts, not_og_posts).
    return (og_post_map, un_og_post_map)


def _filter_map_to_year(post_map: Dict[str, Any], year: int) -> Dict[str, Any]:
    """Return a new post map keeping only posts whose 'date' falls in `year`."""
    return {
        key: post for key, post in post_map.items()
        if datetime.strptime(post['date'], TUMBLR_DATE_FORMAT).year == year
    }


def main() -> None:
    """Entry point: gather posts (from the API or a JSON input file), build
    the requested stats model, and write JSON plus an optional CSV."""
    args: Dict[str, Any] = get_args()
    client: pytumblr.TumblrRestClient = init_client()

    # Handle JSON input (if you don't want to make API calls.)
    if args.get('input'):
        with open(Path(args['input']), "r", encoding='utf-8') as f:
            data = json.load(f)
        # NOTE(review): the year filter was hard-coded to 2025 in the original
        # (duplicated for both maps); presumably it should derive from --after.
        # TODO confirm with the author before changing.
        og_post_map = _filter_map_to_year(data['original_post_map'], 2025)
        un_og_post_map = _filter_map_to_year(data['unoriginal_post_map'], 2025)
    else:
        # Get the og_post_map (original posts) and un_og_post_map (not
        # original posts).
        og_post_map, un_og_post_map = build_post_maps(args=args, client=client)

    # Pick a stats model, which will determine output.
    stats_model: StatsModel
    if 'build_tag_stats' in args['operation']:
        stats_model = BuildTagStatsModel(blog_name=args['blog'],
                                         original_post_map=og_post_map,
                                         unoriginal_post_map=un_og_post_map)
        stats_model.tags = args['tags']
    if 'build_total_stats' in args['operation']:
        # FIX: the original tested `'before' not in args`, which is always
        # true because --before is not implemented, so this operation always
        # exited. We now require --after as the time range.
        if not args.get('after'):
            print('You must specify a time range for build_total stats. ' +
                  'You\'ll otherwise request TOO MUCH DATA!')
            sys.exit(1)
        stats_model = BuildTotalStatsModel(blog_name=args['blog'],
                                           original_post_map=og_post_map,
                                           unoriginal_post_map=un_og_post_map)

    # Write the chosen model as JSON output.
    with open('./tumblr_stats.json', 'w', encoding='utf-8') as f:
        json.dump(asdict(stats_model), f, indent=1, default=str)

    # If there were original posts, create a CSV for them.
    if og_post_map:
        with open('./tumblr_original_posts.csv', 'w', newline='',
                  encoding='utf-8') as f:
            post_list: List[Dict[str, Any]] = list(og_post_map.values())
            wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL,
                                extrasaction='ignore',
                                fieldnames=post_list[0].keys())
            wr.writeheader()
            wr.writerows(post_list)
    else:
        print('No original posts were found, so a CSV of original posts was not written.')
    return


# DO NOT DELETE. The main if statement.
if __name__ == '__main__':
    main()
    print('All done.')
    sys.exit(0)