Added a popular tags feature, greatly cleaned up code, commented spots

This commit is contained in:
Amber McCloughan 2025-12-30 18:40:24 -05:00
parent 4fbb99a3f6
commit 9a8144af68
4 changed files with 83 additions and 58 deletions

View File

@ -1,10 +1,9 @@
# tumblr-stats # tumblr-stats
## Usage ## Usage
``` ```
usage: tumblr_stats.py [-h] -b BLOG [-t TAGS [TAGS ...]] OPERATION usage: tumblr_stats.py [-h] -b BLOG [-t TAGS [TAGS ...]] OPERATION [OPERATION ...]
Use pytumblr to calculate stats after setting these enviroment variables: $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, Use pytumblr to calculate stats after setting these enviroment variables: $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET
$TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET
positional arguments: positional arguments:
OPERATION operation used to calculate stats OPERATION operation used to calculate stats

View File

@ -16,26 +16,5 @@ class BuildTotalStatsModel(StatsModel):
# Posts ranked from most popular to least popular by notes within each month and year. # Posts ranked from most popular to least popular by notes within each month and year.
top_post_urls_by_month_and_year: Dict[str, List[str]] = field(init=False) top_post_urls_by_month_and_year: Dict[str, List[str]] = field(init=False)
# Tags ranked from most popular to least popular by notes.
most_popular_tags: List[Dict[str, Any]] = field(default_factory=list)
def __post_init__(self): def __post_init__(self):
super().__post_init__() super().__post_init__()
self.most_popular_tags = self.determine_most_popular_tags()
def determine_most_popular_tags(self) -> List[Dict[str, Any]]:
    """Rank the tags found on original posts by their summed note counts.

    Every post in ``self.original_post_map`` is read for its ``'tags'``
    list and its ``'note_count'``; each tag accumulates the note counts of
    all posts that carry it.

    Returns:
        A list of ``{'tag': <tag>, 'note_count': <total>}`` dicts ordered
        from the highest total to the lowest. Tags with equal totals keep
        the order in which they were first encountered.
    """
    tag_dict: Dict[str, Dict[str, Any]] = {}
    for post in self.original_post_map.values():
        for tag in post['tags']:
            # Create the entry on first sight, then add to its running
            # total. The previous code added an int to the whole entry
            # dict, which raised TypeError on a tag's second occurrence.
            entry = tag_dict.setdefault(tag, {'tag': tag, 'note_count': 0})
            entry['note_count'] += post['note_count']
    return sorted(tag_dict.values(),
                  key=itemgetter('note_count'), reverse=True)

View File

@ -1,5 +1,7 @@
from collections import defaultdict
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from operator import itemgetter
from typing import Any, Dict, List from typing import Any, Dict, List
@ -34,11 +36,15 @@ class StatsModel:
total_original_post_notes_by_month_and_year: Dict[str, int] = field( total_original_post_notes_by_month_and_year: Dict[str, int] = field(
init=False) init=False)
# Tags ranked from most popular to least popular by notes.
most_popular_tags: List[Dict[str, Any]] = field(init=False)
def __post_init__(self): def __post_init__(self):
self.total_posts = self.calculate_total_posts() self.total_posts = self.calculate_total_posts()
self.total_original_posts = self.calculate_total_original_posts() self.total_original_posts = self.calculate_total_original_posts()
self.total_original_post_notes = self.calculate_total_original_post_notes() self.total_original_post_notes = self.calculate_total_original_post_notes()
self.total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year() self.total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year()
self.most_popular_tags = self.determine_most_popular_tags()
def calculate_total_posts(self) -> int: def calculate_total_posts(self) -> int:
return len(self.original_post_map) + len(self.unoriginal_post_map) return len(self.original_post_map) + len(self.unoriginal_post_map)
@ -65,3 +71,27 @@ class StatsModel:
else: else:
date_map[post_date_key] = post['note_count'] date_map[post_date_key] = post['note_count']
return date_map return date_map
def determine_most_popular_tags(self) -> List[Dict[str, Any]]:
    """Rank the tags found on original posts by their summed note counts.

    Every post in ``self.original_post_map`` is read for its ``'tags'``
    list and its ``'note_count'``. For each tag this collects the number
    of posts carrying it, the sum of those posts' notes, and the ratio of
    the two.

    Returns:
        A list of dicts with the keys ``'note_count'``, ``'post_count'``,
        ``'tag'`` and ``'notes_to_posts_ratio'``, ordered from the highest
        ``'note_count'`` to the lowest. Tags with equal note counts keep
        the order in which they were first encountered.
    """
    # Each tag starts from zeroed counters the first time it is looked up.
    tag_stats: Dict[str, Dict[str, Any]] = defaultdict(
        lambda: {'note_count': 0, 'post_count': 0})

    for post in self.original_post_map.values():
        for tag in post['tags']:
            stats = tag_stats[tag]
            stats['tag'] = tag
            stats['post_count'] += 1
            stats['note_count'] += post['note_count']

    # An entry only exists once a post has used the tag, so post_count is
    # at least 1 here and the division cannot hit zero.
    for stats in tag_stats.values():
        stats['notes_to_posts_ratio'] = (
            stats['note_count'] / stats['post_count'])

    return sorted(tag_stats.values(),
                  key=itemgetter('note_count'), reverse=True)

View File

@ -2,7 +2,6 @@
import argparse import argparse
import csv import csv
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime
import json import json
import os import os
import sys import sys
@ -22,14 +21,17 @@ def get_args() -> Dict[str, Any]:
description='Use pytumblr to calculate stats after setting these enviroment variables: ' description='Use pytumblr to calculate stats after setting these enviroment variables: '
+ '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET', + '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
epilog='— Be gay and do crime') epilog='— Be gay and do crime')
parser.add_argument('operation', type=str, metavar='OPERATION', choices=['build_tag_stats'], parser.add_argument('operation', type=str, nargs = '+',
metavar='OPERATION', choices=['build_tag_stats'],
help="operation used to calculate stats") help="operation used to calculate stats")
parser.add_argument('-b', '--blog', type=str, required=True, parser.add_argument('-b', '--blog', type=str, required=True,
help='blog name for which to calculate stats') help='blog name for which to calculate stats')
parser.add_argument('-t', '--tags', type=str, nargs='+', parser.add_argument('-t', '--tags', type=str, nargs='+',
help='tag(s) to focus on in status (if applicable)') help='tag(s) to focus on in status (if applicable)')
# TODO: Make 'before' work, but it actually depends on https://github.com/tumblr/pytumblr/issues/174.
# parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'), # parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
# help='only gather posts before YYYY-MM-DD') # help='only gather posts before YYYY-MM-DD')
# TODO: Make 'after' work if they add it to pytumblr.
# parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'), # parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
# help='only gather posts after YYYY-MM-DD') # help='only gather posts after YYYY-MM-DD')
return vars(parser.parse_args()) return vars(parser.parse_args())
@ -64,12 +66,15 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
dumpster: Dict[str, Any] = {} dumpster: Dict[str, Any] = {}
blog_name = args['blog'] blog_name = args['blog']
# We populate params, starting with any tags for filtering.
params = {} params = {}
if args['tags']: if args['tags']:
params.update({'tag': ','.join(args['tags'])}) params.update({'tag': ','.join(args['tags'])})
# TODO: Make 'before' work.
# if args['before']: # if args['before']:
# before: datetime = args['before'] # before: datetime = args['before']
# params.update({'before': int(before.timestamp())}) # params.update({'before': int(before.timestamp())})
# TODO: Make 'after' work.
# if args['after']: # if args['after']:
# after: datetime = args['after'] # after: datetime = args['after']
# params.update({'after': str(int(after.timestamp()))}) # params.update({'after': str(int(after.timestamp()))})
@ -78,20 +83,21 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
offset = 0 offset = 0
limit = 20 limit = 20
# The request loop that pulls all data from the APIs.
while offset <= total: while offset <= total:
# Begin LOOP # Begin LOOP
# Get me some posts! 😈🍪🍪🍪 # Get me some posts via REST! 😈🍪🍪🍪
data = client.posts(f"{blog_name}.tumblr.com", data = client.posts(f"{blog_name}.tumblr.com",
offset=offset, offset=offset,
limit=limit, limit=limit,
**params) **params)
# Sh**t it in the head if we found no posts. # Stop the presses if we found no posts.
if not data['posts']: if not data['posts']:
print('Stopping, as no posts were found.') print('Stopping, as no posts were found.')
break break
# Total check for the first good iteration, but always checked for sanity. # Total init check for the first iteration, but always checked for sanity.
if total == 0: if total == 0:
# Let's see what's in there, # Let's see what's in there,
total_posts = data['total_posts'] total_posts = data['total_posts']
@ -100,6 +106,7 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
print(f"I'm working with {total_posts} total posts...") print(f"I'm working with {total_posts} total posts...")
total = total_posts total = total_posts
# This block populates the local post_map from the raw response data.
curr_posts = data['posts'] curr_posts = data['posts']
local_post_map: Dict[str, Any] = {} local_post_map: Dict[str, Any] = {}
for curr_post in curr_posts: for curr_post in curr_posts:
@ -107,47 +114,50 @@ def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[st
if curr_key not in local_post_map: if curr_key not in local_post_map:
local_post_map[curr_key] = curr_post local_post_map[curr_key] = curr_post
# This block populates the local dumpster from the raw response data.
local_dumpster = {} local_dumpster = {}
filtered_local_post_map = {} filtered_local_post_map = {}
for local_key in local_post_map: for local_key in local_post_map:
local_post = local_post_map[local_key] local_post = local_post_map[local_key]
# Determines whether this is an OG post.
if 'parent_post_url' not in local_post: if 'parent_post_url' not in local_post:
filtered_local_post_map[local_key] = local_post filtered_local_post_map[local_key] = local_post
else: else: # If it's not an OG post, into the local dumpster.
local_dumpster[local_key] = local_post local_dumpster[local_key] = local_post
# The sacred should we add, and if we should, DO ADD, if statement. # The sacred "should we add, and if we should, DO ADD" conditional statements.
has_og_posts = any(post not in post_map for post in filtered_local_post_map) has_og_posts = any(
post not in post_map for post in filtered_local_post_map)
has_not_og_posts = any(post not in dumpster for post in local_dumpster) has_not_og_posts = any(post not in dumpster for post in local_dumpster)
if has_og_posts: if has_og_posts:
post_map.update(filtered_local_post_map) post_map.update(filtered_local_post_map)
if has_not_og_posts: if has_not_og_posts:
dumpster.update(local_dumpster) dumpster.update(local_dumpster)
# The increment and status printing. Should always end the loop! # The increment and status printing. Should always end the loop!
offset += limit offset += limit
if offset == limit: print(f"Processed batch {offset // limit} of {(total // 20) + 1}...")
print('Processed first batch...')
elif offset < total:
print(f"Processed batch {offset // limit} of {total // 20}...")
else:
print(f"Processed all {total} posts")
# End LOOP # End LOOP
# Return (og_posts, not_og_posts).
return (post_map, dumpster) return (post_map, dumpster)
def build_tag_stats_model(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> BuildTagStatsModel: def build_tag_stats_model(post_map: Dict[str, Any],
post_map, dumpster = build_post_map_and_dumpster(client, args) dumpster: Dict[str, Any],
stats_model: BuildTagStatsModel = BuildTagStatsModel(blog_name=args['blog'], original_post_map=post_map, args: Dict[str, Any]) -> BuildTagStatsModel:
stats_model: BuildTagStatsModel = BuildTagStatsModel(blog_name=args['blog'],
original_post_map=post_map,
unoriginal_post_map=dumpster) unoriginal_post_map=dumpster)
stats_model.tags = args['tags'] stats_model.tags = args['tags']
return stats_model return stats_model
def build_total_stats_model(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> BuildTotalStatsModel: def build_total_stats_model(post_map: Dict[str, Any],
post_map, dumpster = build_post_map_and_dumpster(client, args) dumpster: Dict[str, Any],
stats_model: BuildTotalStatsModel = BuildTotalStatsModel(blog_name=args['blog'], original_post_map=post_map, args: Dict[str, Any]) -> BuildTotalStatsModel:
stats_model: BuildTotalStatsModel = BuildTotalStatsModel(blog_name=args['blog'],
original_post_map=post_map,
unoriginal_post_map=dumpster) unoriginal_post_map=dumpster)
return stats_model return stats_model
@ -156,32 +166,39 @@ def main() -> None:
args = get_args() args = get_args()
client = init_client() client = init_client()
stats_model = StatsModel(blog_name=args['blog'], operation='undefined', # Get the post_map (original posts) and dumpster (not original posts).
original_post_map={}, unoriginal_post_map={}) post_map, dumpster = build_post_map_and_dumpster(args=args, client=client)
if args['operation'] == 'build_tag_stats': # Pick a stats model, which will determine output.
stats_model = build_tag_stats_model(client, args) stats_model: StatsModel
elif args['operation'] == 'build_total_stats': if 'build_tag_stats' in args['operation']:
stats_model = build_tag_stats_model(post_map, dumpster, args)
if 'build_total_stats' in args['operation']:
if 'before' not in args: # or 'after' not in args: if 'before' not in args: # or 'after' not in args:
print('You must specify a time range for build_total stats. ' + print('You must specify a time range for build_total stats. ' +
'You\'ll otherwise request TOO MUCH DATA!') 'You\'ll otherwise request TOO MUCH DATA!')
sys.exit() sys.exit()
stats_model = build_total_stats_model(client, args) stats_model = build_total_stats_model(post_map, dumpster, args)
# Write the chosen model as JSON output.
with open('./tumblr_stats.json', 'w') as f: with open('./tumblr_stats.json', 'w') as f:
json.dump(asdict(stats_model), f, indent=2, sort_keys=True) json.dump(asdict(stats_model), f, indent=1)
if stats_model.original_post_map:
# If there were original posts, create a CSV for them.
if post_map:
with open('./tumblr_original_posts.csv', 'w', newline='') as f: with open('./tumblr_original_posts.csv', 'w', newline='') as f:
post_list: List[Dict[str, Any]] = list( post_list: List[Dict[str, Any]] = list(post_map.values())
stats_model.original_post_map.values())
wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore', wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
fieldnames=post_list[0].keys()) fieldnames=post_list[0].keys())
wr.writeheader() wr.writeheader()
wr.writerows(post_list) wr.writerows(post_list)
else: else:
print('No original posts were found, so no CSV of original posts was written.') print('No original posts were found, so a CSV of original posts was not written.')
return return
# DO NOT DELETE. The main if statement.
if __name__ == '__main__': if __name__ == '__main__':
main() main()
print('All done.')
sys.exit(0) sys.exit(0)