# tumblr-stats/tumblr_stats.py — 265 lines, 11 KiB, Python
import argparse
import csv
from dataclasses import asdict
from datetime import datetime
import json
import os
from pathlib import Path
import sys
from typing import Any, Callable, Dict, List, Tuple
import pytumblr
from build_draft_stats_model import BuildDraftStatsModel
from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from build_queue_stats_model import BuildQueueStatsModel
from stats_model import StatsModel
def get_args() -> Dict[str, Any]:
    """Pull arguments from the command line, turn them into a dictionary of <arg, value>.

    Returns:
        The parsed arguments as a plain dict (via ``vars()``), with keys
        'operation', 'blog', 'tags', 'input', and 'after'.
    """
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        prog='tumblr_stats.py',
        # Typo fixed: 'enviroment' -> 'environment'.
        description='Use pytumblr to calculate stats after setting these environment variables: '
        + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
        epilog='— Be gay and do crime')
    # One or more operations; argparse validates each element against choices.
    parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION',
                        choices=['build_tag_stats', 'build_queue_stats', 'build_draft_stats'],
                        help="operation used to calculate stats")
    parser.add_argument('-b', '--blog', type=str, required=True,
                        help='blog name for which to calculate stats')
    # Typo fixed: 'status' -> 'stats'.
    parser.add_argument('-t', '--tags', type=str, nargs='+',
                        help='tag(s) to focus on in stats (if applicable)')
    parser.add_argument('-i', '--input', type=str,
                        help='Don\'t make API calls, just use a JSON input file')
    # TODO: Make 'before' work, but it depends on https://github.com/tumblr/pytumblr/issues/174.
    # parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                     help='only gather posts before YYYY-MM-DD')
    parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
                        help='only gather posts after YYYY-MM-DD')
    return vars(parser.parse_args())
def init_client() -> pytumblr.TumblrRestClient:
    """Build a TumblrRestClient from the four $TUMBLR_* environment variables.

    Prints the names of any unset variables and exits with status 1 when
    one or more are missing.
    """
    env_names = ('TUMBLR_CONSUMER_KEY', 'TUMBLR_CONSUMER_SECRET',
                 'TUMBLR_OAUTH_TOKEN', 'TUMBLR_OAUTH_SECRET')
    values = {name: os.getenv(name) for name in env_names}
    # Report missing variables with a leading '$' to match shell notation.
    missing_vars: List[str] = ['$' + name for name in env_names
                               if values[name] is None]
    if missing_vars:
        print("Missing important environment variables:", missing_vars)
        sys.exit(1)
    return pytumblr.TumblrRestClient(
        consumer_key=values['TUMBLR_CONSUMER_KEY'],  # type: ignore
        consumer_secret=values['TUMBLR_CONSUMER_SECRET'],  # type: ignore
        oauth_token=values['TUMBLR_OAUTH_TOKEN'],  # type: ignore
        oauth_secret=values['TUMBLR_OAUTH_SECRET'],  # type: ignore
    )
def filter_posts_for_after(post_list: List[Dict[str, Any]],
                           after: datetime) -> List[Dict[str, Any]]:
    """Return only the posts whose 'date' field is strictly later than *after*.

    An empty post list or a falsy *after* short-circuits to an empty list.
    Post dates are parsed with the Tumblr API's '%Y-%m-%d %H:%M:%S %Z' format.
    """
    if not post_list or not after:
        return []
    date_format = '%Y-%m-%d %H:%M:%S %Z'
    kept: List[Dict[str, Any]] = []
    for post in post_list:
        if datetime.strptime(post['date'], date_format) > after:
            kept.append(post)
    return kept
def build_post_maps(client: pytumblr.TumblrRestClient,
                    args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Fetch every matching post from the Tumblr API and split by originality.

    Pages through the published-posts, queue, or drafts endpoint (depending on
    args['operation']) and returns ``(og_post_map, un_og_post_map)``: two dicts
    keyed by each post's 'id_string'. Posts without a 'parent_post_url' key are
    treated as originals; posts with one as reblogs.
    """
    og_post_map: Dict[str, Any] = {}
    un_og_post_map: Dict[str, Any] = {}
    blog_name: str = args['blog']
    # We populate params, starting with any tags for filtering.
    params = {}
    if args['tags']:
        params.update({'tag': ','.join(args['tags'])})
    # TODO: Make 'before' work.
    # if args['before']:
    #     before: datetime = args['before']
    #     params.update({'before': int(before.timestamp())})
    # Drafts paginate by following '_links.next.href', not by offset/limit.
    draft_url = f"/v2/blog/{blog_name}/posts/draft"
    is_draft_stats: bool = 'build_draft_stats' in args['operation']
    total: int = 0
    offset: int = 0
    limit: int = 20
    # The request loop that pulls all data from the APIs.
    while True:
        # Begin LOOP
        # Get me some posts via REST! 😈🍪🍪🍪
        data: Dict[str, Any]
        if 'build_queue_stats' in args['operation']:
            data = client.queue(f"{blog_name}.tumblr.com",
                                offset=offset,
                                limit=limit,
                                **params)
        elif is_draft_stats:
            data = client.send_api_request("get", draft_url)
        else:  # Above is for queued + draft posts, below is for published posts.
            data = client.posts(f"{blog_name}.tumblr.com",
                                offset=offset,
                                limit=limit,
                                **params)
        # Stop the presses if we found no posts.
        curr_posts: List[Dict[str, Any]] = data['posts']
        if not curr_posts or len(curr_posts) < 1:
            print('Stopping, as no more posts were found.')
            break
        # Prefer the server-provided next offset when the response carries one.
        next_off: int = 0
        if '_links' in data and not is_draft_stats:
            links = data['_links']
            if 'next' in links and 'query_params' in links['next']:
                next_off = int(links['next']['query_params']['offset'])
        # Total init check for the first iteration, but always checked for sanity.
        if not total and 'total_posts' in data:
            total_posts = data['total_posts']
            print(f"I'm working with {total_posts} total posts...")
            total = total_posts
        # Behavior for 'after': drop older posts; since results arrive newest
        # first (presumably — verify against the API), an empty filtered batch
        # means everything after the cutoff has been seen, so we return early.
        if args['after']:
            after: datetime = args['after']
            curr_posts = filter_posts_for_after(curr_posts, after)
            if not curr_posts:
                print(f"All posts after {after.year}-{after.month}-{after.day} processed.")
                return (og_post_map, un_og_post_map)
        # This block populates the local post_maps from the raw response data.
        local_og_post_map: Dict[str, Any] = {
            item['id_string']: item for item in curr_posts if 'parent_post_url' not in item
        }
        local_un_og_post_map: Dict[str, Any] = {
            item['id_string']: item for item in curr_posts if 'parent_post_url' in item
        }
        # Update the maps with what we found.
        og_post_map.update(local_og_post_map)
        un_og_post_map.update(local_un_og_post_map)
        # For build_draft_stats.
        if is_draft_stats:
            if '_links' in data:
                # NOTE(review): assumes '_links' always contains 'next' with an
                # 'href' here — a missing key would raise KeyError; confirm
                # against the drafts endpoint's response shape.
                draft_url = data['_links']['next']['href']
                continue
            else:
                print('All draft posts processed.')
                break
        # The increment and status printing.
        if next_off != 0 and next_off != offset:
            offset = next_off
        else:
            offset += limit
        if not args['after'] and total:
            print(
                f"Processed batch {offset // limit} of {(total // 20) + 1}...")
        # End LOOP
    # Return (og_posts, not_og_posts).
    return (og_post_map, un_og_post_map)
def main() -> None:
args: Dict[str, Any] = get_args()
client: pytumblr.TumblrRestClient = init_client()
operation: List[str] = args['operation']
# Quick bail for bad use of build_queue_stats and build_draft_stats.
if (set(operation) & set(['build_queue_stats', 'build_draft_stats'])) and len(operation) > 1:
print(f"You can't mix operations {operation} together. Sorry.")
sys.exit(1)
pass
# Handle JSON input (if you don't want to make API calls.)
if 'input' in args and args['input']:
input_path = Path(args['input'])
with open(input_path, "r") as f:
data = json.load(f)
og_post_map = data['original_post_map']
un_og_post_map = data['unoriginal_post_map']
for post_key in og_post_map.copy():
post = og_post_map[post_key]
date: datetime = datetime.strptime(
post['date'], '%Y-%m-%d %H:%M:%S %Z')
if date.year != 2025:
del og_post_map[post_key]
for post_key in un_og_post_map.copy():
post = un_og_post_map[post_key]
date: datetime = datetime.strptime(
post['date'], '%Y-%m-%d %H:%M:%S %Z')
if date.year != 2025:
del un_og_post_map[post_key]
else:
# Get the og_post_map (original posts) and un_og_post_map (not original posts).
og_post_map, un_og_post_map = build_post_maps(args=args, client=client)
# Pick a stats model, which will determine output.
stats_model: StatsModel
match args:
case {'operation': ['build_queue_stats']}:
stats_model = BuildQueueStatsModel(blog_name=args['blog'],
original_post_map=og_post_map,
unoriginal_post_map=un_og_post_map)
case {'operation': ['build_draft_stats']}:
stats_model = BuildDraftStatsModel(blog_name=args['blog'],
original_post_map=og_post_map,
unoriginal_post_map=un_og_post_map)
case {'operation': op} if 'build_tag_stats' in operation:
stats_model = BuildTagStatsModel(blog_name=args['blog'],
original_post_map=og_post_map,
unoriginal_post_map=un_og_post_map)
stats_model.tags = args['tags']
case {'operation': op} if 'build_total_stats' in operation:
if 'before' not in args: # or 'after' not in args:
print(f"You must specify a time range for {op}. " +
'You\'ll otherwise request TOO MUCH DATA!')
sys.exit(1)
stats_model = BuildTotalStatsModel(blog_name=args['blog'],
original_post_map=og_post_map,
unoriginal_post_map=un_og_post_map)
case _:
print('Unsupported command. How did you even make it this far?!')
sys.exit(1)
# Write the selected model as JSON output.
with open('./tumblr_stats.json', 'w') as f:
json.dump(asdict(stats_model), f, indent=1, default=str)
# If there were original posts, create a CSV for them.
if og_post_map:
with open('./tumblr_original_posts.csv', 'w', newline='') as f:
post_list: List[Dict[str, Any]] = list(og_post_map.values())
wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
fieldnames=post_list[0].keys())
wr.writeheader()
wr.writerows(post_list)
else:
print('No original posts were found, so a CSV of original posts was not written.')
return
# DO NOT DELETE. The main if statement.
if __name__ == '__main__':
    # Run the CLI end-to-end, then report success and exit 0 explicitly.
    main()
    print('All done.')
    sys.exit(0)