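"""Calculate stats for a Tumblr blog with pytumblr.

A brief summary of the CLI defined in get_args() below; 'yourblog' is a
placeholder:

    python tumblr_stats.py build_tag_stats -b yourblog -t art --after 2025-01-01

Requires $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN,
and $TUMBLR_OAUTH_SECRET to be set. Writes ./tumblr_stats.json and, when
original posts were found, ./tumblr_original_posts.csv.
"""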
import argparse
import csv
import json
import os
import sys
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple

import pytumblr

from build_queue_stats_model import BuildQueueStatsModel
from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from stats_model import StatsModel


def get_args() -> Dict[str, Any]:
    """Pull arguments from the command line and turn them into a dictionary of <arg, value>."""
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        prog='tumblr_stats.py',
        description='Use pytumblr to calculate stats after setting these environment variables: '
        + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
        epilog='— Be gay and do crime')
    # NOTE: main() also handles 'build_total_stats', but it is left out of the
    # choices here, presumably until the 'before' TODO below is resolved.
    parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION',
                        choices=['build_tag_stats', 'build_queue_stats'],
                        help='operation(s) used to calculate stats')
    parser.add_argument('-b', '--blog', type=str, required=True,
                        help='blog name for which to calculate stats')
    parser.add_argument('-t', '--tags', type=str, nargs='+',
                        help='tag(s) to focus on in stats (if applicable)')
    parser.add_argument('-i', '--input', type=str,
                        help="don't make API calls, just use a JSON input file")
    # TODO: Make 'before' work, but it depends on https://github.com/tumblr/pytumblr/issues/174.
    # parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                     help='only gather posts before YYYY-MM-DD')
    parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
                        help='only gather posts after YYYY-MM-DD')
    return vars(parser.parse_args())


def init_client() -> pytumblr.TumblrRestClient:
    """Build a pytumblr client from the four $TUMBLR_* environment variables, exiting if any are missing."""
    consumer_key = os.getenv('TUMBLR_CONSUMER_KEY')
    consumer_secret = os.getenv('TUMBLR_CONSUMER_SECRET')
    oauth_token = os.getenv('TUMBLR_OAUTH_TOKEN')
    oauth_secret = os.getenv('TUMBLR_OAUTH_SECRET')

    missing_vars: List[str] = [
        name for name, val in [('$TUMBLR_CONSUMER_KEY', consumer_key),
                               ('$TUMBLR_CONSUMER_SECRET', consumer_secret),
                               ('$TUMBLR_OAUTH_TOKEN', oauth_token),
                               ('$TUMBLR_OAUTH_SECRET', oauth_secret)]
        if val is None]

    if missing_vars:
        print('Missing required environment variables:', missing_vars)
        sys.exit(1)

    # The None checks above guarantee these are strings by now.
    return pytumblr.TumblrRestClient(
        consumer_key=consumer_key,  # type: ignore
        consumer_secret=consumer_secret,  # type: ignore
        oauth_token=oauth_token,  # type: ignore
        oauth_secret=oauth_secret,  # type: ignore
    )


def filter_posts_for_after(post_list: List[Dict[str, Any]],
                           after: datetime) -> List[Dict[str, Any]]:
    """Keep only the posts whose 'date' field is strictly after the given datetime."""
    # Quick short-circuit checks: nothing to filter, or nothing to filter by.
    if not post_list:
        return []
    if not after:
        return post_list

    # Handle 'after'.
    after_check: Callable[[Dict[str, Any]], bool] = lambda x: datetime.strptime(
        x['date'], '%Y-%m-%d %H:%M:%S %Z') > after
    return [post for post in post_list if after_check(post)]


def build_post_maps(client: pytumblr.TumblrRestClient,
                    args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
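    """Pull posts for the blog (the queue when running a pure 'build_queue_stats'
    operation, published posts otherwise) and split them by 'id_string' into
    original posts and posts carrying a 'parent_post_url' (i.e. reblogs).

    Returns a tuple of (og_post_map, un_og_post_map).
    """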
    og_post_map: Dict[str, Any] = {}
    un_og_post_map: Dict[str, Any] = {}
    blog_name: str = args['blog']

    # We populate params, starting with any tags for filtering.
    params = {}
    if args['tags']:
        params.update({'tag': ','.join(args['tags'])})
    # TODO: Make 'before' work.
    # if args['before']:
    #     before: datetime = args['before']
    #     params.update({'before': int(before.timestamp())})

    total: int = 0
    offset: int = 0
    limit: int = 20

    # The request loop that pulls all data from the API.
    while True:
        # Get me some posts via REST! 😈🍪🍪🍪
        data: Dict[str, Any]
        if 'build_queue_stats' in args['operation'] and len(args['operation']) == 1:
            data = client.queue(f"{blog_name}.tumblr.com",
                                offset=offset,
                                limit=limit,
                                **params)
        else:  # Above is for queued posts, below is for published posts.
            data = client.posts(f"{blog_name}.tumblr.com",
                                offset=offset,
                                limit=limit,
                                **params)

        # Stop the presses if we found no posts.
        curr_posts: List[Dict[str, Any]] = data['posts']
        if not curr_posts:
            print('Stopping, as no more posts were found.')
            break

        # Prefer the next offset the API reports in its pagination links.
        next_off: int = 0
        if '_links' in data:
            links = data['_links']
            if 'next' in links and 'query_params' in links['next']:
                next_off = int(links['next']['query_params']['offset'])

        # Initialize 'total' from the first response that reports it.
        if not total and 'total_posts' in data:
            total_posts = data['total_posts']
            print(f"I'm working with {total_posts} total posts...")
            total = total_posts

        # Behavior for 'after': posts come back newest-first, so once a whole
        # batch is older than the cutoff, we can stop early.
        if args['after']:
            after: datetime = args['after']
            curr_posts = filter_posts_for_after(curr_posts, after)
            if not curr_posts:
                print(f"All posts after {after:%Y-%m-%d} processed.")
                return (og_post_map, un_og_post_map)

        # This block populates the local post maps from the raw response data.
        local_og_post_map: Dict[str, Any] = {
            item['id_string']: item for item in curr_posts if 'parent_post_url' not in item
        }
        local_un_og_post_map: Dict[str, Any] = {
            item['id_string']: item for item in curr_posts if 'parent_post_url' in item
        }

        # Update the maps with what we found.
        og_post_map.update(local_og_post_map)
        un_og_post_map.update(local_un_og_post_map)

        # The increment and status printing.
        if next_off != 0 and next_off != offset:
            offset = next_off
        else:
            offset += limit
        if not args['after'] and total:
            print(f"Processed batch {offset // limit} of {(total // limit) + 1}...")

    # Return (og_posts, not_og_posts).
    return (og_post_map, un_og_post_map)


def main() -> None:
    args: Dict[str, Any] = get_args()
    client: pytumblr.TumblrRestClient = init_client()

    # Handle JSON input (if you don't want to make API calls).
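    # Assumption, inferred from the reads just below: the input file is a JSON
    # object with 'original_post_map' and 'unoriginal_post_map' keys, each
    # mapping a post's id_string to its raw post dict (the same maps
    # tumblr_stats.json is written from).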
    if 'input' in args and args['input']:
        input_path = Path(args['input'])
        with open(input_path, 'r') as f:
            data = json.load(f)
        og_post_map = data['original_post_map']
        un_og_post_map = data['unoriginal_post_map']
        # Drop posts outside the target year (currently hardcoded to 2025).
        for post_map in (og_post_map, un_og_post_map):
            for post_key in post_map.copy():
                post = post_map[post_key]
                date: datetime = datetime.strptime(
                    post['date'], '%Y-%m-%d %H:%M:%S %Z')
                if date.year != 2025:
                    del post_map[post_key]
    else:
        # Get the og_post_map (original posts) and un_og_post_map (not original posts).
        og_post_map, un_og_post_map = build_post_maps(args=args, client=client)

    # Pick a stats model, which will determine output.
    stats_model: StatsModel
    if 'build_queue_stats' in args['operation']:
        if len(args['operation']) != 1:
            print("You can't mix build_queue_stats with other operations. Sorry.")
            sys.exit(1)
        stats_model = BuildQueueStatsModel(blog_name=args['blog'],
                                           original_post_map=og_post_map,
                                           unoriginal_post_map=un_og_post_map)
    if 'build_tag_stats' in args['operation']:
        stats_model = BuildTagStatsModel(blog_name=args['blog'],
                                         original_post_map=og_post_map,
                                         unoriginal_post_map=un_og_post_map)
        stats_model.tags = args['tags']
    # NOTE: 'build_total_stats' is not in the argparse choices yet, so this
    # branch is unreachable until the 'before' TODO is resolved.
    if 'build_total_stats' in args['operation']:
        if 'before' not in args:  # or 'after' not in args:
            print("You must specify a time range for build_total_stats. "
                  "You'll otherwise request TOO MUCH DATA!")
            sys.exit(1)
        stats_model = BuildTotalStatsModel(blog_name=args['blog'],
                                           original_post_map=og_post_map,
                                           unoriginal_post_map=un_og_post_map)

    # Write the chosen model as JSON output.
    with open('./tumblr_stats.json', 'w') as f:
        json.dump(asdict(stats_model), f, indent=1, default=str)

    # If there were original posts, create a CSV for them.
    if og_post_map:
        with open('./tumblr_original_posts.csv', 'w', newline='') as f:
            post_list: List[Dict[str, Any]] = list(og_post_map.values())
            wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
                                fieldnames=post_list[0].keys())
            wr.writeheader()
            wr.writerows(post_list)
    else:
        print('No original posts were found, so a CSV of original posts was not written.')
    return


# DO NOT DELETE. The main if statement.
if __name__ == '__main__':
    main()
    print('All done.')
    sys.exit(0)