122 changes: 107 additions & 15 deletions src/ingestion/newsapi.py
@@ -6,7 +6,7 @@
import requests
from dotenv import load_dotenv

from src.utils.dbconnector import find_one_document, insert_document
from src.utils.dbconnector import find_one_document, insert_document, update_document
from src.utils.logger import setup_logger

# Load API key from .env file
@@ -17,33 +17,63 @@
logger = setup_logger()


def fetch_news(query, from_date: datetime, sort_by, limit, to_json):
def fetch_news(query, from_date, sort_by="popularity", limit=10, to_json=False, force_fetch=False):
"""
Fetches news articles from NewsAPI for the given query, from date and sort_by.

Args:
query (str): The query to search for in the NewsAPI.
from_date (datetime.datetime): The date from which to fetch the articles.
from_date (str or datetime): The date from which to fetch the articles.
sort_by (str): The field to sort the results by.
limit (int): The number of articles to fetch.
to_json (bool): Whether to store the results in a JSON file.
force_fetch (bool): Whether to bypass the cache and fetch new results.

Returns:
List[str]: The IDs of the articles that were fetched and stored in MongoDB.
"""
url = f"https://newsapi.org/v2/everything?q={query}&from={from_date}&sortBy={sort_by}&apiKey={API_KEY}"
# Check for cached results first
try:
logger.debug("Requesting data from NewsAPI")
previous = find_one_document("News_Articles_Ids", {"query": query})
if previous:
existing_ids = []

if previous and not force_fetch:
logger.info(f"Previous data found for {query} from {from_date}")
return previous["ids"]
response = requests.get(url)
elif previous and force_fetch:
# Keep track of existing IDs to avoid duplicates
existing_ids = previous["ids"]
logger.info(f"Found {len(existing_ids)} existing articles for {query}. Will fetch more up to {limit} total.")

# Set up the API request
base_url = "https://newsapi.org/v2/everything"
params = {
"q": query,
"from": from_date,
"sortBy": sort_by,
"pageSize": limit
}

headers = {
"X-Api-Key": API_KEY,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "application/json"
}

# Make the request
response = requests.get(base_url, params=params, headers=headers)
response.raise_for_status() # Raise an error for bad status codes
data = response.json()

if data.get("status") == "ok":
logger.info(f"Total results: {data.get('totalResults')}")
logger.info(f"Total results from NewsAPI: {data.get('totalResults')}")

# Log the titles of returned articles for debugging
logger.info("Articles returned from NewsAPI:")
for i, article in enumerate(data.get("articles", [])[:limit]):
logger.info(f" {i+1}. {article.get('title')}")

if to_json:
try:
# store the data in json
@@ -56,35 +86,97 @@ def fetch_news(query, from_date: datetime, sort_by, limit, to_json):
except Exception as e:
logger.error(
f"Error occurred while storing results: {str(e)}")
return []
else:
# Initialize lists for article objects and IDs
articles_db = []
article_ids = []
for article in data.get("articles", [])[:limit]:
logger.debug(
f"Adding ids to articles and saving them to MongoDB")

# If we have existing IDs and are forcing a fetch, start with those
if existing_ids and force_fetch:
article_ids = existing_ids.copy()

# Set of URLs and titles already in our database to avoid duplicates
existing_urls = set()
existing_titles = set()
if article_ids:
# Get URLs and titles of existing articles
for aid in article_ids:
article = find_one_document("News_Articles", {"id": aid})
if article:
if "url" in article:
existing_urls.add(article["url"])
if "title" in article:
existing_titles.add(article["title"])

duplicates_by_url = 0
duplicates_by_title = 0

# Process articles from the API response
for article in data.get("articles", []):
# Skip if we've reached the limit
if len(article_ids) >= limit:
logger.info(f"Reached limit of {limit} articles. Stopping.")
break

title = article.get("title")
url = article.get("url")

# Skip duplicates based on URL
if url in existing_urls:
duplicates_by_url += 1
logger.debug(f"Skipping duplicate article (by URL): {title}")
continue

# Skip duplicates based on title (if title is not None)
if title and title in existing_titles:
duplicates_by_title += 1
logger.debug(f"Skipping duplicate article (by title): {title}")
continue

# Generate ID and add to database
id = str(uuid.uuid4())
article_ids.append(id)

# Update sets of existing URLs and titles
if url:
existing_urls.add(url)
if title:
existing_titles.add(title)

article_obj = {
"id": id,
"title": article.get("title"),
"title": title,
"description": article.get("description"),
"url": article.get("url"),
"url": url,
"urltoimage": article.get("urlToImage"),
"publishedat": article.get("publishedAt"),
"source": article.get("source").get("name"),
}
insert_document("News_Articles", article_obj)
articles_db.append(article_obj)

logger.info(f"Total articles saved: {len(articles_db)}")
if duplicates_by_url > 0 or duplicates_by_title > 0:
logger.info(f"Duplicates skipped: {duplicates_by_url} by URL, {duplicates_by_title} by title")
logger.debug(f"Article IDs: {article_ids}")
insert_document(
"News_Articles_Ids", {"query": query, "ids": article_ids}
)

# Update or insert the document in News_Articles_Ids
if previous:
# Update document logic
update_document("News_Articles_Ids", {"query": query}, {"ids": article_ids})
logger.info(f"Updated News_Articles_Ids for query '{query}' with {len(article_ids)} articles")
else:
insert_document("News_Articles_Ids", {"query": query, "ids": article_ids})
logger.info(f"Inserted new entry in News_Articles_Ids for query '{query}' with {len(article_ids)} articles")

return article_ids
else:
logger.error(f"Error in response: {data}")
return []
except requests.exceptions.RequestException as e:
logger.error(f"HTTP Request failed: {type(e).__name__} - {str(e)}")
return [] # Return empty list to prevent errors


if __name__ == "__main__":
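Note on the new import: the rewritten fetch_news calls update_document("News_Articles_Ids", {"query": query}, {"ids": article_ids}) to refresh the cached ID list, so the PR presumes an update_document helper exists in src/utils/dbconnector.py alongside find_one_document and insert_document. That helper is not part of this diff; the following is a minimal sketch of what it might look like with pymongo, under the assumption that the collections above live in one database and that only the supplied fields should be overwritten.

# Sketch only: src/utils/dbconnector.py is not shown in this PR, so the client setup,
# database name, and $set-only update semantics below are assumptions, not project code.
from pymongo import MongoClient

_client = MongoClient("mongodb://localhost:27017")  # placeholder connection URI
_db = _client["news_db"]                            # placeholder database name

def update_document(collection_name, query, new_values):
    # Matches the call site above:
    # update_document("News_Articles_Ids", {"query": query}, {"ids": article_ids})
    result = _db[collection_name].update_one(query, {"$set": new_values})
    return result.modified_count

With a helper along those lines, a repeated call such as fetch_news(query, from_date, limit=100, force_fetch=True) reuses the cached IDs, skips articles whose URL or title is already stored in News_Articles, and tops the cached list up to the requested limit.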
75 changes: 19 additions & 56 deletions src/pipeline.py
@@ -122,24 +122,26 @@ async def process_single_article_async(article_id, session):
return article_id


async def process_articles_async(query, limit=10):
async def process_articles_async(query, limit=10, force_fetch=False):
"""
Process a list of articles asynchronously, by fetching content, summarizing, extracting keywords and analyzing sentiment.

Args:
query (str): The query to search for in the NewsAPI.
limit (int, optional): The number of articles to fetch. Defaults to 10.
force_fetch (bool, optional): Whether to bypass the cache and fetch new results. Defaults to False.

Returns:
List[str]: The IDs of the articles that were processed.
"""
logger.info("Starting the processing of articles.")
article_ids = fetch_news(
query=query,
from_date="2024-08-16",
from_date="2025-04-16",
sort_by="popularity",
limit=limit,
to_json=False,
force_fetch=force_fetch
)
if not isinstance(article_ids, list):
raise ValueError("article_ids should be a list")
@@ -155,73 +157,34 @@ async def process_articles_async(query, limit=10):
return article_ids


def process_articles(query, limit=10):
def process_articles(query, limit=10, force_fetch=False):
"""
Process a list of articles by fetching content, summarizing, extracting keywords and analyzing sentiment.

Args:
query (str): The query to search for in the NewsAPI.
limit (int, optional): The number of articles to fetch. Defaults to 10.
force_fetch (bool, optional): Whether to bypass the cache and fetch new results. Defaults to False.

Returns:
List[str]: The IDs of the articles that were processed.
"""
logger.info("Starting the processing of articles.")
article_ids = asyncio.run(process_articles_async(query, limit))
article_ids = asyncio.run(process_articles_async(query, limit, force_fetch))
return article_ids
# Fetch articles from NewsAPI
article_ids = fetch_news(
query=query,
from_date="2024-08-04",
sort_by="popularity",
limit=limit,
to_json=False,
)
if not isinstance(article_ids, list):
raise ValueError("article_ids should be a list")

# Get contents for each article
article_contents = fetch_article_content(article_ids)

# contents_file = f"{query.replace(' ', '_')}_contents2.json"
# with open(contents_file, "w", encoding="utf-8") as f:
# json.dump(article_contents, f, ensure_ascii=False, indent=4)

# Summarize the articles
logger.info("Summarizing articles.")
article_summaries = summarize_texts(article_ids)

# summaries_file = f"{query.replace(' ', '_')}_summaries2.json"
# with open(summaries_file, "w", encoding="utf-8") as f:
# json.dump(article_summaries, f, ensure_ascii=False, indent=4)

# Extract keywords from summaries
logger.info("Extracting keywords from summaries.")
article_keywords = extract_keywords(article_ids, top_n=10)

# keywords_file = f"{query.replace(' ', '_')}_keywords2.json"
# with open(keywords_file, "w", encoding="utf-8") as f:
# json.dump(article_keywords, f, ensure_ascii=False, indent=4)

# Analyze sentiments of summaries
logger.info("Analyzing sentiments of summaries.")
article_sentiments = analyze_sentiments(article_ids)


if __name__ == "__main__":
logger.info("Starting the processing of articles.")

article_ids = process_articles("Adani Hindenburg Report", limit=10)
logger.info(f"Article IDs: {article_ids}")

# news_data = fetch_news(
# query="Kolkata Murder Case", from_date="2024-08-01", sort_by="popularity", to_json=False
# )
# urls = [article.get("url") for article in news_data.get("articles", [])]

# # Process articles
# keywords, sentiments, wordcloud = process_articles(urls)
# logger.info(f"Keywords: {keywords}")
# logger.info(f"Sentiments: {sentiments}")
# wordcloud.to_image().save("wordcloud.png")
# logger.info("Processing of articles completed successfully.")

try:
# Process articles about India retaliation attacks with force_fetch=True to bypass cache
search_query = "India retaliation attack"
logger.info(f"Fetching news articles about: {search_query}")
article_ids = process_articles(search_query, limit=100, force_fetch=True)
logger.info(f"Successfully processed {len(article_ids)} articles about '{search_query}'")
logger.info(f"Article IDs: {article_ids}")
except Exception as e:
logger.error(f"Error processing articles: {e}")
import traceback
traceback.print_exc()
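Since the old __main__ block is replaced by a single force_fetch=True run, a quick way to see the new caching behaviour end to end is to call process_articles twice, with and without the flag. The snippet below is illustrative only, assuming MongoDB is reachable and the NewsAPI key is loaded from .env; the query string simply mirrors the example above and the limits are arbitrary.

# Illustrative only; function and module names come from this diff.
from src.pipeline import process_articles

# Default run: if News_Articles_Ids already has an entry for the query,
# fetch_news returns the cached IDs and the pipeline re-processes that set.
cached_ids = process_articles("India retaliation attack", limit=10)

# force_fetch=True goes back to NewsAPI, skips duplicates by URL and title,
# and tops the cached ID list up to the new limit before processing.
fresh_ids = process_articles("India retaliation attack", limit=25, force_fetch=True)

print(f"cached run: {len(cached_ids)} IDs, forced run: {len(fresh_ids)} IDs")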