Add operation for calculating stats on queued posts; improve loop handling

This commit is contained in:
2026-01-03 23:08:01 -05:00
parent 590277d7ee
commit d4e6df7721
3 changed files with 86 additions and 16 deletions

View File

@@ -13,6 +13,7 @@ import pytumblr
from build_tag_stats_model import BuildTagStatsModel
from build_total_stats_model import BuildTotalStatsModel
from build_queue_stats_model import BuildQueueStatsModel
from stats_model import StatsModel
@@ -23,8 +24,8 @@ def get_args() -> Dict[str, Any]:
description='Use pytumblr to calculate stats after setting these enviroment variables: '
+ '$TUMBLR_CONSUMER_KEY, $TUMBLR_CONSUMER_SECRET, $TUMBLR_OAUTH_TOKEN, and $TUMBLR_OAUTH_SECRET',
epilog='— Be gay and do crime')
parser.add_argument('operation', type=str, nargs='+',
metavar='OPERATION', choices=['build_tag_stats'],
parser.add_argument('operation', type=str, nargs='+', metavar='OPERATION',
choices=['build_tag_stats', 'build_queue_stats'],
help="operation used to calculate stats")
parser.add_argument('-b', '--blog', type=str, required=True,
help='blog name for which to calculate stats')
@@ -96,23 +97,35 @@ def build_post_maps(client: pytumblr.TumblrRestClient,
limit: int = 20
# The request loop that pulls all data from the APIs.
while offset <= total:
while True:
# Begin LOOP
# Get me some posts via REST! 😈🍪🍪🍪
data = client.posts(f"{blog_name}.tumblr.com",
offset=offset,
limit=limit,
**params)
data: Dict[str, Any]
if 'build_queue_stats' in args['operation'] and len(args['operation']) == 1:
data = client.queue(f"{blog_name}.tumblr.com",
offset=offset,
limit=limit,
**params)
else: # Above is for queued posts, below is for published posts.
data = client.posts(f"{blog_name}.tumblr.com",
offset=offset,
limit=limit,
**params)
# Stop the presses if we found no posts.
curr_posts: List[Dict[str, Any]] = data['posts']
if not curr_posts or len(curr_posts) < 1:
print('Stopping, as no posts were found.')
print('Stopping, as no more posts were found.')
break
next_off: int = 0
if '_links' in data:
links = data['_links']
if 'next' in links and 'query_params' in links['next']:
next_off = int(links['next']['query_params']['offset'])
# Total init check for the first iteration, but always checked for sanity.
if not total:
# Let's see what's in there,
if not total and 'total_posts' in data:
total_posts = data['total_posts']
print(f"I'm working with {total_posts} total posts...")
total = total_posts
@@ -137,9 +150,12 @@ def build_post_maps(client: pytumblr.TumblrRestClient,
og_post_map.update(local_og_post_map)
un_og_post_map.update(local_un_og_post_map)
# The increment and status printing. Should always end the loop!
offset += limit
if not args['after']:
# The increment and status printing.
if next_off != 0 and next_off != offset:
offset = next_off
else:
offset += limit
if not args['after'] and total:
print(
f"Processed batch {offset // limit} of {(total // 20) + 1}...")
# End LOOP
@@ -177,6 +193,13 @@ def main() -> None:
# Pick a stats model, which will determine output.
stats_model: StatsModel
if 'build_queue_stats' in args['operation']:
if len(args['operation']) != 1:
print('You can\'t mix build_queue_stats with other operations. Sorry.')
sys.exit(1)
stats_model = BuildQueueStatsModel(blog_name=args['blog'],
original_post_map=og_post_map,
unoriginal_post_map=un_og_post_map)
if 'build_tag_stats' in args['operation']:
stats_model = BuildTagStatsModel(blog_name=args['blog'],
original_post_map=og_post_map,