tumblr-stats/tumblr_stats.py

175 lines
6.9 KiB
Python

import argparse
import csv
from dataclasses import asdict
import json
import os
import sys
from typing import Any, Dict, List, Tuple
import pytumblr
from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from stats_model import StatsModel
def get_args() -> Dict[str, Any]:
    """Parse the command line and hand the result back as a plain dictionary.

    Returns:
        A mapping of argparse destination name to parsed value, with the keys
        'operation' (list of str), 'blog' (str) and 'tags' (list of str, or
        None when -t/--tags was not given).
    """
    description = (
        'Use pytumblr to calculate stats after setting these enviroment variables: '
        '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET'
    )
    cli = argparse.ArgumentParser(
        prog='tumblr_stats.py',
        description=description,
        epilog='— Be gay and do crime',
    )

    # One or more operations; each must be one of the listed choices.
    cli.add_argument(
        'operation',
        metavar='OPERATION',
        type=str,
        nargs='+',
        choices=['build_tag_stats'],
        help="operation used to calculate stats",
    )
    cli.add_argument(
        '-b', '--blog',
        type=str,
        required=True,
        help='blog name for which to calculate stats',
    )
    cli.add_argument(
        '-t', '--tags',
        type=str,
        nargs='+',
        help='tag(s) to focus on in status (if applicable)',
    )

    # TODO: Make 'before' work, but it actually depends on
    # https://github.com/tumblr/pytumblr/issues/174.
    # cli.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                  help='only gather posts before YYYY-MM-DD')
    # TODO: Make 'after' work if they add it to pytumblr.
    # cli.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
    #                  help='only gather posts after YYYY-MM-DD')

    namespace = cli.parse_args()
    return vars(namespace)
def init_client() -> "pytumblr.TumblrRestClient":
    """Build a pytumblr REST client from credentials held in the environment.

    Reads $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN
    and $TUMBLR_OAUTH_SECRET. A variable that is unset *or* set to an empty
    string counts as missing, so a blank export fails here with a clear
    message instead of being passed on to the client.

    Returns:
        A pytumblr.TumblrRestClient constructed with the four credentials.

    Raises:
        SystemExit: with exit code 1 (after listing the offending variables
            on stderr) when any credential is missing or empty.
    """
    # Environment variable name -> TumblrRestClient keyword argument.
    env_to_kwarg: Dict[str, str] = {
        'TUMBLR_CONSUMER_KEY': 'consumer_key',
        'TUMBLR_CONSUMER_SECRET': 'consumer_secret',
        'TUMBLR_OAUTH_TOKEN': 'oauth_token',
        'TUMBLR_OAUTH_SECRET': 'oauth_secret',
    }
    credentials: Dict[str, Any] = {
        kwarg: os.getenv(env_name) for env_name, kwarg in env_to_kwarg.items()
    }

    # `not value` catches both None (unset) and '' (exported but blank).
    missing_vars: List[str] = [
        f'${env_name}'
        for env_name, kwarg in env_to_kwarg.items()
        if not credentials[kwarg]
    ]
    if missing_vars:
        print("Missing important environment variables:", missing_vars,
              file=sys.stderr)
        sys.exit(1)

    return pytumblr.TumblrRestClient(**credentials)
def build_post_maps(client: "pytumblr.TumblrRestClient",
                    args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Page through a blog's posts and split them into original / not original.

    Args:
        client: object whose ``posts(blog, offset=..., limit=..., **params)``
            method returns a dict with 'posts' and 'total_posts' keys.
        args: parsed command-line options; 'blog' is required, and a non-empty
            'tags' list is joined with commas and sent as the 'tag' parameter.

    Returns:
        A tuple ``(og_post_map, un_og_post_map)``, both keyed by each post's
        'id_string'. A post goes into the second map when it carries a
        'parent_post_url' key and into the first map otherwise.
    """
    og_post_map: Dict[str, Any] = {}
    un_og_post_map: Dict[str, Any] = {}
    blog_name: str = args['blog']

    # We populate params, starting with any tags for filtering.
    params: Dict[str, Any] = {}
    if args.get('tags'):
        params['tag'] = ','.join(args['tags'])

    # TODO: Make 'before' work.
    # if args['before']:
    #     before: datetime = args['before']
    #     params.update({'before': int(before.timestamp())})
    # TODO: Make 'after' work.
    # if args['after']:
    #     after: datetime = args['after']
    #     params.update({'after': str(int(after.timestamp()))})

    total: int = 0
    offset: int = 0
    limit: int = 20

    # The request loop that pulls all data from the APIs. The exit test sits
    # at the bottom so the first page is always requested (total is unknown
    # until then) and no extra, guaranteed-empty request is made when the
    # total is an exact multiple of the page size.
    while True:
        data = client.posts(f"{blog_name}.tumblr.com",
                            offset=offset,
                            limit=limit,
                            **params)

        # Stop the presses if we found no posts.
        curr_posts: List[Dict[str, Any]] = data['posts']
        if not curr_posts:
            print('Stopping, as no posts were found.')
            break

        # Total init check for the first iteration, but always checked for sanity.
        if not total:
            total = data['total_posts']
            print(f"I'm working with {total} total posts...")

        # File each post under the map matching its originality.
        for item in curr_posts:
            target = un_og_post_map if 'parent_post_url' in item else og_post_map
            target[item['id_string']] = item

        # The increment and status printing.
        offset += limit
        # Ceiling division by the actual page size; at least one batch ran.
        total_batches = max(1, (total + limit - 1) // limit)
        print(f"Processed batch {offset // limit} of {total_batches}...")

        if offset >= total:
            break

    # Return (og_posts, not_og_posts).
    return (og_post_map, un_og_post_map)
def main() -> None:
    """Run the requested stats operation and write the result files.

    Steps: parse arguments, build the API client, download the blog's posts,
    build the stats model for the requested operation, then write
    ./tumblr_stats.json and, when original posts exist,
    ./tumblr_original_posts.csv.

    Raises:
        SystemExit: with exit code 1 when 'build_total_stats' is requested
            without a time range.
    """
    args = get_args()
    client = init_client()

    # Refuse an unbounded build_total_stats run *before* downloading anything;
    # the whole point of the guard is to avoid requesting too much data.
    # argparse stores None for options that were not passed, so test the
    # value rather than key membership.
    wants_total_stats = 'build_total_stats' in args['operation']
    if wants_total_stats and not args.get('before'):  # or not args.get('after'):
        print('You must specify a time range for build_total stats. ' +
              'You\'ll otherwise request TOO MUCH DATA!')
        sys.exit(1)

    # Get the og_post_map (original posts) and un_og_post_map (not original posts).
    og_post_map, un_og_post_map = build_post_maps(args=args, client=client)

    # Pick a stats model, which will determine output.
    stats_model: StatsModel
    if 'build_tag_stats' in args['operation']:
        stats_model = BuildTagStatsModel(blog_name=args['blog'],
                                         original_post_map=og_post_map,
                                         unoriginal_post_map=un_og_post_map)
        stats_model.tags = args['tags']
    if wants_total_stats:
        stats_model = BuildTotalStatsModel(blog_name=args['blog'],
                                           original_post_map=og_post_map,
                                           unoriginal_post_map=un_og_post_map)

    # Write the chosen model as JSON output.
    with open('./tumblr_stats.json', 'w', encoding='utf-8') as f:
        json.dump(asdict(stats_model), f, indent=1, default=str)

    # If there were no original posts, there is nothing to put in a CSV.
    if not og_post_map:
        print('No original posts were found, so a CSV of original posts was not written.')
        return

    post_list: List[Dict[str, Any]] = list(og_post_map.values())

    # Posts do not all carry the same keys, so the header is the union of
    # every post's keys in first-seen order. Taking only the first post's
    # keys would silently drop fields that appear on later posts.
    columns: Dict[str, None] = {}
    for post in post_list:
        columns.update(dict.fromkeys(post))

    # Explicit UTF-8 so post text outside the platform's default code page
    # cannot make the write fail.
    with open('./tumblr_original_posts.csv', 'w', newline='', encoding='utf-8') as f:
        wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
                            fieldnames=list(columns))
        wr.writeheader()
        wr.writerows(post_list)
# Script entry point -- DO NOT DELETE.
# Runs the tool only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()
    print('All done.')
    # Finish with an explicit success status.
    raise SystemExit(0)