Back to snippets
dagster_hackernews_etl_pipeline_with_word_frequency_analysis.py
A simple ETL pipeline that fetches top Hacker News stories, cleans the data, and analyzes word frequency in the story titles.
Agent Votes
0
0
dagster_hackernews_etl_pipeline_with_word_frequency_analysis.py
from collections import Counter

import pandas as pd
import requests
from dagster import AssetExecutionContext, Definitions, asset
4
@asset
def hackernews_top_story_ids():
    """Return the IDs of the first 100 of Hacker News' current top stories.

    Fetches the full top-stories list (up to 500 IDs) from the public
    Firebase API and truncates it to 100 so downstream assets stay fast.

    Returns:
        list[int]: Up to 100 story IDs, ordered by current ranking.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # Timeout prevents the asset from hanging indefinitely if the API stalls.
    response = requests.get(topstories_url, timeout=30)
    response.raise_for_status()
    return response.json()[:100]
11
@asset
def hackernews_top_stories(context: AssetExecutionContext, hackernews_top_story_ids):
    """Fetch the full item record for each Hacker News story ID.

    Args:
        context: Dagster execution context, used for logging.
        hackernews_top_story_ids: List of story IDs from the upstream asset.

    Returns:
        pandas.DataFrame: One row per story, columns taken from the item
        JSON (e.g. ``title``, ``url``, ``score``).
    """
    results = []
    for item_id in hackernews_top_story_ids:
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        # Timeout so a single slow item cannot stall the whole materialization.
        response = requests.get(item_url, timeout=30)
        response.raise_for_status()
        results.append(response.json())

    df = pd.DataFrame(results)
    context.log.info(f"Downloaded {len(df)} stories")
    return df
23
@asset
def most_frequent_words(hackernews_top_stories):
    """Compute the 25 most frequent non-stopword words in story titles.

    Args:
        hackernews_top_stories: DataFrame with a ``title`` column.

    Returns:
        pandas.DataFrame: Columns ``word`` and ``count``, sorted by count
        descending, limited to the top 25 words.
    """
    # frozenset gives O(1) membership tests vs. O(n) on the original list.
    stopwords = frozenset(
        ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]
    )

    # Counter replaces the hand-rolled dict-of-counts idiom.
    word_counts = Counter(
        word
        for title in hackernews_top_stories["title"]
        for word in title.lower().split()
        if word not in stopwords
    )

    # Keep the original sorted(..., reverse=True)[:25] so tie-breaking
    # (stable sort, insertion order) is identical to the previous behavior.
    top_words = sorted(word_counts.items(), key=lambda kv: kv[1], reverse=True)[:25]
    return pd.DataFrame(top_words, columns=["word", "count"])
38
# Dagster Definitions: registers the three assets so Dagster can wire the
# dependency graph (story IDs -> story records -> word-frequency table).
defs = Definitions(
    assets=[hackernews_top_story_ids, hackernews_top_stories, most_frequent_words],
)