tumblr-stats/tumblr_stats.py

239 lines
9.8 KiB
Python

import argparse
import csv
from dataclasses import asdict
from datetime import datetime
import json
import os
from pathlib import Path
import sys
from typing import Any, Callable, Dict, List, Tuple
import pytumblr
from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from build_queue_stats_model import BuildQueueStatsModel
from stats_model import StatsModel
def get_args() -> Dict[str, Any]:
    """Parse the command line and return the options as a plain dictionary.

    Recognised arguments:
        OPERATION (positional, one or more): 'build_tag_stats' or
            'build_queue_stats'.
        -b / --blog (required): blog name for which to calculate stats.
        -t / --tags: one or more tags.
        -i / --input: path of a JSON file to use instead of API calls.
        --after: a YYYY-MM-DD date, converted to a `datetime`.

    Returns:
        A dict with the keys 'operation', 'blog', 'tags', 'input' and 'after'.
        Options that were not supplied map to None.
    """
    credential_vars = ('$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, '
                       '$TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET')
    cli = argparse.ArgumentParser(
        prog='tumblr_stats.py',
        description=('Use pytumblr to calculate stats after setting these '
                     'enviroment variables: ' + credential_vars),
        epilog='— Be gay and do crime',
    )

    # Positional: which statistics to build.
    cli.add_argument(
        'operation',
        metavar='OPERATION',
        type=str,
        nargs='+',
        choices=['build_tag_stats', 'build_queue_stats'],
        help="operation used to calculate stats",
    )

    # Options describing what to look at and where the data comes from.
    cli.add_argument(
        '-b', '--blog',
        type=str,
        required=True,
        help='blog name for which to calculate stats',
    )
    cli.add_argument(
        '-t', '--tags',
        type=str,
        nargs='+',
        help='tag(s) to focus on in status (if applicable)',
    )
    cli.add_argument(
        '-i', '--input',
        type=str,
        help='Don\'t make API calls, just use a JSON input file',
    )

    # TODO: Make 'before' work, but it depends on https://github.com/tumblr/pytumblr/issues/174.
    # cli.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                  help='only gather posts before YYYY-MM-DD')
    cli.add_argument(
        '--after',
        type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
        help='only gather posts after YYYY-MM-DD',
    )

    namespace = cli.parse_args()
    return vars(namespace)
def init_client() -> pytumblr.TumblrRestClient:
    """Create a pytumblr REST client from credentials in the environment.

    Reads TUMBLR_CONSUMER_KEY, TUMBLR_CONSUMER_SECRET, TUMBLR_OAUTH_TOKEN and
    TUMBLR_OAUTH_SECRET. If any of them is unset, the missing names are
    printed and the process exits with status 1.

    Returns:
        A `pytumblr.TumblrRestClient` built from the four values.
    """
    env_names = (
        'TUMBLR_CONSUMER_KEY',
        'TUMBLR_CONSUMER_SECRET',
        'TUMBLR_OAUTH_TOKEN',
        'TUMBLR_OAUTH_SECRET',
    )
    credentials = {env_name: os.getenv(env_name) for env_name in env_names}

    # Only an unset variable counts as missing; an empty string is passed on.
    absent: List[str] = ['$' + env_name
                         for env_name, value in credentials.items()
                         if value is None]
    if absent:
        print("Missing important environment variables:", absent)
        sys.exit(1)

    return pytumblr.TumblrRestClient(
        consumer_key=credentials['TUMBLR_CONSUMER_KEY'],  # type: ignore
        consumer_secret=credentials['TUMBLR_CONSUMER_SECRET'],  # type: ignore
        oauth_token=credentials['TUMBLR_OAUTH_TOKEN'],  # type: ignore
        oauth_secret=credentials['TUMBLR_OAUTH_SECRET'],  # type: ignore
    )
def filter_posts_for_after(post_list: List[Dict[str, Any]],
                           after: datetime) -> List[Dict[str, Any]]:
    """Return the posts whose 'date' is strictly later than `after`.

    Args:
        post_list: Post dictionaries. Each one is read through its 'date'
            key, which must match the format '%Y-%m-%d %H:%M:%S %Z'.
        after: The cutoff. A post dated exactly at the cutoff is dropped.
            A falsy value (e.g. None) means "no cutoff" and keeps every post.

    Returns:
        A new list in the original order; `post_list` is not modified.
        An empty or missing `post_list` yields an empty list.

    Raises:
        KeyError: If a post has no 'date' key.
        ValueError: If a post's 'date' does not match the expected format.
    """
    if not post_list:
        return []

    # Without a cutoff there is nothing to filter out. Returning [] here would
    # silently discard every post, so hand back a copy of the input instead.
    if not after:
        return list(post_list)

    date_format = '%Y-%m-%d %H:%M:%S %Z'
    return [post for post in post_list
            if datetime.strptime(post['date'], date_format) > after]
def build_post_maps(client: pytumblr.TumblrRestClient,
                    args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Page through a blog's posts and split them into two maps.

    Posts are requested `limit` at a time from `client.queue` (when the only
    operation is 'build_queue_stats') or from `client.posts` (otherwise),
    until a response contains no posts.

    Args:
        client: The pytumblr client used for the requests.
        args: Parsed command-line options. The keys 'blog', 'tags',
            'operation' and 'after' are read.

    Returns:
        A tuple `(og_post_map, un_og_post_map)`, both keyed by each post's
        'id_string'. Posts carrying a 'parent_post_url' key go into the
        second map; all other posts go into the first.

    Raises:
        RuntimeError: If a response has no 'posts' entry.
    """
    og_post_map: Dict[str, Any] = {}
    un_og_post_map: Dict[str, Any] = {}
    blog_name: str = args['blog']
    blog_host: str = f"{blog_name}.tumblr.com"

    # We populate params, starting with any tags for filtering.
    params = {}
    if args['tags']:
        params.update({'tag': ','.join(args['tags'])})

    # TODO: Make 'before' work.
    # if args['before']:
    #     before: datetime = args['before']
    #     params.update({'before': int(before.timestamp())})

    # Loop-invariant: the queue is only read when it is the sole operation.
    use_queue: bool = ('build_queue_stats' in args['operation']
                       and len(args['operation']) == 1)

    total: int = 0
    offset: int = 0
    limit: int = 20

    # The request loop that pulls all data from the APIs.
    while True:
        data: Dict[str, Any]
        if use_queue:
            data = client.queue(blog_host, offset=offset, limit=limit, **params)
        else:
            data = client.posts(blog_host, offset=offset, limit=limit, **params)

        # NOTE(review): a failed request presumably comes back as an error
        # payload without 'posts' rather than as an exception -- confirm
        # against pytumblr. Either way, say what went wrong instead of dying
        # on a bare KeyError.
        if 'posts' not in data:
            raise RuntimeError(
                f"Response for {blog_host} has no 'posts' entry: {data!r}")

        # Stop the presses if we found no posts.
        curr_posts: List[Dict[str, Any]] = data['posts']
        if not curr_posts:
            print('Stopping, as no more posts were found.')
            break

        # Prefer the offset advertised by the response, when there is one.
        next_off: int = 0
        if '_links' in data:
            links = data['_links']
            if 'next' in links and 'query_params' in links['next']:
                next_off = int(links['next']['query_params']['offset'])

        # Total init check for the first iteration, but always checked for sanity.
        if not total and 'total_posts' in data:
            total_posts = data['total_posts']
            print(f"I'm working with {total_posts} total posts...")
            total = total_posts

        # Behavior for 'after'.
        if args['after']:
            after: datetime = args['after']
            curr_posts = filter_posts_for_after(curr_posts, after)
            # An entire batch at or before the cutoff ends the crawl.
            # NOTE(review): this relies on batches arriving newest-first --
            # confirm for the queue endpoint.
            if not curr_posts:
                print(f"All posts after {after.year}-{after.month} processed.")
                return (og_post_map, un_og_post_map)

        # Sort this batch into the two maps in a single pass.
        for item in curr_posts:
            target = un_og_post_map if 'parent_post_url' in item else og_post_map
            target[item['id_string']] = item

        # The increment and status printing.
        if next_off != 0 and next_off != offset:
            offset = next_off
        else:
            offset += limit

        if not args['after'] and total:
            # Ceiling division on the real page size, so an exact multiple of
            # `limit` does not report one batch too many.
            total_batches = (total + limit - 1) // limit
            print(f"Processed batch {offset // limit} of {total_batches}...")

    # Return (og_posts, not_og_posts).
    return (og_post_map, un_og_post_map)
def _drop_posts_outside_year(post_map: Dict[str, Any], year: int) -> None:
    """Delete, in place, every post in `post_map` whose 'date' is not in `year`."""
    # Iterate over a snapshot of the keys because entries are deleted as we go.
    for post_key in list(post_map):
        post_date = datetime.strptime(
            post_map[post_key]['date'], '%Y-%m-%d %H:%M:%S %Z')
        if post_date.year != year:
            del post_map[post_key]


def main() -> None:
    """Collect posts, build the requested stats model and write the results.

    Posts come either from the JSON file given with --input (no API client is
    created in that case) or from the Tumblr API via `build_post_maps`.

    Outputs:
        ./tumblr_stats.json: the chosen stats model, serialised with `asdict`.
        ./tumblr_original_posts.csv: one row per original post, written only
            when at least one original post was found. Columns are the keys
            of the first post.

    Exits with status 1 when 'build_queue_stats' is combined with another
    operation, or when 'build_total_stats' is requested without 'before'.
    """
    args: Dict[str, Any] = get_args()
    operations: List[str] = args['operation']

    # Validate before any file or network access so a bad combination fails
    # immediately instead of after every post has been downloaded.
    if 'build_queue_stats' in operations and len(operations) != 1:
        print('You can\'t mix build_queue_stats with other operations. Sorry.')
        sys.exit(1)

    # Handle JSON input (if you don't want to make API calls.)
    if args.get('input'):
        input_path = Path(args['input'])
        with open(input_path, "r", encoding='utf-8') as f:
            data = json.load(f)
        og_post_map = data['original_post_map']
        un_og_post_map = data['unoriginal_post_map']

        # NOTE(review): offline input is always narrowed to this hard-coded
        # year and --after is ignored here -- confirm this is intended.
        input_year = 2025
        _drop_posts_outside_year(og_post_map, input_year)
        _drop_posts_outside_year(un_og_post_map, input_year)
    else:
        # Only the online path needs credentials, so the client is created
        # here rather than up front.
        client: pytumblr.TumblrRestClient = init_client()
        # Get the og_post_map (original posts) and un_og_post_map (not original posts).
        og_post_map, un_og_post_map = build_post_maps(args=args, client=client)

    # Pick a stats model, which will determine output. When several
    # operations match, the last matching branch wins.
    stats_model: StatsModel
    if 'build_queue_stats' in operations:
        stats_model = BuildQueueStatsModel(blog_name=args['blog'],
                                           original_post_map=og_post_map,
                                           unoriginal_post_map=un_og_post_map)
    if 'build_tag_stats' in operations:
        stats_model = BuildTagStatsModel(blog_name=args['blog'],
                                         original_post_map=og_post_map,
                                         unoriginal_post_map=un_og_post_map)
        stats_model.tags = args['tags']
    if 'build_total_stats' in operations:
        if 'before' not in args:  # or 'after' not in args:
            print('You must specify a time range for build_total stats. ' +
                  'You\'ll otherwise request TOO MUCH DATA!')
            sys.exit(1)
        stats_model = BuildTotalStatsModel(blog_name=args['blog'],
                                           original_post_map=og_post_map,
                                           unoriginal_post_map=un_og_post_map)

    # Write the chosen model as JSON output.
    with open('./tumblr_stats.json', 'w', encoding='utf-8') as f:
        json.dump(asdict(stats_model), f, indent=1, default=str)

    # If there were original posts, create a CSV for them.
    if og_post_map:
        # Explicit UTF-8: post fields are free text, which a narrower
        # platform default encoding may be unable to encode.
        with open('./tumblr_original_posts.csv', 'w', newline='',
                  encoding='utf-8') as f:
            post_list: List[Dict[str, Any]] = list(og_post_map.values())
            wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
                                fieldnames=post_list[0].keys())
            wr.writeheader()
            wr.writerows(post_list)
    else:
        print('No original posts were found, so a CSV of original posts was not written.')
# DO NOT DELETE. Script entry point: run main() only when this file is
# executed directly, then report completion and exit with status 0.
if __name__ == "__main__":
    main()
    print("All done.")
    sys.exit(0)