Refactored to use specific classes for different operations

This commit is contained in:
Amber McCloughan 2025-12-30 00:18:44 -05:00
parent 8a38cb510b
commit 4fbb99a3f6
9 changed files with 341 additions and 201 deletions

19
README.md Normal file
View File

@ -0,0 +1,19 @@
# tumblr-stats
## Usage
```
usage: tumblr_stats.py [-h] -b BLOG [-t TAGS [TAGS ...]] OPERATION
Use pytumblr to calculate stats after setting these enviroment variables: $TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET,
$TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET
positional arguments:
OPERATION operation used to calculate stats
options:
-h, --help show this help message and exit
-b, --blog BLOG blog name for which to calculate stats
-t, --tags TAGS [TAGS ...]
tag(s) to focus on in status (if applicable)
— Be gay and do crime
```

135
app.py
View File

@ -1,135 +0,0 @@
from dataclasses import asdict
import json
from operator import itemgetter
import os
import sys
from typing import Any, Dict, List, Tuple
import pytumblr
from model import StatsModel
def init_client() -> pytumblr.TumblrRestClient:
consumer_key = os.getenv('TUMBLR_CONSUMER_KEY')
consumer_secret = os.getenv('TUMBLR_CONSUMER_SECRET')
oauth_token = os.getenv('TUMBLR_OAUTH_TOKEN')
oauth_secret = os.getenv('TUMBLR_OAUTH_SECRET')
missing_vars = [name for name,
val in [('$TUMBLR_CONSUMER_KEY', consumer_key),
('$TUMBLR_CONSUMER_SECRET', consumer_secret),
('$TUMBLR_OAUTH_TOKEN', oauth_token),
('$TUMBLR_OAUTH_SECRET', oauth_secret)] if val is None]
if missing_vars:
print("Missing important environment variables:", missing_vars)
sys.exit(1)
return pytumblr.TumblrRestClient(
consumer_key=consumer_key, # type: ignore
consumer_secret=consumer_secret, # type: ignore
oauth_token=oauth_token, # type: ignore
oauth_secret=oauth_secret, # type: ignore
)
def calculate_total_notes(post_map: Dict[str, Any]) -> int:
total = 0
for post_key in post_map:
total += post_map[post_key]['note_count']
return total
def determine_top_post_urls(post_map: Dict[str, Any]) -> List[str]:
post_list = sorted(list(post_map.values()), key=itemgetter('note_count'), reverse=True)
return [post['post_url'] for post in post_list]
def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, blog_name: str, tag: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
post_map: Dict[str, Any] = {}
dumpster: Dict[str, Any] = {}
total = 0
offset = 0
limit = 20
while offset <= total:
### Begin LOOP
# Get me some posts! 😈🍪🍪🍪
data = client.posts(f"{blog_name}.tumblr.com", **{'tag': tag},
offset=offset,
limit=limit)
# Total check for the first iteration, but always checked for sanity.
if total == 0:
# Let's see what's in there,
total_posts = data['total_posts']
# Was nothing there?
if not total_posts:
print("Couldn't get total posts. We're outta here!")
sys.exit(1)
# Something was there, so we're good.
print(f"I'm working with {total_posts} total posts...")
total = total_posts
curr_posts = data['posts']
local_post_map: Dict[str, Any] = {}
for curr_post in curr_posts:
curr_key = curr_post['id_string']
if curr_key not in local_post_map:
local_post_map[curr_key] = curr_post
filtered_local_post_map = {}
for local_key in local_post_map:
local_post = local_post_map[local_key]
if 'parent_post_url' not in local_post:
filtered_local_post_map[local_key] = local_post
else:
dumpster[local_key] = local_post
# The sacred should we add, and if we should, DO ADD, if statement.
if any(post not in post_map for post in filtered_local_post_map):
post_map.update(filtered_local_post_map)
# The increment and status printing. Should always end the loop!
offset += limit
if offset < total:
print(f"Processed batch {offset // limit} of {total // 20}...")
else:
print(f"Processed all {total} posts")
### End LOOP
return (post_map, dumpster)
def build_tag_stats_model(client: pytumblr.TumblrRestClient, blog_name: str, tag: str) -> StatsModel:
post_map, dumpster = build_post_map_and_dumpster(client, blog_name, tag)
stats_model: StatsModel = StatsModel()
stats_model.operation = 'build_tag_stats'
stats_model.blog_name = blog_name
stats_model.post_map = post_map
stats_model.dumpster = dumpster
stats_model.total_posts = len(post_map) + len(dumpster)
stats_model.total_original_posts = len(post_map)
stats_model.total_original_post_notes = calculate_total_notes(post_map)
stats_model.ranked_post_urls = determine_top_post_urls(post_map)
return stats_model
def main() -> None:
client = init_client()
stats_model = build_tag_stats_model(client, 'panda-pal', 'inuyasha')
with open("./tumblr_data.json", "w") as f:
json.dump(asdict(stats_model), f, indent=2, sort_keys=True)
return
if __name__ == '__main__':
main()
sys.exit(0)

23
build_tag_stats_model.py Normal file
View File

@ -0,0 +1,23 @@
from dataclasses import dataclass, field
from operator import itemgetter
from typing import List
from stats_model import StatsModel
@dataclass(kw_only=True)
class BuildTagStatsModel(StatsModel):
"""Stats model built around calculating stats for posts containing one one or more specified tags."""
operation: str = 'build_tag_stats'
# Posts ranked from most popular to least popular by notes.
ranked_post_urls: List[str] = field(init=False)
def __post_init__(self):
super().__post_init__()
self.ranked_post_urls = self.determine_ranked_post_urls()
def determine_ranked_post_urls(self) -> List[str]:
post_list = sorted(list(self.original_post_map.values()),
key=itemgetter('note_count'), reverse=True)
return [post['post_url'] for post in post_list]

View File

@ -0,0 +1,41 @@
from dataclasses import dataclass, field
from operator import itemgetter
from typing import Any, Dict, List
from stats_model import StatsModel
@dataclass(kw_only=True)
class BuildTotalStatsModel(StatsModel):
"""Stats model built around calculating stats for posts containing one one or more specified tags."""
operation: str = 'build_total_stats'
# Top 100 posts, ranked from most popular to least popular by notes.
top_100_ranked_post_urls: List[str] = field(default_factory=list)
# Posts ranked from most popular to least popular by notes within each month and year.
top_post_urls_by_month_and_year: Dict[str, List[str]] = field(init=False)
# Tags ranked from most popular to least popular by notes.
most_popular_tags: List[Dict[str, Any]] = field(default_factory=list)
def __post_init__(self):
super().__post_init__()
self.most_popular_tags = self.determine_most_popular_tags()
def determine_most_popular_tags(self) -> List[Dict[str, Any]]:
tag_dict: Dict[str, Any] = {}
for post_key in self.original_post_map:
post = self.original_post_map[post_key]
tags = post['tags']
for tag in tags:
if tag in tag_dict:
tag_dict[tag] = {
'tag': tag, 'note_count': tag_dict[tag] + post['note_count']}
else:
tag_dict[tag] = {'tag': tag,
'note_count': post['note_count']}
tag_list = sorted(list(tag_dict.values()),
key=itemgetter('note_count'), reverse=True)
return tag_list

View File

@ -1,33 +0,0 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List
@dataclass
class StatsModel:
"""Class that models the output of the Tumblr stats script."""
# Operation used to output stats.
operation: str = field(default_factory=str)
# Blog in question.
blog_name: str = field(default_factory=str)
# Tags used.
tags: List[str] = field(default_factory=list)
# Original posts.
post_map: Dict[str, Any] = field(default_factory=dict)
# Posts that are not original.
dumpster: Dict[str, Any] = field(default_factory=dict)
# Total posts handled.
total_posts: int = field(default_factory=int)
# Total original posts (per blog_name) handled.
total_original_posts: int = field(default_factory=int)
# Total original post (per blog_name) notes handled
total_original_post_notes: int = field(default_factory=int)
# Posts ranked from most popular to least popular.
ranked_post_urls: List[str] = field(default_factory=list)

30
pdm.lock generated
View File

@ -5,7 +5,7 @@
groups = ["default"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:0066a6c1eb9f4f3caa8a173b83a51cf5a8e0f5834aa1ef8519c94b115f036049"
content_hash = "sha256:361b3ef81b52c72cd7565a0d9c843587ebc3cf603ea4965d5d4d7f48fea623f6"
[[metadata.targets]]
requires_python = "==3.14.*"
@ -96,34 +96,6 @@ files = [
{file = "PyTumblr-0.1.2.tar.gz", hash = "sha256:eef2653110f84df74a9d8628a4edf30131004de07e93484bc8cc95e44072036e"},
]
[[package]]
name = "pyyaml"
version = "6.0.3"
requires_python = ">=3.8"
summary = "YAML parser and emitter for Python"
groups = ["default"]
files = [
{file = "pyyaml-6.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:8d1fab6bb153a416f9aeb4b8763bc0f22a5586065f86f7664fc23339fc1c1fac"},
{file = "pyyaml-6.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:34d5fcd24b8445fadc33f9cf348c1047101756fd760b4dacb5c3e99755703310"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:501a031947e3a9025ed4405a168e6ef5ae3126c59f90ce0cd6f2bfc477be31b7"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:b3bc83488de33889877a0f2543ade9f70c67d66d9ebb4ac959502e12de895788"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c458b6d084f9b935061bc36216e8a69a7e293a2f1e68bf956dcd9e6cbcd143f5"},
{file = "pyyaml-6.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:7c6610def4f163542a622a73fb39f534f8c101d690126992300bf3207eab9764"},
{file = "pyyaml-6.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:5190d403f121660ce8d1d2c1bb2ef1bd05b5f68533fc5c2ea899bd15f4399b35"},
{file = "pyyaml-6.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:4a2e8cebe2ff6ab7d1050ecd59c25d4c8bd7e6f400f5f82b96557ac0abafd0ac"},
{file = "pyyaml-6.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:93dda82c9c22deb0a405ea4dc5f2d0cda384168e466364dec6255b293923b2f3"},
{file = "pyyaml-6.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:02893d100e99e03eda1c8fd5c441d8c60103fd175728e23e431db1b589cf5ab3"},
{file = "pyyaml-6.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:c1ff362665ae507275af2853520967820d9124984e0f7466736aea23d8611fba"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6adc77889b628398debc7b65c073bcb99c4a0237b248cacaf3fe8a557563ef6c"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a80cb027f6b349846a3bf6d73b5e95e782175e52f22108cfa17876aaeff93702"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:00c4bdeba853cc34e7dd471f16b4114f4162dc03e6b7afcc2128711f0eca823c"},
{file = "pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:66e1674c3ef6f541c35191caae2d429b967b99e02040f5ba928632d9a7f0f065"},
{file = "pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:16249ee61e95f858e83976573de0f5b2893b3677ba71c9dd36b9cf8be9ac6d65"},
{file = "pyyaml-6.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4ad1906908f2f5ae4e5a8ddfce73c320c2a1429ec52eafd27138b7f1cbe341c9"},
{file = "pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b"},
{file = "pyyaml-6.0.3.tar.gz", hash = "sha256:d76623373421df22fb4cf8817020cbb7ef15c725b9d5e45f17e189bfc384190f"},
]
[[package]]
name = "requests"
version = "2.32.5"

View File

@ -1,14 +1,13 @@
[project]
name = "ambers-stats"
name = "tumblr_stats"
version = "0.1.0"
description = "Default template for PDM package"
description = "Calculate Tumblr stats using pytumblr."
authors = [
{name = "Amber", email = "amber@ailuridae.io"},
]
dependencies = ["pytumblr>=0.1.2", "pyyaml>=6.0.3"]
dependencies = ["pytumblr==0.1.2"]
requires-python = "==3.14.*"
readme = "README.md"
license = {text = "MIT"}
[tool.pdm]

67
stats_model.py Normal file
View File

@ -0,0 +1,67 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List
@dataclass
class StatsModel:
"""Class that models the output of the Tumblr stats script."""
# The operation that was used to output stats.
operation: str
# The blog in question.
blog_name: str
# Contains original posts, indexed by post ID.
original_post_map: Dict[str, Any]
# Contains posts that are not original, indexed by post ID.
unoriginal_post_map: Dict[str, Any]
# Any tags used.
tags: List[str] = field(default_factory=list)
# Total count of posts processed.
total_posts: int = field(init=False)
# Total original posts (for blog_name) processed.
total_original_posts: int = field(init=False)
# Total original post (for blog_name) notes processed.
total_original_post_notes: int = field(init=False)
# Total notes for original posts within each month and year.
total_original_post_notes_by_month_and_year: Dict[str, int] = field(
init=False)
def __post_init__(self):
self.total_posts = self.calculate_total_posts()
self.total_original_posts = self.calculate_total_original_posts()
self.total_original_post_notes = self.calculate_total_original_post_notes()
self.total_original_post_notes_by_month_and_year = self.calculate_total_original_post_notes_by_month_and_year()
def calculate_total_posts(self) -> int:
return len(self.original_post_map) + len(self.unoriginal_post_map)
def calculate_total_original_posts(self) -> int:
return len(self.original_post_map)
def calculate_total_original_post_notes(self) -> int:
total = 0
for post_key in self.original_post_map:
total += self.original_post_map[post_key]['note_count']
return total
def calculate_total_original_post_notes_by_month_and_year(self) -> Dict[str, int]:
date_map: Dict[str, int] = {}
for post_key in self.original_post_map:
post = self.original_post_map[post_key]
# Format is like 2025-12-28 20:00:34 GMT
post_date: datetime = datetime.strptime(
post['date'], '%Y-%m-%d %H:%M:%S %Z')
post_date_key = f"{post_date.year}-{post_date.month:02}"
if post_date_key in date_map:
date_map[post_date_key] += post['note_count']
else:
date_map[post_date_key] = post['note_count']
return date_map

187
tumblr_stats.py Normal file
View File

@ -0,0 +1,187 @@
import argparse
import csv
from dataclasses import asdict
from datetime import datetime
import json
import os
import sys
from typing import Any, Dict, List, Tuple
import pytumblr
from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from stats_model import StatsModel
def get_args() -> Dict[str, Any]:
"""Pull arguments from command line, turn them into a dictionary of <arg, value>"""
parser = argparse.ArgumentParser(
prog='tumblr_stats.py',
description='Use pytumblr to calculate stats after setting these enviroment variables: '
+ '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
epilog='— Be gay and do crime')
parser.add_argument('operation', type=str, metavar='OPERATION', choices=['build_tag_stats'],
help="operation used to calculate stats")
parser.add_argument('-b', '--blog', type=str, required=True,
help='blog name for which to calculate stats')
parser.add_argument('-t', '--tags', type=str, nargs='+',
help='tag(s) to focus on in status (if applicable)')
# parser.add_argument('--before', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
# help='only gather posts before YYYY-MM-DD')
# parser.add_argument('--after', type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
# help='only gather posts after YYYY-MM-DD')
return vars(parser.parse_args())
def init_client() -> pytumblr.TumblrRestClient:
consumer_key = os.getenv('TUMBLR_CONSUMER_KEY')
consumer_secret = os.getenv('TUMBLR_CONSUMER_SECRET')
oauth_token = os.getenv('TUMBLR_OAUTH_TOKEN')
oauth_secret = os.getenv('TUMBLR_OAUTH_SECRET')
missing_vars = [name for name,
val in [('$TUMBLR_CONSUMER_KEY', consumer_key),
('$TUMBLR_CONSUMER_SECRET', consumer_secret),
('$TUMBLR_OAUTH_TOKEN', oauth_token),
('$TUMBLR_OAUTH_SECRET', oauth_secret)] if val is None]
if missing_vars:
print("Missing important environment variables:", missing_vars)
sys.exit(1)
return pytumblr.TumblrRestClient(
consumer_key=consumer_key, # type: ignore
consumer_secret=consumer_secret, # type: ignore
oauth_token=oauth_token, # type: ignore
oauth_secret=oauth_secret, # type: ignore
)
def build_post_map_and_dumpster(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
post_map: Dict[str, Any] = {}
dumpster: Dict[str, Any] = {}
blog_name = args['blog']
params = {}
if args['tags']:
params.update({'tag': ','.join(args['tags'])})
# if args['before']:
# before: datetime = args['before']
# params.update({'before': int(before.timestamp())})
# if args['after']:
# after: datetime = args['after']
# params.update({'after': str(int(after.timestamp()))})
total = 0
offset = 0
limit = 20
while offset <= total:
# Begin LOOP
# Get me some posts! 😈🍪🍪🍪
data = client.posts(f"{blog_name}.tumblr.com",
offset=offset,
limit=limit,
**params)
# Sh**t it in the head if we found no posts.
if not data['posts']:
print('Stopping, as no posts were found.')
break
# Total check for the first good iteration, but always checked for sanity.
if total == 0:
# Let's see what's in there,
total_posts = data['total_posts']
# Something was there, so we're good.
print(f"I'm working with {total_posts} total posts...")
total = total_posts
curr_posts = data['posts']
local_post_map: Dict[str, Any] = {}
for curr_post in curr_posts:
curr_key = curr_post['id_string']
if curr_key not in local_post_map:
local_post_map[curr_key] = curr_post
local_dumpster = {}
filtered_local_post_map = {}
for local_key in local_post_map:
local_post = local_post_map[local_key]
if 'parent_post_url' not in local_post:
filtered_local_post_map[local_key] = local_post
else:
local_dumpster[local_key] = local_post
# The sacred should we add, and if we should, DO ADD, if statement.
has_og_posts = any(post not in post_map for post in filtered_local_post_map)
has_not_og_posts = any(post not in dumpster for post in local_dumpster)
if has_og_posts:
post_map.update(filtered_local_post_map)
if has_not_og_posts:
dumpster.update(local_dumpster)
# The increment and status printing. Should always end the loop!
offset += limit
if offset == limit:
print('Processed first batch...')
elif offset < total:
print(f"Processed batch {offset // limit} of {total // 20}...")
else:
print(f"Processed all {total} posts")
# End LOOP
return (post_map, dumpster)
def build_tag_stats_model(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> BuildTagStatsModel:
post_map, dumpster = build_post_map_and_dumpster(client, args)
stats_model: BuildTagStatsModel = BuildTagStatsModel(blog_name=args['blog'], original_post_map=post_map,
unoriginal_post_map=dumpster)
stats_model.tags = args['tags']
return stats_model
def build_total_stats_model(client: pytumblr.TumblrRestClient, args: Dict[str, Any]) -> BuildTotalStatsModel:
post_map, dumpster = build_post_map_and_dumpster(client, args)
stats_model: BuildTotalStatsModel = BuildTotalStatsModel(blog_name=args['blog'], original_post_map=post_map,
unoriginal_post_map=dumpster)
return stats_model
def main() -> None:
args = get_args()
client = init_client()
stats_model = StatsModel(blog_name=args['blog'], operation='undefined',
original_post_map={}, unoriginal_post_map={})
if args['operation'] == 'build_tag_stats':
stats_model = build_tag_stats_model(client, args)
elif args['operation'] == 'build_total_stats':
if 'before' not in args: # or 'after' not in args:
print('You must specify a time range for build_total stats. ' +
'You\'ll otherwise request TOO MUCH DATA!')
sys.exit()
stats_model = build_total_stats_model(client, args)
with open('./tumblr_stats.json', 'w') as f:
json.dump(asdict(stats_model), f, indent=2, sort_keys=True)
if stats_model.original_post_map:
with open('./tumblr_original_posts.csv', 'w', newline='') as f:
post_list: List[Dict[str, Any]] = list(
stats_model.original_post_map.values())
wr = csv.DictWriter(f, quoting=csv.QUOTE_ALL, extrasaction='ignore',
fieldnames=post_list[0].keys())
wr.writeheader()
wr.writerows(post_list)
else:
print('No original posts were found, so no CSV of original posts was written.')
return
if __name__ == '__main__':
main()
sys.exit(0)