diff --git a/.github/workflows/api.yaml b/.github/workflows/api.yaml new file mode 100644 index 0000000..275a0ec --- /dev/null +++ b/.github/workflows/api.yaml @@ -0,0 +1,31 @@ +name: Deploy API Gateway + +on: + push: + paths: + - 'api/*' + +jobs: + deploy-api: + runs-on: ubuntu-latest + + steps: + - name: Checkout repo + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.x' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install boto3 + - name: Run API deployment script + env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + AWS_REGION: us-east-1 # or your region + run: | + python3 api/deploy-api.py \ No newline at end of file diff --git a/.github/workflows/lambda.yaml b/.github/workflows/lambda.yaml new file mode 100644 index 0000000..45cd394 --- /dev/null +++ b/.github/workflows/lambda.yaml @@ -0,0 +1,95 @@ +name: Deploy Lambdas + +on: + push: + branches: + - main + - backend_changes + +env: + LAMBDA_DIR: backend + DEPLOY_BUCKET: hackathon-lambda-ap-ai-cyberark + +jobs: + deploy: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.12' + + - name: Install AWS CLI + run: pip install awscli + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + + - name: Deploy Lambda Functions (No dependencies) + run: | + LAYER_ARN=$(aws lambda list-layer-versions \ + --layer-name "my-python-layer" \ + --query 'LayerVersions[0].LayerVersionArn' \ + --output text) + + echo "Using latest layer version: $LAYER_ARN" + for dir in "$LAMBDA_DIR"/*/; do + dir=${dir%/} + function_name=$(basename "$dir") + 
entry_point="$dir/${function_name}.py" + + if [ ! -f "$entry_point" ]; then + echo "Skipping $function_name: $entry_point not found." + continue + fi + + echo "Packaging Lambda: $function_name" + + build_dir="/tmp/${function_name}_build" + zip_file="/tmp/${function_name}.zip" + + rm -rf "$build_dir" + mkdir -p "$build_dir" + + # Copy Lambda source file + cp -r "$dir"/* "$build_dir/" + + # Copy all top-level shared files from backend (excluding directories and requirements.txt) + find "$LAMBDA_DIR" -maxdepth 1 -type f ! -name "requirements.txt" -exec cp {} "$build_dir/" \; + + # Zip build directory + cd "$build_dir" + zip -r "$zip_file" . > /dev/null + cd - + + # Upload zip to S3 + aws s3 cp "$zip_file" "s3://${DEPLOY_BUCKET}/${function_name}.zip" + + # Deploy Lambda + if aws lambda get-function --function-name "$function_name" > /dev/null 2>&1; then + echo "Updating Lambda: $function_name" + aws lambda update-function-code \ + --function-name "$function_name" \ + --s3-bucket "$DEPLOY_BUCKET" \ + --s3-key "${function_name}.zip" + else + echo "Creating Lambda: $function_name" + aws lambda create-function \ + --function-name "$function_name" \ + --runtime python3.12 \ + --role "arn:aws:iam::269854564686:role/hackathon-lambda-role" \ + --handler "${function_name}.lambda_handler" \ + --code S3Bucket="$DEPLOY_BUCKET",S3Key="${function_name}.zip" \ + --timeout 900 \ + --vpc-config SubnetIds=subnet-02e62e34308bb07d5,subnet-0534b99dd34e646f1,SecurityGroupIds=sg-0b9a6b812b30a1107 \ + --layers "$LAYER_ARN" + fi + done diff --git a/.github/workflows/layer.yaml b/.github/workflows/layer.yaml new file mode 100644 index 0000000..299a7f9 --- /dev/null +++ b/.github/workflows/layer.yaml @@ -0,0 +1,88 @@ +name: Build Lambda Layer + +on: + push: + paths: + - 'backend/requirements.txt' + workflow_dispatch: + +jobs: + build-and-publish-layer: + runs-on: ubuntu-latest + + env: + LAYER_NAME: my-python-layer + PYTHON_VERSION: python3.12 + S3_KEY: layers/layer.zip + + steps: + - name: 
Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies to python/ + run: | + mkdir -p python + pip install -r backend/requirements.txt --platform manylinux2014_x86_64 --only-binary=:all: -t python/ + + - name: Clean up unnecessary files + run: | + find python/ -type d -name "__pycache__" -exec rm -rf {} + + find python/ -type d -name "tests" -exec rm -rf {} + + find python/ -type f -name "*.pyc" -delete + + - name: Zip the layer + run: zip -r layer.zip python + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + + - name: Upload zip to S3 + run: | + aws s3 cp layer.zip s3://hackathon-lambda-ap-ai-cyberark/${{ env.S3_KEY }} + + - name: Publish Lambda Layer from S3 + run: | + aws lambda publish-layer-version \ + --layer-name ${{ env.LAYER_NAME }} \ + --description "Dependencies from backend/requirements.txt" \ + --content S3Bucket=hackathon-lambda-ap-ai-cyberark,S3Key=${{ env.S3_KEY }} \ + --compatible-runtimes ${{ env.PYTHON_VERSION }} + + - name: Upload artifact (optional) + uses: actions/upload-artifact@v4 + with: + name: lambda-layer + path: layer.zip + + - name: Get latest layer version ARN + id: get-layer-version + run: | + LAYER_ARN=$(aws lambda list-layer-versions --layer-name ${{ env.LAYER_NAME }} \ + --query 'LayerVersions[0].LayerVersionArn' --output text) + echo "layer_arn=$LAYER_ARN" >> "$GITHUB_OUTPUT" + + - name: List functions using the layer + id: list-functions + run: | + FUNCTIONS=$(aws lambda list-functions --query \ + "Functions[?Layers && contains(join(',', Layers[].Arn), '${{ env.LAYER_NAME }}')].FunctionName" \ + --output text) + echo "functions=$FUNCTIONS" >> "$GITHUB_OUTPUT" + + - name: Update functions to use latest layer version + run: | + for function in ${{ 
steps.list-functions.outputs.functions }}; do + echo "Updating $function..." + aws lambda update-function-configuration \ + --function-name "$function" \ + --layers ${{ steps.get-layer-version.outputs.layer_arn }} + done diff --git a/.gitignore b/.gitignore index fd3a82d..1b02c6d 100644 --- a/.gitignore +++ b/.gitignore @@ -193,3 +193,6 @@ cython_debug/ .cursorignore .cursorindexingignore .DS_Store + +node_modules +package-lock.json \ No newline at end of file diff --git a/amplify.py b/amplify.py index 03fc414..b48f7dc 100644 --- a/amplify.py +++ b/amplify.py @@ -11,7 +11,6 @@ branch = sys.argv[2] repo_owner, repo_name = repo_full.split('/') app_name = f"amplify-{repo_name}" - client = boto3.client('amplify','us-east-1') print(GITHUB_PAT) diff --git a/api/deploy-api.py b/api/deploy-api.py new file mode 100644 index 0000000..5f0acc3 --- /dev/null +++ b/api/deploy-api.py @@ -0,0 +1,144 @@ +import boto3 +import json +import os + +REGION = "us-east-1" +STAGE = "prod" +ACCOUNT_ID = "269854564686" + +apigateway = boto3.client("apigateway", region_name=REGION) +lambda_client = boto3.client("lambda", region_name=REGION) + +def get_or_create_api(api_name): + apis = apigateway.get_rest_apis()["items"] + for api in apis: + if api["name"] == api_name: + print(f"Found API: {api_name}") + return api["id"] + + print(f"Creating API: {api_name}") + response = apigateway.create_rest_api(name=api_name) + return response["id"] + +def get_or_create_resource(api_id, full_path): + # Normalize and split nested path: "users/{userId}" => ["users", "{userId}"] + parts = [p for p in full_path.strip("/").split("/") if p] + resources = apigateway.get_resources(restApiId=api_id)["items"] + + # Build a path-to-id map + path_map = {res["path"]: res["id"] for res in resources} + parent_path = "" + parent_id = path_map["/"] # root path + + for part in parts: + current_path = f"{parent_path}/{part}" if parent_path else f"/{part}" + if current_path in path_map: + parent_id = path_map[current_path] + 
else: + print(f"Creating resource: {current_path}") + response = apigateway.create_resource( + restApiId=api_id, + parentId=parent_id, + pathPart=part + ) + parent_id = response["id"] + path_map[current_path] = parent_id + parent_path = current_path + + return parent_id + + +def method_exists(api_id, resource_id, http_method): + try: + apigateway.get_method( + restApiId=api_id, + resourceId=resource_id, + httpMethod=http_method + ) + return True + except apigateway.exceptions.NotFoundException: + return False + +def add_lambda_permission(lambda_name, api_id, method, path): + statement_id = f"{lambda_name.lower()}-{method.lower()}" + try: + lambda_client.add_permission( + FunctionName=lambda_name, + StatementId=statement_id, + Action="lambda:InvokeFunction", + Principal="apigateway.amazonaws.com", + SourceArn=f"arn:aws:execute-api:{REGION}:{ACCOUNT_ID}:{api_id}/*/{method}/{path}" + ) + print(f"Added permission to Lambda {lambda_name} for method {method} /{path}") + except lambda_client.exceptions.ResourceConflictException: + # Permission already exists + print(f"Permission already exists for Lambda {lambda_name} and method {method} /{path}") + +def setup_method(api_id, resource_id, method_def, path): + method = method_def["httpMethod"].upper() + lambda_name = method_def["lambdaFunctionName"] + auth_type = method_def.get("authorizationType", "NONE") + lambda_arn = f"arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:{lambda_name}" + + if method_exists(api_id, resource_id, method): + print(f"Method {method} already exists for /{path}, skipping method creation.") + else: + print(f"Creating method {method} for /{path}") + apigateway.put_method( + restApiId=api_id, + resourceId=resource_id, + httpMethod=method, + authorizationType=auth_type + ) + + print(f"Setting integration for {method} /{path}") + apigateway.put_integration( + restApiId=api_id, + resourceId=resource_id, + httpMethod=method, + type="AWS_PROXY", + integrationHttpMethod="POST", + 
uri=f"arn:aws:apigateway:{REGION}:lambda:path/2015-03-31/functions/{lambda_arn}/invocations" + ) + + add_lambda_permission(lambda_name, api_id, method, path) + +def deploy_api(api_id): + print(f"Deploying API {api_id} to stage: {STAGE}") + apigateway.create_deployment( + restApiId=api_id, + stageName=STAGE + ) + +def main(): + # Use script folder as working directory to find JSON files + script_dir = os.path.dirname(os.path.abspath(__file__)) + + deploy_apis = set() + + # Loop all JSON files in current folder + for file in os.listdir(script_dir): + if not file.endswith(".json"): + continue + + json_path = os.path.join(script_dir, file) + with open(json_path) as f: + config = json.load(f) + + api_name = config["apiName"] + resource_path = config["resourcePath"] + method_def = config["method"] + should_deploy = config.get("deploy", False) + + api_id = get_or_create_api(api_name) + resource_id = get_or_create_resource(api_id, resource_path) + setup_method(api_id, resource_id, method_def, resource_path) + + if should_deploy: + deploy_apis.add(api_id) + + for api_id in deploy_apis: + deploy_api(api_id) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/api/get-value.json b/api/get-value.json new file mode 100644 index 0000000..04ffc52 --- /dev/null +++ b/api/get-value.json @@ -0,0 +1,10 @@ +{ + "apiName": "hackathon", + "resourcePath": "get-value", + "method": { + "httpMethod": "GET", + "authorizationType": "NONE", + "lambdaFunctionName": "Submit" + }, + "deploy": true +} \ No newline at end of file diff --git a/api/get_feed.json b/api/get_feed.json new file mode 100644 index 0000000..52f75ce --- /dev/null +++ b/api/get_feed.json @@ -0,0 +1,10 @@ +{ + "apiName": "hackathon", + "resourcePath": "getfeed", + "method": { + "httpMethod": "GET", + "authorizationType": "NONE", + "lambdaFunctionName": "get_feed" + }, + "deploy": true +} \ No newline at end of file diff --git a/api/query-agent.json b/api/query-agent.json new file mode 100644 index 
0000000..8dd8ff2 --- /dev/null +++ b/api/query-agent.json @@ -0,0 +1,10 @@ +{ + "apiName": "hackathon", + "resourcePath": "queryagent", + "method": { + "httpMethod": "POST", + "authorizationType": "NONE", + "lambdaFunctionName": "web_api" + }, + "deploy": true +} \ No newline at end of file diff --git a/api/query-test.json b/api/query-test.json new file mode 100644 index 0000000..22c5660 --- /dev/null +++ b/api/query-test.json @@ -0,0 +1,10 @@ +{ + "apiName": "hackathon", + "resourcePath": "test/query", + "method": { + "httpMethod": "POST", + "authorizationType": "NONE", + "lambdaFunctionName": "web_api" + }, + "deploy": true +} \ No newline at end of file diff --git a/api/upload.json b/api/upload.json new file mode 100644 index 0000000..c67b2cb --- /dev/null +++ b/api/upload.json @@ -0,0 +1,10 @@ +{ + "apiName": "hackathon", + "resourcePath": "upload", + "method": { + "httpMethod": "POST", + "authorizationType": "NONE", + "lambdaFunctionName": "Submit" + }, + "deploy": true +} \ No newline at end of file diff --git a/api/web-api.json b/api/web-api.json new file mode 100644 index 0000000..da394f3 --- /dev/null +++ b/api/web-api.json @@ -0,0 +1,10 @@ +{ + "apiName": "hackathon", + "resourcePath": "articles", + "method": { + "httpMethod": "GET", + "authorizationType": "NONE", + "lambdaFunctionName": "web_api" + }, + "deploy": true +} \ No newline at end of file diff --git a/backend/Backup/Submit.py b/backend/Backup/Submit.py deleted file mode 100644 index 2620638..0000000 --- a/backend/Backup/Submit.py +++ /dev/null @@ -1,92 +0,0 @@ -import io -import pandas as pd -import boto3 -import time -import uuid -from Utils import get_postgresql_connection - -comprehend = boto3.client('comprehend') - -input_s3_uri = 's3://awstraindata/input.csv' -role_arn = 'arn:aws:iam::269854564686:role/hackathon-comprehend-role' - -s3 = boto3.client('s3') -# Download the file object -input_csv_object = s3.get_object(Bucket='awstraindata', Key='input.csv') - -# Read CSV into DataFrame -conn 
= get_postgresql_connection() -cursor = conn.cursor() -cursor.execute("drop table if exists articles") -cursor.execute("""CREATE TABLE IF NOT EXISTS articles ( - articles_id TEXT, - title TEXT, - body TEXT, - source TEXT, - published_date TEXT, - location_mentions TEXT, - officials_involved TEXT, - relevance_category TEXT, - sentiment TEXT - )""") -input_csv = pd.read_csv(io.BytesIO(input_csv_object['Body'].read())) -for index, row in input_csv.iterrows(): - print(f"Processing row {index}: {row}") - cursor.execute(""" - INSERT INTO articles (articles_id, title, body, source, published_date) - VALUES (%s, %s, %s, %s, %s)""", (row[0], row[1], row[2], row[3], row[4])) -conn.commit() -cursor.close() -entities_job = comprehend.start_entities_detection_job( - InputDataConfig={'S3Uri': input_s3_uri, 'InputFormat': 'ONE_DOC_PER_LINE'}, - OutputDataConfig={'S3Uri': 's3://awstraindata/output/entities/'}, - DataAccessRoleArn=role_arn, - LanguageCode='en', - JobName='MyEntityDetectionJob_' + str(int(time.time())), -) -result = comprehend.describe_entities_detection_job(JobId=entities_job['JobId']) -entities_output = result['EntitiesDetectionJobProperties']['OutputDataConfig']['S3Uri'] - -# SENTIMENT detection job -sentiment_job = comprehend.start_sentiment_detection_job( - InputDataConfig={'S3Uri': input_s3_uri, 'InputFormat': 'ONE_DOC_PER_LINE'}, - OutputDataConfig={'S3Uri': 's3://awstraindata/output/sentiment/'}, - DataAccessRoleArn=role_arn, - LanguageCode='en', - JobName='MySentimentDetectionJob_' + str(int(time.time())), -) -res = comprehend.describe_sentiment_detection_job(JobId=sentiment_job['JobId']) -sentiment_output = res['SentimentDetectionJobProperties']['OutputDataConfig']['S3Uri'] - -# KEY PHRASES detection job -phrases_job = comprehend.start_key_phrases_detection_job( - InputDataConfig={'S3Uri': input_s3_uri, 'InputFormat': 'ONE_DOC_PER_LINE'}, - OutputDataConfig={'S3Uri': 's3://awstraindata/output/keyphrases/'}, - DataAccessRoleArn=role_arn, - 
LanguageCode='en', - JobName='MyKeyPhrasesDetectionJob_' + str(int(time.time())), -) -res = comprehend.describe_key_phrases_detection_job(JobId=phrases_job['JobId']) -key_phrases_output = res['KeyPhrasesDetectionJobProperties']['OutputDataConfig']['S3Uri'] -print("Entities Job Response:", entities_job) -print("Sentiment Job Response:", sentiment_job) -print("Key Phrases Job Response:", phrases_job) -conn = get_postgresql_connection() -if conn: - cursor = conn.cursor() - cursor.execute(""" - CREATE TABLE IF NOT EXISTS comprehend_jobs ( - batch_id TEXT, - input_s3_uri TEXT, - entities_job JSONB, - sentiment_job JSONB, - key_phrases_job JSONB - ) - """) - cursor.execute(""" - INSERT INTO comprehend_jobs (batch_id, input_s3_uri, entities_path, sentiment_path, key_phrases_path) - VALUES (%s, %s, %s, %s, %s)""", (str(uuid.uuid4()), input_s3_uri, entities_output.replace('s3://awstraindata/', ''), sentiment_output.replace('s3://awstraindata/', ''), key_phrases_output.replace('s3://awstraindata/', ''))) - conn.commit() - cursor.close() - conn.close() - diff --git a/backend/Backup/Transform.py b/backend/Backup/Transform.py deleted file mode 100644 index 2db246e..0000000 --- a/backend/Backup/Transform.py +++ /dev/null @@ -1,64 +0,0 @@ -from turtle import pd -import boto3 -import tarfile -import json -import psycopg2 -import io - -from EntityRecog.Utils import get_postgresql_connection -def lambda_handler(event, context): - for record in event['Records']: - print(f"New record: {record}") - bucket = record['s3']['bucket']['name'] - key = record['s3']['object']['key'] - conn = get_postgresql_connection() - s3 = boto3.client('s3') - obj = s3.get_object(Bucket=bucket, Key=key) - tar_bytes = io.BytesIO(obj['Body'].read()) - - # Extract .json inside the tar.gz - with tarfile.open(fileobj=tar_bytes, mode='r:gz') as tar: - for member in tar.getmembers(): - if member.name == "output" and member.isfile(): - file = tar.extractfile(member) - results = json.load(file) - print(f"Extracted 
JSON: {results}") - break - - if not results: - folderSplit = key.split('/') - type = folderSplit[0] - cursor = conn.cursor() - query = "SELECT * FROM comprehend_jobs WHERE entities_path = %s or sentiment_path = %s or key_phrases_path = %s" - cursor.execute(query, (key, key, key)) - row = cursor.fetchone() - if row: - # Download the file object - response = s3.get_object(Bucket=bucket, Key=row['input_s3_uri']) - - # Read CSV into DataFrame - input_csv = pd.read_csv(io.BytesIO(response['Body'].read())) - for row in results: - if type == 'entities': - location_mentions = ', '.join([entity['Text'] for entity in row['Entities'] if entity['Type'] == 'LOCATION']) - officials_involved = ', '.join([entity['Text'] for entity in row['Entities'] if entity['Type'] == 'PERSON']) - relevance_category = ', '.join([entity['Text'] for entity in row['Entities'] if entity['Type'] == 'TITLE']) - if not location_mentions: - cursor.execute("""update articles set location_mentions = %s where articles_id = %s""", (location_mentions, input_article['articles_id'])) - if not officials_involved: - cursor.execute("""update articles set officials_involved = %s where articles_id = %s""", (officials_involved, input_article['articles_id'])) - if not relevance_category: - cursor.execute("""update articles set relevance_category = %s where articles_id = %s""", (relevance_category, input_article['articles_id'])) - elif type == 'sentiment': - sentiment = row.get('Sentiment', 'NEUTRAL') - if not sentiment: - cursor.execute("""update articles set sentiment = %s where articles_id = %s""", (sentiment, input_article['articles_id'])) - elif type == 'keyphrases': - key_phrases = ', '.join(row.get('KeyPhrases', [])) - if not key_phrases: - cursor.execute("""update articles set key_phrases = %s where articles_id = %s""", (key_phrases, input_article['articles_id'])) - line_number = row['Line'] - input_article = input_csv[line_number] - - cursor.close() - conn.close() \ No newline at end of file diff --git 
a/backend/JsontoCSV.py b/backend/JsontoCSV.py deleted file mode 100644 index b510dec..0000000 --- a/backend/JsontoCSV.py +++ /dev/null @@ -1,14 +0,0 @@ -import json -import pandas as pd - -# Load the JSON file -with open("input_feed.json", "r", encoding="utf-8") as f: - data = json.load(f) - -# Normalize and convert lists to strings -df = pd.json_normalize(data) -df["extractedLocations"] = df["extractedLocations"].apply(lambda x: ", ".join(x)) -df["tags"] = df["tags"].apply(lambda x: ", ".join(x)) - -# Export to CSV -df.to_csv("news_feed_converted.csv", index=False) \ No newline at end of file diff --git a/backend/Sample.csv b/backend/Sample.csv deleted file mode 100644 index 7a84ed8..0000000 --- a/backend/Sample.csv +++ /dev/null @@ -1,19 +0,0 @@ -Text,Type -"Accident","MISHAP" -"Argument","MISHAP" -"fight","MISHAP" -"quarrel","MISHAP" -"Burn","MISHAP" -"Snatch","MISHAP" -"Murder","CRIME" -"RoadRage","CRIME" -"Rash driving","CRIME" -"Theft","CRIME" -"Burglary","CRIME" -"Cheat","CRIME" -"Stab","CRIME" -"Kill","CRIME" -"Hate Speech","CRIME" -"Hijack","CRIME" -"Beat","CRIME" -"Threat","CRIME" \ No newline at end of file diff --git a/backend/Submit/Submit.py b/backend/Submit/Submit.py new file mode 100644 index 0000000..31d88a0 --- /dev/null +++ b/backend/Submit/Submit.py @@ -0,0 +1,62 @@ +import base64 +import json +from requests_toolbelt.multipart import decoder +import uuid +from Utils import get_postgresql_connection +from fastapi import FastAPI +from docx import Document +import csv +import io +import re +import boto3 +import traceback + +app = FastAPI() + +s3 = boto3.client('s3') +BUCKET_NAME = 'awstraindata' + +def lambda_handler(event, context): + try: + # Decode base64-encoded body (API Gateway encodes binary automatically) + print(f"Received event") + if event.get("isBase64Encoded", False): + body = base64.b64decode(event['body']) + else: + body = event['body'].encode("utf-8") + print(f"Decoded body length: {len(body)} bytes") + # Get content-type header + 
content_type = event['headers'].get('Content-Type') or event['headers'].get('content-type') + if not content_type: + return {"statusCode": 400, "body": "Missing Content-Type header"} + + # Parse multipart form + multipart_data = decoder.MultipartDecoder(body, content_type) + print(f"Multipart data parts: {len(multipart_data.parts)}") + conn = get_postgresql_connection() + cursor = conn.cursor() + for part in multipart_data.parts: + print(f"Processing part: {part.headers.get(b'Content-Disposition')}") + file_stream = io.BytesIO(part.content) + file_stream.seek(0) + file_id = str(uuid.uuid4()) + s3_key = f"raw_data/{file_id}" + # Upload to S3 + s3.put_object( + Bucket=BUCKET_NAME, + Key=s3_key, + Body=file_stream, + ContentType='application/vnd.openxmlformats-officedocument.wordprocessingml.document' + ) + return { + "statusCode": 200, + "headers": { + "Content-Type": "application/json" + }, + "body": json.dumps({ + "status": "success", + }) + } + except Exception as e: + traceback.print_exc() + return {"statusCode": 500, "body": f"Error: {str(e)}"} \ No newline at end of file diff --git a/backend/Transform.py b/backend/Transform.py deleted file mode 100644 index 14a6976..0000000 --- a/backend/Transform.py +++ /dev/null @@ -1,57 +0,0 @@ -from turtle import pd -import boto3 -import tarfile -import json -import psycopg2 -import io - -from EntityRecog.Utils import get_postgresql_connection -def lambda_handler(event, context): - for record in event['Records']: - print(f"New record: {record}") - bucket = record['s3']['bucket']['name'] - key = record['s3']['object']['key'] - conn = get_postgresql_connection() - s3 = boto3.client('s3') - obj = s3.get_object(Bucket=bucket, Key=key) - tar_bytes = io.BytesIO(obj['Body'].read()) - - # Extract .json inside the tar.gz - with tarfile.open(fileobj=tar_bytes, mode='r:gz') as tar: - for member in tar.getmembers(): - if member.name == "output" and member.isfile(): - file = tar.extractfile(member) - results = json.load(file) - 
print(f"Extracted JSON: {results}") - break - - if not results: - folderSplit = key.split('/') - type = folderSplit[0] - cursor = conn.cursor() - query = "SELECT * FROM comprehend_jobs WHERE entities_path = %s or sentiment_path = %s or key_phrases_path = %s" - cursor.execute(query, (key, key, key)) - row = cursor.fetchone() - if row: - article_id = row['article_id'] - for result in results: - if type == 'entities': - location_mentions = ', '.join([entity['Text'] for entity in result['Entities'] if entity['Type'] == 'LOCATION']) - officials_involved = ', '.join([entity['Text'] for entity in result['Entities'] if entity['Type'] == 'PERSON']) - relevance_category = ', '.join([entity['Text'] for entity in result['Entities'] if entity['Type'] == 'TITLE']) - if not location_mentions: - cursor.execute("""update articles set location_mentions = %s where articles_id = %s""", (location_mentions, article_id)) - if not officials_involved: - cursor.execute("""update articles set officials_involved = %s where articles_id = %s""", (officials_involved, article_id)) - if not relevance_category: - cursor.execute("""update articles set relevance_category = %s where articles_id = %s""", (relevance_category, article_id)) - elif type == 'sentiment': - sentiment = row.get('Sentiment', 'NEUTRAL') - if not sentiment: - cursor.execute("""update articles set sentiment = %s where articles_id = %s""", (sentiment, article_id)) - elif type == 'keyphrases': - key_phrases = ', '.join(row.get('KeyPhrases', [])) - if not key_phrases: - cursor.execute("""update articles set key_phrases = %s where articles_id = %s""", (key_phrases, article_id)) - cursor.close() - conn.close() \ No newline at end of file diff --git a/backend/Utils.py b/backend/Utils.py index 371a1dc..41ed2b4 100644 --- a/backend/Utils.py +++ b/backend/Utils.py @@ -1,7 +1,5 @@ import json import psycopg2 -from psycopg2 import sql - def get_postgresql_connection(): '''get the creds from local config''' diff --git 
a/backend/annotations.json b/backend/annotations.json deleted file mode 100644 index a6625b0..0000000 --- a/backend/annotations.json +++ /dev/null @@ -1,962 +0,0 @@ -[ - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 30, - "label": "PLACE" - }, - { - "beginOffset": 47, - "endOffset": 53, - "label": "TOPIC" - }, - { - "beginOffset": 55, - "endOffset": 63, - "label": "TOPIC" - }, - { - "beginOffset": 69, - "endOffset": 80, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 26, - "label": "PLACE" - }, - { - "beginOffset": 43, - "endOffset": 50, - "label": "TOPIC" - }, - { - "beginOffset": 52, - "endOffset": 62, - "label": "TOPIC" - }, - { - "beginOffset": 68, - "endOffset": 74, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 29, - "label": "PLACE" - }, - { - "beginOffset": 46, - "endOffset": 52, - "label": "TOPIC" - }, - { - "beginOffset": 54, - "endOffset": 60, - "label": "TOPIC" - }, - { - "beginOffset": 66, - "endOffset": 74, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 49, - "label": "TOPIC" - }, - { - "beginOffset": 51, - "endOffset": 57, - "label": "TOPIC" - }, - { - "beginOffset": 63, - "endOffset": 73, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 51, - "label": "TOPIC" - }, - { - "beginOffset": 53, - "endOffset": 62, - "label": "TOPIC" - }, - { - "beginOffset": 68, - "endOffset": 74, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 29, - "label": "PLACE" - }, - { - "beginOffset": 46, - "endOffset": 52, - "label": "TOPIC" - }, - { - "beginOffset": 54, - "endOffset": 60, - "label": "TOPIC" - }, - { - "beginOffset": 66, - "endOffset": 73, - "label": "TOPIC" - } - ] - }, - { - 
"annotations": [ - { - "beginOffset": 17, - "endOffset": 27, - "label": "PLACE" - }, - { - "beginOffset": 44, - "endOffset": 51, - "label": "TOPIC" - }, - { - "beginOffset": 53, - "endOffset": 59, - "label": "TOPIC" - }, - { - "beginOffset": 65, - "endOffset": 79, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 51, - "label": "TOPIC" - }, - { - "beginOffset": 53, - "endOffset": 61, - "label": "TOPIC" - }, - { - "beginOffset": 67, - "endOffset": 72, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 27, - "label": "PLACE" - }, - { - "beginOffset": 44, - "endOffset": 52, - "label": "TOPIC" - }, - { - "beginOffset": 54, - "endOffset": 65, - "label": "TOPIC" - }, - { - "beginOffset": 71, - "endOffset": 76, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 50, - "label": "TOPIC" - }, - { - "beginOffset": 52, - "endOffset": 57, - "label": "TOPIC" - }, - { - "beginOffset": 63, - "endOffset": 70, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 49, - "label": "TOPIC" - }, - { - "beginOffset": 51, - "endOffset": 57, - "label": "TOPIC" - }, - { - "beginOffset": 63, - "endOffset": 69, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 54, - "label": "TOPIC" - }, - { - "beginOffset": 56, - "endOffset": 67, - "label": "TOPIC" - }, - { - "beginOffset": 73, - "endOffset": 82, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 50, - "label": "TOPIC" - }, - { - "beginOffset": 52, - 
"endOffset": 58, - "label": "TOPIC" - }, - { - "beginOffset": 64, - "endOffset": 73, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 27, - "label": "PLACE" - }, - { - "beginOffset": 44, - "endOffset": 55, - "label": "TOPIC" - }, - { - "beginOffset": 57, - "endOffset": 66, - "label": "TOPIC" - }, - { - "beginOffset": 72, - "endOffset": 82, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 45, - "label": "TOPIC" - }, - { - "beginOffset": 47, - "endOffset": 57, - "label": "TOPIC" - }, - { - "beginOffset": 63, - "endOffset": 70, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 27, - "label": "PLACE" - }, - { - "beginOffset": 44, - "endOffset": 54, - "label": "TOPIC" - }, - { - "beginOffset": 56, - "endOffset": 65, - "label": "TOPIC" - }, - { - "beginOffset": 71, - "endOffset": 80, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 48, - "label": "TOPIC" - }, - { - "beginOffset": 50, - "endOffset": 60, - "label": "TOPIC" - }, - { - "beginOffset": 66, - "endOffset": 80, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 26, - "label": "PLACE" - }, - { - "beginOffset": 43, - "endOffset": 49, - "label": "TOPIC" - }, - { - "beginOffset": 51, - "endOffset": 57, - "label": "TOPIC" - }, - { - "beginOffset": 63, - "endOffset": 70, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 29, - "label": "PLACE" - }, - { - "beginOffset": 46, - "endOffset": 54, - "label": "TOPIC" - }, - { - "beginOffset": 56, - "endOffset": 65, - "label": "TOPIC" - }, - { - "beginOffset": 71, - "endOffset": 80, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 
25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 50, - "label": "TOPIC" - }, - { - "beginOffset": 52, - "endOffset": 59, - "label": "TOPIC" - }, - { - "beginOffset": 65, - "endOffset": 74, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 48, - "label": "TOPIC" - }, - { - "beginOffset": 50, - "endOffset": 56, - "label": "TOPIC" - }, - { - "beginOffset": 62, - "endOffset": 68, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 29, - "label": "PLACE" - }, - { - "beginOffset": 46, - "endOffset": 56, - "label": "TOPIC" - }, - { - "beginOffset": 58, - "endOffset": 64, - "label": "TOPIC" - }, - { - "beginOffset": 70, - "endOffset": 79, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 46, - "label": "TOPIC" - }, - { - "beginOffset": 48, - "endOffset": 55, - "label": "TOPIC" - }, - { - "beginOffset": 61, - "endOffset": 71, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 49, - "label": "TOPIC" - }, - { - "beginOffset": 51, - "endOffset": 58, - "label": "TOPIC" - }, - { - "beginOffset": 64, - "endOffset": 73, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 26, - "label": "PLACE" - }, - { - "beginOffset": 43, - "endOffset": 53, - "label": "TOPIC" - }, - { - "beginOffset": 55, - "endOffset": 62, - "label": "TOPIC" - }, - { - "beginOffset": 68, - "endOffset": 77, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 55, - "label": "TOPIC" - }, - { - "beginOffset": 57, - "endOffset": 67, - "label": "TOPIC" - }, - { - 
"beginOffset": 73, - "endOffset": 80, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 47, - "label": "TOPIC" - }, - { - "beginOffset": 49, - "endOffset": 56, - "label": "TOPIC" - }, - { - "beginOffset": 62, - "endOffset": 71, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 52, - "label": "TOPIC" - }, - { - "beginOffset": 54, - "endOffset": 64, - "label": "TOPIC" - }, - { - "beginOffset": 70, - "endOffset": 77, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 49, - "label": "TOPIC" - }, - { - "beginOffset": 51, - "endOffset": 65, - "label": "TOPIC" - }, - { - "beginOffset": 71, - "endOffset": 80, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 48, - "label": "TOPIC" - }, - { - "beginOffset": 50, - "endOffset": 59, - "label": "TOPIC" - }, - { - "beginOffset": 65, - "endOffset": 74, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 47, - "label": "TOPIC" - }, - { - "beginOffset": 49, - "endOffset": 55, - "label": "TOPIC" - }, - { - "beginOffset": 61, - "endOffset": 71, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 46, - "label": "TOPIC" - }, - { - "beginOffset": 48, - "endOffset": 55, - "label": "TOPIC" - }, - { - "beginOffset": 61, - "endOffset": 70, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 29, - "label": "PLACE" - }, - { - "beginOffset": 
46, - "endOffset": 53, - "label": "TOPIC" - }, - { - "beginOffset": 55, - "endOffset": 60, - "label": "TOPIC" - }, - { - "beginOffset": 66, - "endOffset": 76, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 49, - "label": "TOPIC" - }, - { - "beginOffset": 51, - "endOffset": 61, - "label": "TOPIC" - }, - { - "beginOffset": 67, - "endOffset": 76, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 24, - "label": "PLACE" - }, - { - "beginOffset": 41, - "endOffset": 52, - "label": "TOPIC" - }, - { - "beginOffset": 54, - "endOffset": 63, - "label": "TOPIC" - }, - { - "beginOffset": 69, - "endOffset": 79, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 30, - "label": "PLACE" - }, - { - "beginOffset": 47, - "endOffset": 57, - "label": "TOPIC" - }, - { - "beginOffset": 59, - "endOffset": 64, - "label": "TOPIC" - }, - { - "beginOffset": 70, - "endOffset": 75, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 25, - "label": "PLACE" - }, - { - "beginOffset": 42, - "endOffset": 51, - "label": "TOPIC" - }, - { - "beginOffset": 53, - "endOffset": 60, - "label": "TOPIC" - }, - { - "beginOffset": 66, - "endOffset": 75, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 27, - "label": "PLACE" - }, - { - "beginOffset": 44, - "endOffset": 50, - "label": "TOPIC" - }, - { - "beginOffset": 52, - "endOffset": 58, - "label": "TOPIC" - }, - { - "beginOffset": 64, - "endOffset": 71, - "label": "TOPIC" - } - ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 54, - "label": "TOPIC" - }, - { - "beginOffset": 56, - "endOffset": 67, - "label": "TOPIC" - }, - { - "beginOffset": 73, - "endOffset": 80, - "label": "TOPIC" - } 
- ] - }, - { - "annotations": [ - { - "beginOffset": 17, - "endOffset": 23, - "label": "PLACE" - }, - { - "beginOffset": 40, - "endOffset": 46, - "label": "TOPIC" - }, - { - "beginOffset": 48, - "endOffset": 58, - "label": "TOPIC" - }, - { - "beginOffset": 64, - "endOffset": 71, - "label": "TOPIC" - } - ] - } -] \ No newline at end of file diff --git a/backend/clustering_service/clustering_service.py b/backend/clustering_service/clustering_service.py new file mode 100644 index 0000000..e9ac3de --- /dev/null +++ b/backend/clustering_service/clustering_service.py @@ -0,0 +1,327 @@ +from Utils import get_postgresql_connection + +import numpy as np +from sentence_transformers import SentenceTransformer +import faiss +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn.metrics.pairwise import cosine_similarity +import networkx as nx +from collections import defaultdict +import pickle +import json + +class NewsArticleIndexer: + """ + Multi-algorithm news article indexer with various similarity detection methods + """ + + def __init__(self, embedding_model='all-MiniLM-L6-v2', use_gpu=False): + # Initialize embedding model + self.embedding_model = SentenceTransformer(embedding_model) + if use_gpu: + self.embedding_model = self.embedding_model.cuda() + + # Initialize various indexing structures + self.articles = [] + self.embeddings = None + self.faiss_index = None + self.tfidf_vectorizer = None + self.tfidf_matrix = None + self.graph = None + + def add_articles(self, articles): + """Add articles to the indexer""" + self.articles.extend(articles) + self._build_indices() + + def preprocess_text(self, article): + """Enhanced preprocessing for better relevance""" + # Combine title (weighted more heavily) and content + title_weight = 2 # Give title more importance + return f"{' '.join([article['title']] * title_weight)} {article.get('location_mention', '')} {article.get('officals_involved', '')} {article.get('relevance_category', '')}" + + def 
_build_indices(self): + """Build all indexing structures""" + texts = [self.preprocess_text(article) for article in self.articles] + + # 1. SEMANTIC EMBEDDINGS (Best for semantic similarity) + print("Building semantic embeddings...") + self.embeddings = self.embedding_model.encode(texts, convert_to_tensor=True) + + # 2. FAISS INDEX (Best for large-scale retrieval) + print("Building FAISS index...") + self._build_faiss_index() + + # 3. TF-IDF (Best for keyword-based similarity) + print("Building TF-IDF index...") + self._build_tfidf_index(texts) + + # 4. GRAPH-BASED (Best for discovering article networks) + print("Building article graph...") + self._build_article_graph() + + def _build_faiss_index(self): + """Build FAISS index for fast similarity search""" + d = self.embeddings.shape[1] + + # For small datasets: IndexFlatL2 (exact search) + # For large datasets: IndexIVFFlat (approximate search) + if len(self.articles) < 10000: + self.faiss_index = faiss.IndexFlatL2(d) + else: + # Use IVF for larger datasets + nlist = min(100, len(self.articles) // 10) # number of clusters + quantizer = faiss.IndexFlatL2(d) + self.faiss_index = faiss.IndexIVFFlat(quantizer, d, nlist) + # Train the index + self.faiss_index.train(self.embeddings.cpu().numpy()) + + self.faiss_index.add(self.embeddings.cpu().numpy()) + + def _build_tfidf_index(self, texts): + """Build TF-IDF index for keyword-based similarity""" + self.tfidf_vectorizer = TfidfVectorizer( + max_features=5000, + stop_words='english', + ngram_range=(1, 2), # Include bigrams + min_df=1, + max_df=0.95 + ) + self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(texts) + + def _build_article_graph(self, similarity_threshold=0.3): + """Build graph of related articles""" + self.graph = nx.Graph() + + # Add all articles as nodes + for i, article in enumerate(self.articles): + self.graph.add_node(i, **article) + + # Add edges based on similarity + for i in range(len(self.articles)): + similar_articles = 
self.find_similar_semantic(i, k=5, return_scores=True) + for j, score in similar_articles: + if i != j and score > similarity_threshold: + self.graph.add_edge(i, j, weight=score) + + # ===== SIMILARITY SEARCH METHODS ===== + + def find_similar_semantic(self, query_idx, k=5, return_scores=False): + """Find similar articles using semantic embeddings (BEST for meaning)""" + query_embedding = self.embeddings[query_idx].cpu().numpy().reshape(1, -1) + + # Search using FAISS + distances, indices = self.faiss_index.search(query_embedding, k + 1) + + # Remove the query article itself + results = [] + for i, (dist, idx) in enumerate(zip(distances[0], indices[0])): + if idx != query_idx: + similarity_score = 1 / (1 + dist) # Convert distance to similarity + if return_scores: + results.append((idx, similarity_score)) + else: + results.append(idx) + if len(results) >= k: + break + + return results + + def find_similar_tfidf(self, query_idx, k=5, return_scores=False): + """Find similar articles using TF-IDF (BEST for keywords)""" + query_vector = self.tfidf_matrix[query_idx] + similarities = cosine_similarity(query_vector, self.tfidf_matrix).flatten() + + # Get top-k similar articles (excluding self) + similar_indices = similarities.argsort()[::-1] + results = [] + + for idx in similar_indices: + if idx != query_idx and len(results) < k: + if return_scores: + results.append((idx, similarities[idx])) + else: + results.append(idx) + + return results + + def find_similar_hybrid(self, query_idx, k=5, semantic_weight=0.7, return_scores=False): + """Hybrid approach combining semantic and TF-IDF (BEST overall)""" + # Get semantic similarities + semantic_results = self.find_similar_semantic(query_idx, k=k*2, return_scores=True) + tfidf_results = self.find_similar_tfidf(query_idx, k=k*2, return_scores=True) + + # Combine scores + combined_scores = defaultdict(float) + + for idx, score in semantic_results: + combined_scores[idx] += semantic_weight * score + + for idx, score in 
tfidf_results: + combined_scores[idx] += (1 - semantic_weight) * score + + # Sort by combined score + sorted_results = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True) + + if return_scores: + return sorted_results[:k] + else: + return [idx for idx, _ in sorted_results[:k]] + + def find_similar_graph(self, query_idx, k=5): + """Find similar articles using graph-based methods""" + if not self.graph.has_node(query_idx): + return [] + + # Get neighbors sorted by edge weight + neighbors = list(self.graph.neighbors(query_idx)) + neighbor_weights = [(n, self.graph[query_idx][n]['weight']) for n in neighbors] + neighbor_weights.sort(key=lambda x: x[1], reverse=True) + + return [idx for idx, _ in neighbor_weights[:k]] + + # ===== ADVANCED ANALYSIS METHODS ===== + + def detect_article_clusters(self, method='semantic', n_clusters=None): + """Detect clusters of related articles""" + from sklearn.cluster import KMeans, DBSCAN + + if method == 'semantic': + features = self.embeddings.cpu().numpy() + elif method == 'tfidf': + features = self.tfidf_matrix.toarray() + + if n_clusters: + clusterer = KMeans(n_clusters=n_clusters, random_state=42) + else: + clusterer = DBSCAN(eps=0.5, min_samples=2) + + cluster_labels = clusterer.fit_predict(features) + + # Group articles by cluster + clusters = defaultdict(list) + for i, label in enumerate(cluster_labels): + clusters[label].append(i) + + return dict(clusters) + + def find_trending_topics(self, time_window_hours=24): + """Find trending topics (if articles have timestamps)""" + # This would require timestamp information in articles + # Implementation would filter recent articles and cluster them + pass + + def get_article_importance_scores(self): + """Calculate importance scores using PageRank on article graph""" + if self.graph is None: + return {} + + pagerank_scores = nx.pagerank(self.graph, weight='weight') + return pagerank_scores + + def save_index(self, filepath): + """Save the entire index to disk""" + 
index_data = { + 'articles': self.articles, + 'embeddings': self.embeddings.cpu().numpy() if self.embeddings is not None else None, + 'tfidf_vectorizer': self.tfidf_vectorizer, + 'tfidf_matrix': self.tfidf_matrix, + 'graph': self.graph + } + + with open(filepath, 'wb') as f: + pickle.dump(index_data, f) + + # Save FAISS index separately + if self.faiss_index is not None: + faiss.write_index(self.faiss_index, f"{filepath}.faiss") + + def load_index(self, filepath): + """Load index from disk""" + with open(filepath, 'rb') as f: + index_data = pickle.load(f) + + self.articles = index_data['articles'] + self.embeddings = index_data['embeddings'] + self.tfidf_vectorizer = index_data['tfidf_vectorizer'] + self.tfidf_matrix = index_data['tfidf_matrix'] + self.graph = index_data['graph'] + + # Load FAISS index + try: + self.faiss_index = faiss.read_index(f"{filepath}.faiss") + except: + print("Could not load FAISS index, rebuilding...") + self._build_faiss_index() + +# ===== USAGE EXAMPLE ===== + +def main(): + # Sample articles + conn = get_postgresql_connection() + cursor = conn.cursor() + query = "SELECT * FROM articles order by article_id asc" + cursor.execute(query) + articles_db = cursor.fetchall() + articles = [{} for _ in range(len(articles_db))] + for i, article in enumerate(articles_db): + articles[i] = { + "article_id": article[0], + "title": article[1], + # "content": article[2], + "location_mention": article[5], + "officals_involved": article[6], + "relevance_category": article[7], + } + + # Initialize indexer + indexer = NewsArticleIndexer() + indexer.add_articles(articles) + + # Test different similarity methods + # "President Signs New Trade Deal" + for i, article in enumerate(articles): + query_idx = i + print(f"\nšŸ” i: '{i}'") + print(f"\nšŸ” Finding articles similar to: '{articles[query_idx]['title']}'") + linked_id = [] + print("\n1. 
Semantic Similarity (Best for meaning):") + semantic_results = indexer.find_similar_semantic(query_idx, k=3) + for idx in semantic_results: + linked_id.append(articles[idx]['article_id']) + print(f" - {articles[idx]['title']}") + + # print("\n2. TF-IDF Similarity (Best for keywords):") + # tfidf_results = indexer.find_similar_tfidf(query_idx, k=3) + # for idx in tfidf_results: + # print(f" - {articles[idx]['title']}") + + print("\n3. Hybrid Similarity (Best overall):") + hybrid_results = indexer.find_similar_hybrid(query_idx, k=3) + for idx in hybrid_results: + print(f" - {articles[idx]['title']}") + cursor.execute("UPDATE articles SET linked_id = %s WHERE article_id = %s", (linked_id, articles[query_idx]['article_id'])) + conn.commit() + # print("\n4. Graph-based Similarity:") + # graph_results = indexer.find_similar_graph(query_idx, k=3) + # for idx in graph_results: + # print(f" - {articles[idx]['title']}") + + # # Detect clusters + # print("\n Article Clusters:") + # clusters = indexer.detect_article_clusters(method='semantic') + # for cluster_id, article_indices in clusters.items(): + # if cluster_id != -1: # Ignore noise cluster + # print(f"Cluster {cluster_id}:") + # for idx in article_indices: + # print(f" - {articles[idx]['title']}") + + # Calculate importance scores + # print("\n Article Importance Scores:") + # importance_scores = indexer.get_article_importance_scores() + # sorted_importance = sorted(importance_scores.items(), key=lambda x: x[1], reverse=True) + # for idx, score in sorted_importance[:3]: + # print(f" {score:.3f} - {articles[idx]['title']}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/backend/get_feed/get_feed.py b/backend/get_feed/get_feed.py new file mode 100644 index 0000000..7616578 --- /dev/null +++ b/backend/get_feed/get_feed.py @@ -0,0 +1,20 @@ + +import json +from Utils import get_postgresql_connection + + +def lambda_handler(event, context): + conn = get_postgresql_connection() + cursor = 
conn.cursor() + query = "SELECT * FROM articles order by article_id asc" + cursor.execute(query) + columns = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + result = [dict(zip(columns, row)) for row in rows] + return { + "statusCode": 200, + "headers": { + "Content-Type": "application/json" + }, + "body": json.dumps(result) + } \ No newline at end of file diff --git a/backend/input.csv b/backend/input.csv deleted file mode 100644 index ae4e274..0000000 --- a/backend/input.csv +++ /dev/null @@ -1,41 +0,0 @@ -article_id,title,body,source,publishedDate -1,"Visakhapatnam news update: Policy, heritage, environment","Recent events in Visakhapatnam have focused on policy, heritage, and environment. Authorities are responding accordingly to ensure public welfare.",New Indian Express,01/06/2025 -2,"Hyderabad news update: Weather, cybercrime, policy","Recent events in Hyderabad have focused on weather, cybercrime, and policy. Authorities are responding accordingly to ensure public welfare.",Eenadu,29/06/2025 -3,"Vizianagaram news update: Policy, sports, heritage","Recent events in Vizianagaram have focused on policy, sports, and heritage. Authorities are responding accordingly to ensure public welfare.",The Hindu,03/06/2025 -4,"Ongole news update: Education, policy, employment","Recent events in Ongole have focused on education, policy, and employment. Authorities are responding accordingly to ensure public welfare.",Andhra Jyothy,25/06/2025 -5,"Tirupati news update: Transport, elections, policy","Recent events in Tirupati have focused on transport, elections, and policy. Authorities are responding accordingly to ensure public welfare.",The Hindu,10/06/2025 -6,"Vizianagaram news update: Policy, sports, startup","Recent events in Vizianagaram have focused on policy, sports, and startup. 
Authorities are responding accordingly to ensure public welfare.",Deccan Chronicle,15/06/2025 -7,"Vijayawada news update: Startup, sports, infrastructure","Recent events in Vijayawada have focused on startup, sports, and infrastructure. Authorities are responding accordingly to ensure public welfare.",New Indian Express,25/06/2025 -8,"Nellore news update: Cybercrime, heritage, flood","Recent events in Nellore have focused on cybercrime, heritage, and flood. Authorities are responding accordingly to ensure public welfare.",Deccan Chronicle,16/06/2025 -9,"Vijayawada news update: Heritage, environment, flood","Recent events in Vijayawada have focused on heritage, environment, and flood. Authorities are responding accordingly to ensure public welfare.",New Indian Express,07/06/2025 -10,"Kurnool news update: Transport, crime, culture","Recent events in Kurnool have focused on transport, crime, and culture. Authorities are responding accordingly to ensure public welfare.",The Hindu,29/06/2025 -11,"Guntur news update: Elections, policy, health","Recent events in Guntur have focused on elections, policy, and health. Authorities are responding accordingly to ensure public welfare.",Sakshi,13/06/2025 -12,"Guntur news update: Infrastructure, environment, transport","Recent events in Guntur have focused on infrastructure, environment, and transport. Authorities are responding accordingly to ensure public welfare.",Deccan Chronicle,06/06/2025 -13,"Guntur news update: Cybercrime, energy, transport","Recent events in Guntur have focused on cybercrime, energy, and transport. Authorities are responding accordingly to ensure public welfare.",Sakshi,14/06/2025 -14,"Srikakulam news update: Environment, elections, government","Recent events in Srikakulam have focused on environment, elections, and government. 
Authorities are responding accordingly to ensure public welfare.",Times of India,06/06/2025 -15,"Ongole news update: Flood, government, weather","Recent events in Ongole have focused on flood, government, and weather. Authorities are responding accordingly to ensure public welfare.",Times of India,06/06/2025 -16,"Srikakulam news update: Government, transport, education","Recent events in Srikakulam have focused on government, transport, and education. Authorities are responding accordingly to ensure public welfare.",Sakshi,07/06/2025 -17,"Kurnool news update: Weather, government, infrastructure","Recent events in Kurnool have focused on weather, government, and infrastructure. Authorities are responding accordingly to ensure public welfare.",The Hindu,11/06/2025 -18,"Anantapur news update: Policy, health, weather","Recent events in Anantapur have focused on policy, health, and weather. Authorities are responding accordingly to ensure public welfare.",New Indian Express,15/06/2025 -19,"Vizianagaram news update: Heritage, transport, education","Recent events in Vizianagaram have focused on heritage, transport, and education. Authorities are responding accordingly to ensure public welfare.",Deccan Chronicle,08/06/2025 -20,"Tirupati news update: Heritage, weather, education","Recent events in Tirupati have focused on heritage, weather, and education. Authorities are responding accordingly to ensure public welfare.",Deccan Chronicle,18/06/2025 -21,"Kurnool news update: Startup, sports, policy","Recent events in Kurnool have focused on startup, sports, and policy. Authorities are responding accordingly to ensure public welfare.",The Hindu,08/06/2025 -22,"Vizianagaram news update: Technology, health, education","Recent events in Vizianagaram have focused on technology, health, and education. 
Authorities are responding accordingly to ensure public welfare.",The Hindu,03/06/2025 -23,"Kadapa news update: Health, weather, employment","Recent events in Kadapa have focused on health, weather, and employment. Authorities are responding accordingly to ensure public welfare.",New Indian Express,19/06/2025 -24,"Tirupati news update: Culture, startup, transport","Recent events in Tirupati have focused on culture, startup, and transport. Authorities are responding accordingly to ensure public welfare.",Eenadu,04/06/2025 -25,"Anantapur news update: Employment, culture, education","Recent events in Anantapur have focused on employment, culture, and education. Authorities are responding accordingly to ensure public welfare.",Eenadu,22/06/2025 -26,"Kurnool news update: Infrastructure, cybercrime, culture","Recent events in Kurnool have focused on infrastructure, cybercrime, and culture. Authorities are responding accordingly to ensure public welfare.",The Hindu,14/06/2025 -27,"Kurnool news update: Policy, culture, education","Recent events in Kurnool have focused on policy, culture, and education. Authorities are responding accordingly to ensure public welfare.",Eenadu,08/06/2025 -28,"Tirupati news update: Technology, employment, weather","Recent events in Tirupati have focused on technology, employment, and weather. Authorities are responding accordingly to ensure public welfare.",Sakshi,17/06/2025 -29,"Tirupati news update: Weather, infrastructure, elections","Recent events in Tirupati have focused on weather, infrastructure, and elections. Authorities are responding accordingly to ensure public welfare.",Times of India,21/06/2025 -30,"Tirupati news update: Sports, education, transport","Recent events in Tirupati have focused on sports, education, and transport. 
Authorities are responding accordingly to ensure public welfare.",Times of India,02/06/2025 -31,"Ongole news update: Weather, health, employment","Recent events in Ongole have focused on weather, health, and employment. Authorities are responding accordingly to ensure public welfare.",Deccan Chronicle,21/06/2025 -32,"Kadapa news update: Policy, culture, elections","Recent events in Kadapa have focused on policy, culture, and elections. Authorities are responding accordingly to ensure public welfare.",Sakshi,12/06/2025 -33,"Vizianagaram news update: Weather, crime, cybercrime","Recent events in Vizianagaram have focused on weather, crime, and cybercrime. Authorities are responding accordingly to ensure public welfare.",Sakshi,01/06/2025 -34,"Tirupati news update: Startup, employment, transport","Recent events in Tirupati have focused on startup, employment, and transport. Authorities are responding accordingly to ensure public welfare.",The Hindu,21/06/2025 -35,"Nellore news update: Environment, transport, technology","Recent events in Nellore have focused on environment, transport, and technology. Authorities are responding accordingly to ensure public welfare.",Sakshi,07/06/2025 -36,"Visakhapatnam news update: Technology, flood, crime","Recent events in Visakhapatnam have focused on technology, flood, and crime. Authorities are responding accordingly to ensure public welfare.",Andhra Jyothy,14/06/2025 -37,"Chittoor news update: Transport, weather, education","Recent events in Chittoor have focused on transport, weather, and education. Authorities are responding accordingly to ensure public welfare.",Eenadu,08/06/2025 -38,"Vijayawada news update: Health, policy, culture","Recent events in Vijayawada have focused on health, policy, and culture. 
Authorities are responding accordingly to ensure public welfare.",Eenadu,12/06/2025 -39,"Kadapa news update: Infrastructure, environment, weather","Recent events in Kadapa have focused on infrastructure, environment, and weather. Authorities are responding accordingly to ensure public welfare.",New Indian Express,09/06/2025 -40,"Kadapa news update: Health, government, weather","Recent events in Kadapa have focused on health, government, and weather. Authorities are responding accordingly to ensure public welfare.",The Hindu,02/06/2025 \ No newline at end of file diff --git a/backend/input_feed.json b/backend/input_feed.json deleted file mode 100644 index ac56db2..0000000 --- a/backend/input_feed.json +++ /dev/null @@ -1,602 +0,0 @@ -[ - { - "title": "Visakhapatnam news update: Policy, heritage, environment", - "body": "Recent events in Visakhapatnam have focused on policy, heritage, and environment. Authorities are responding accordingly to ensure public welfare.", - "source": "New Indian Express", - "publishedDate": "01/06/2025", - "extractedLocations": [ - "Visakhapatnam" - ], - "districtMapping": "Visakhapatnam", - "tags": [ - "policy", - "heritage", - "environment" - ] - }, - { - "title": "Hyderabad news update: Weather, cybercrime, policy", - "body": "Recent events in Hyderabad have focused on weather, cybercrime, and policy. Authorities are responding accordingly to ensure public welfare.", - "source": "Eenadu", - "publishedDate": "29/06/2025", - "extractedLocations": [ - "Hyderabad" - ], - "districtMapping": "Hyderabad", - "tags": [ - "weather", - "cybercrime", - "policy" - ] - }, - { - "title": "Vizianagaram news update: Policy, sports, heritage", - "body": "Recent events in Vizianagaram have focused on policy, sports, and heritage. 
Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "03/06/2025", - "extractedLocations": [ - "Vizianagaram" - ], - "districtMapping": "Vizianagaram", - "tags": [ - "policy", - "sports", - "heritage" - ] - }, - { - "title": "Ongole news update: Education, policy, employment", - "body": "Recent events in Ongole have focused on education, policy, and employment. Authorities are responding accordingly to ensure public welfare.", - "source": "Andhra Jyothy", - "publishedDate": "25/06/2025", - "extractedLocations": [ - "Ongole" - ], - "districtMapping": "Prakasam", - "tags": [ - "education", - "policy", - "employment" - ] - }, - { - "title": "Tirupati news update: Transport, elections, policy", - "body": "Recent events in Tirupati have focused on transport, elections, and policy. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "10/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "transport", - "elections", - "policy" - ] - }, - { - "title": "Vizianagaram news update: Policy, sports, startup", - "body": "Recent events in Vizianagaram have focused on policy, sports, and startup. Authorities are responding accordingly to ensure public welfare.", - "source": "Deccan Chronicle", - "publishedDate": "15/06/2025", - "extractedLocations": [ - "Vizianagaram" - ], - "districtMapping": "Vizianagaram", - "tags": [ - "policy", - "sports", - "startup" - ] - }, - { - "title": "Vijayawada news update: Startup, sports, infrastructure", - "body": "Recent events in Vijayawada have focused on startup, sports, and infrastructure. 
Authorities are responding accordingly to ensure public welfare.", - "source": "New Indian Express", - "publishedDate": "25/06/2025", - "extractedLocations": [ - "Vijayawada" - ], - "districtMapping": "Krishna", - "tags": [ - "startup", - "sports", - "infrastructure" - ] - }, - { - "title": "Nellore news update: Cybercrime, heritage, flood", - "body": "Recent events in Nellore have focused on cybercrime, heritage, and flood. Authorities are responding accordingly to ensure public welfare.", - "source": "Deccan Chronicle", - "publishedDate": "16/06/2025", - "extractedLocations": [ - "Nellore" - ], - "districtMapping": "Nellore", - "tags": [ - "cybercrime", - "heritage", - "flood" - ] - }, - { - "title": "Vijayawada news update: Heritage, environment, flood", - "body": "Recent events in Vijayawada have focused on heritage, environment, and flood. Authorities are responding accordingly to ensure public welfare.", - "source": "New Indian Express", - "publishedDate": "07/06/2025", - "extractedLocations": [ - "Vijayawada" - ], - "districtMapping": "Krishna", - "tags": [ - "heritage", - "environment", - "flood" - ] - }, - { - "title": "Kurnool news update: Transport, crime, culture", - "body": "Recent events in Kurnool have focused on transport, crime, and culture. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "29/06/2025", - "extractedLocations": [ - "Kurnool" - ], - "districtMapping": "Kurnool", - "tags": [ - "transport", - "crime", - "culture" - ] - }, - { - "title": "Guntur news update: Elections, policy, health", - "body": "Recent events in Guntur have focused on elections, policy, and health. 
Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "13/06/2025", - "extractedLocations": [ - "Guntur" - ], - "districtMapping": "Guntur", - "tags": [ - "elections", - "policy", - "health" - ] - }, - { - "title": "Guntur news update: Infrastructure, environment, transport", - "body": "Recent events in Guntur have focused on infrastructure, environment, and transport. Authorities are responding accordingly to ensure public welfare.", - "source": "Deccan Chronicle", - "publishedDate": "06/06/2025", - "extractedLocations": [ - "Guntur" - ], - "districtMapping": "Guntur", - "tags": [ - "infrastructure", - "environment", - "transport" - ] - }, - { - "title": "Guntur news update: Cybercrime, energy, transport", - "body": "Recent events in Guntur have focused on cybercrime, energy, and transport. Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "14/06/2025", - "extractedLocations": [ - "Guntur" - ], - "districtMapping": "Guntur", - "tags": [ - "cybercrime", - "energy", - "transport" - ] - }, - { - "title": "Srikakulam news update: Environment, elections, government", - "body": "Recent events in Srikakulam have focused on environment, elections, and government. Authorities are responding accordingly to ensure public welfare.", - "source": "Times of India", - "publishedDate": "06/06/2025", - "extractedLocations": [ - "Srikakulam" - ], - "districtMapping": "Srikakulam", - "tags": [ - "environment", - "elections", - "government" - ] - }, - { - "title": "Ongole news update: Flood, government, weather", - "body": "Recent events in Ongole have focused on flood, government, and weather. 
Authorities are responding accordingly to ensure public welfare.", - "source": "Times of India", - "publishedDate": "06/06/2025", - "extractedLocations": [ - "Ongole" - ], - "districtMapping": "Prakasam", - "tags": [ - "flood", - "government", - "weather" - ] - }, - { - "title": "Srikakulam news update: Government, transport, education", - "body": "Recent events in Srikakulam have focused on government, transport, and education. Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "07/06/2025", - "extractedLocations": [ - "Srikakulam" - ], - "districtMapping": "Srikakulam", - "tags": [ - "government", - "transport", - "education" - ] - }, - { - "title": "Kurnool news update: Weather, government, infrastructure", - "body": "Recent events in Kurnool have focused on weather, government, and infrastructure. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "11/06/2025", - "extractedLocations": [ - "Kurnool" - ], - "districtMapping": "Kurnool", - "tags": [ - "weather", - "government", - "infrastructure" - ] - }, - { - "title": "Anantapur news update: Policy, health, weather", - "body": "Recent events in Anantapur have focused on policy, health, and weather. Authorities are responding accordingly to ensure public welfare.", - "source": "New Indian Express", - "publishedDate": "15/06/2025", - "extractedLocations": [ - "Anantapur" - ], - "districtMapping": "Anantapur", - "tags": [ - "policy", - "health", - "weather" - ] - }, - { - "title": "Vizianagaram news update: Heritage, transport, education", - "body": "Recent events in Vizianagaram have focused on heritage, transport, and education. 
Authorities are responding accordingly to ensure public welfare.", - "source": "Deccan Chronicle", - "publishedDate": "08/06/2025", - "extractedLocations": [ - "Vizianagaram" - ], - "districtMapping": "Vizianagaram", - "tags": [ - "heritage", - "transport", - "education" - ] - }, - { - "title": "Tirupati news update: Heritage, weather, education", - "body": "Recent events in Tirupati have focused on heritage, weather, and education. Authorities are responding accordingly to ensure public welfare.", - "source": "Deccan Chronicle", - "publishedDate": "18/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "heritage", - "weather", - "education" - ] - }, - { - "title": "Kurnool news update: Startup, sports, policy", - "body": "Recent events in Kurnool have focused on startup, sports, and policy. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "08/06/2025", - "extractedLocations": [ - "Kurnool" - ], - "districtMapping": "Kurnool", - "tags": [ - "startup", - "sports", - "policy" - ] - }, - { - "title": "Vizianagaram news update: Technology, health, education", - "body": "Recent events in Vizianagaram have focused on technology, health, and education. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "03/06/2025", - "extractedLocations": [ - "Vizianagaram" - ], - "districtMapping": "Vizianagaram", - "tags": [ - "technology", - "health", - "education" - ] - }, - { - "title": "Kadapa news update: Health, weather, employment", - "body": "Recent events in Kadapa have focused on health, weather, and employment. 
Authorities are responding accordingly to ensure public welfare.", - "source": "New Indian Express", - "publishedDate": "19/06/2025", - "extractedLocations": [ - "Kadapa" - ], - "districtMapping": "Kadapa", - "tags": [ - "health", - "weather", - "employment" - ] - }, - { - "title": "Tirupati news update: Culture, startup, transport", - "body": "Recent events in Tirupati have focused on culture, startup, and transport. Authorities are responding accordingly to ensure public welfare.", - "source": "Eenadu", - "publishedDate": "04/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "culture", - "startup", - "transport" - ] - }, - { - "title": "Anantapur news update: Employment, culture, education", - "body": "Recent events in Anantapur have focused on employment, culture, and education. Authorities are responding accordingly to ensure public welfare.", - "source": "Eenadu", - "publishedDate": "22/06/2025", - "extractedLocations": [ - "Anantapur" - ], - "districtMapping": "Anantapur", - "tags": [ - "employment", - "culture", - "education" - ] - }, - { - "title": "Kurnool news update: Infrastructure, cybercrime, culture", - "body": "Recent events in Kurnool have focused on infrastructure, cybercrime, and culture. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "14/06/2025", - "extractedLocations": [ - "Kurnool" - ], - "districtMapping": "Kurnool", - "tags": [ - "infrastructure", - "cybercrime", - "culture" - ] - }, - { - "title": "Kurnool news update: Policy, culture, education", - "body": "Recent events in Kurnool have focused on policy, culture, and education. 
Authorities are responding accordingly to ensure public welfare.", - "source": "Eenadu", - "publishedDate": "08/06/2025", - "extractedLocations": [ - "Kurnool" - ], - "districtMapping": "Kurnool", - "tags": [ - "policy", - "culture", - "education" - ] - }, - { - "title": "Tirupati news update: Technology, employment, weather", - "body": "Recent events in Tirupati have focused on technology, employment, and weather. Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "17/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "technology", - "employment", - "weather" - ] - }, - { - "title": "Tirupati news update: Weather, infrastructure, elections", - "body": "Recent events in Tirupati have focused on weather, infrastructure, and elections. Authorities are responding accordingly to ensure public welfare.", - "source": "Times of India", - "publishedDate": "21/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "weather", - "infrastructure", - "elections" - ] - }, - { - "title": "Tirupati news update: Sports, education, transport", - "body": "Recent events in Tirupati have focused on sports, education, and transport. Authorities are responding accordingly to ensure public welfare.", - "source": "Times of India", - "publishedDate": "02/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "sports", - "education", - "transport" - ] - }, - { - "title": "Ongole news update: Weather, health, employment", - "body": "Recent events in Ongole have focused on weather, health, and employment. 
Authorities are responding accordingly to ensure public welfare.", - "source": "Deccan Chronicle", - "publishedDate": "21/06/2025", - "extractedLocations": [ - "Ongole" - ], - "districtMapping": "Prakasam", - "tags": [ - "weather", - "health", - "employment" - ] - }, - { - "title": "Kadapa news update: Policy, culture, elections", - "body": "Recent events in Kadapa have focused on policy, culture, and elections. Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "12/06/2025", - "extractedLocations": [ - "Kadapa" - ], - "districtMapping": "Kadapa", - "tags": [ - "policy", - "culture", - "elections" - ] - }, - { - "title": "Vizianagaram news update: Weather, crime, cybercrime", - "body": "Recent events in Vizianagaram have focused on weather, crime, and cybercrime. Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "01/06/2025", - "extractedLocations": [ - "Vizianagaram" - ], - "districtMapping": "Vizianagaram", - "tags": [ - "weather", - "crime", - "cybercrime" - ] - }, - { - "title": "Tirupati news update: Startup, employment, transport", - "body": "Recent events in Tirupati have focused on startup, employment, and transport. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "21/06/2025", - "extractedLocations": [ - "Tirupati" - ], - "districtMapping": "Tirupati", - "tags": [ - "startup", - "employment", - "transport" - ] - }, - { - "title": "Nellore news update: Environment, transport, technology", - "body": "Recent events in Nellore have focused on environment, transport, and technology. 
Authorities are responding accordingly to ensure public welfare.", - "source": "Sakshi", - "publishedDate": "07/06/2025", - "extractedLocations": [ - "Nellore" - ], - "districtMapping": "Nellore", - "tags": [ - "environment", - "transport", - "technology" - ] - }, - { - "title": "Visakhapatnam news update: Technology, flood, crime", - "body": "Recent events in Visakhapatnam have focused on technology, flood, and crime. Authorities are responding accordingly to ensure public welfare.", - "source": "Andhra Jyothy", - "publishedDate": "14/06/2025", - "extractedLocations": [ - "Visakhapatnam" - ], - "districtMapping": "Visakhapatnam", - "tags": [ - "technology", - "flood", - "crime" - ] - }, - { - "title": "Chittoor news update: Transport, weather, education", - "body": "Recent events in Chittoor have focused on transport, weather, and education. Authorities are responding accordingly to ensure public welfare.", - "source": "Eenadu", - "publishedDate": "08/06/2025", - "extractedLocations": [ - "Chittoor" - ], - "districtMapping": "Chittoor", - "tags": [ - "transport", - "weather", - "education" - ] - }, - { - "title": "Vijayawada news update: Health, policy, culture", - "body": "Recent events in Vijayawada have focused on health, policy, and culture. Authorities are responding accordingly to ensure public welfare.", - "source": "Eenadu", - "publishedDate": "12/06/2025", - "extractedLocations": [ - "Vijayawada" - ], - "districtMapping": "Krishna", - "tags": [ - "health", - "policy", - "culture" - ] - }, - { - "title": "Kadapa news update: Infrastructure, environment, weather", - "body": "Recent events in Kadapa have focused on infrastructure, environment, and weather. 
Authorities are responding accordingly to ensure public welfare.", - "source": "New Indian Express", - "publishedDate": "09/06/2025", - "extractedLocations": [ - "Kadapa" - ], - "districtMapping": "Kadapa", - "tags": [ - "infrastructure", - "environment", - "weather" - ] - }, - { - "title": "Kadapa news update: Health, government, weather", - "body": "Recent events in Kadapa have focused on health, government, and weather. Authorities are responding accordingly to ensure public welfare.", - "source": "The Hindu", - "publishedDate": "02/06/2025", - "extractedLocations": [ - "Kadapa" - ], - "districtMapping": "Kadapa", - "tags": [ - "health", - "government", - "weather" - ] - } -] \ No newline at end of file diff --git a/backend/input_handler/input_handler.py b/backend/input_handler/input_handler.py new file mode 100644 index 0000000..50addfe --- /dev/null +++ b/backend/input_handler/input_handler.py @@ -0,0 +1,90 @@ +import csv +import io +import pandas as pd +import boto3 +import time +import uuid +from ..Utils import get_postgresql_connection +def lambda_handler(event, context): + comprehend = boto3.client('comprehend') + s3 = boto3.client('s3') + role_arn = 'arn:aws:iam::269854564686:role/hackathon-comprehend-role' + bucket_name = 'awstraindata' + conn = get_postgresql_connection() + cursor = conn.cursor() + for record in event['Records']: + print(f"New record: {record}") + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + # Download the file object + input_csv_object = s3.get_object(Bucket=bucket_name, Key=key) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS comprehend_jobs ( + article_id TEXT, + input_s3_uri TEXT, + entities_path TEXT, + sentiment_path TEXT, + key_phrases_path TEXT + ) + """) + input_csv = pd.read_csv(io.BytesIO(input_csv_object['Body'].read())) + for index, row in input_csv.iterrows(): + print(f"Processing row {index}: {row}") + articles_id = str(uuid.uuid4()) # Generate a unique ID for each article + 
cursor.execute(""" + INSERT INTO articles (articles_id, title, body, source, published_date) + VALUES (%s, %s, %s, %s, %s)""", (articles_id, row[1], row[2], row[3], row[4])) + # Convert to CSV in-memory + csv_buffer = io.StringIO() + writer = csv.writer(csv_buffer) + # writer.writerow(row.headers) # Write header + writer.writerow(row) + s3_path = 'input/' + articles_id + '.csv' + s3_uri = 's3://' + bucket_name + '/' + s3_path + s3.put_object( + Bucket=bucket_name, + Key=s3_path, # adjust as needed + Body=csv_buffer.getvalue(), + ContentType='text/csv' + ) + entities_job = comprehend.start_entities_detection_job( + InputDataConfig={'S3Uri': s3_uri, 'InputFormat': 'ONE_DOC_PER_LINE'}, + OutputDataConfig={'S3Uri': 's3://awstraindata/output/entities/'}, + DataAccessRoleArn=role_arn, + LanguageCode='en', + JobName='MyEntityDetectionJob_'+ articles_id + '_' + str(int(time.time())) + ) + result = comprehend.describe_entities_detection_job(JobId=entities_job['JobId']) + entities_output = result['EntitiesDetectionJobProperties']['OutputDataConfig']['S3Uri'] + + # SENTIMENT detection job + sentiment_job = comprehend.start_sentiment_detection_job( + InputDataConfig={'S3Uri': s3_uri, 'InputFormat': 'ONE_DOC_PER_LINE'}, + OutputDataConfig={'S3Uri': 's3://awstraindata/output/sentiment/'}, + DataAccessRoleArn=role_arn, + LanguageCode='en', + JobName='MySentimentDetectionJob_' + articles_id + '_' + str(int(time.time())) + ) + res = comprehend.describe_sentiment_detection_job(JobId=sentiment_job['JobId']) + sentiment_output = res['SentimentDetectionJobProperties']['OutputDataConfig']['S3Uri'] + + # KEY PHRASES detection job + phrases_job = comprehend.start_key_phrases_detection_job( + InputDataConfig={'S3Uri': s3_uri, 'InputFormat': 'ONE_DOC_PER_LINE'}, + OutputDataConfig={'S3Uri': 's3://awstraindata/output/keyphrases/'}, + DataAccessRoleArn=role_arn, + LanguageCode='en', + JobName='MyKeyPhrasesDetectionJob_' + articles_id + '_' + str(int(time.time())) + ) + res = 
comprehend.describe_key_phrases_detection_job(JobId=phrases_job['JobId']) + key_phrases_output = res['KeyPhrasesDetectionJobProperties']['OutputDataConfig']['S3Uri'] + print("Entities Job Response:", entities_output) + print("Sentiment Job Response:", sentiment_output) + print("Key Phrases Job Response:", key_phrases_output) + print("Inserting into comprehend_jobs table") + cursor.execute(""" + INSERT INTO comprehend_jobs (article_id, input_s3_uri, entities_path, sentiment_path, key_phrases_path) + VALUES (%s, %s, %s, %s, %s)""", (articles_id, s3_uri, entities_output.replace('s3://awstraindata/', ''), sentiment_output.replace('s3://awstraindata/', ''), key_phrases_output.replace('s3://awstraindata/', ''))) + conn.commit() + cursor.close() + conn.close() \ No newline at end of file diff --git a/backend/Submit.py b/backend/input_handler/input_handler_test.py similarity index 70% rename from backend/Submit.py rename to backend/input_handler/input_handler_test.py index 5cdf6db..5c837a8 100644 --- a/backend/Submit.py +++ b/backend/input_handler/input_handler_test.py @@ -8,19 +8,18 @@ comprehend = boto3.client('comprehend') -input_s3_uri = 's3://awstraindata/input.csv' role_arn = 'arn:aws:iam::269854564686:role/hackathon-comprehend-role' bucket_name = 'awstraindata' s3 = boto3.client('s3') # Download the file object -input_csv_object = s3.get_object(Bucket=bucket_name, Key='input.csv') +input_csv_object = s3.get_object(Bucket=bucket_name, Key='input_small.csv') # Read CSV into DataFrame conn = get_postgresql_connection() cursor = conn.cursor() cursor.execute("drop table if exists articles") cursor.execute("""CREATE TABLE IF NOT EXISTS articles ( - articles_id TEXT, + article_id TEXT, title TEXT, body TEXT, source TEXT, @@ -30,19 +29,31 @@ relevance_category TEXT, sentiment TEXT )""") +cursor.execute(""" + drop table if exists comprehend_jobs +""") +cursor.execute(""" + CREATE TABLE IF NOT EXISTS comprehend_jobs ( + article_id TEXT, + input_s3_uri TEXT, + entities_path 
TEXT, + sentiment_path TEXT, + key_phrases_path TEXT + ) + """) input_csv = pd.read_csv(io.BytesIO(input_csv_object['Body'].read())) for index, row in input_csv.iterrows(): print(f"Processing row {index}: {row}") - articles_id = str(uuid.uuid4()) # Generate a unique ID for each article + article_id = str(uuid.uuid4()) # Generate a unique ID for each article cursor.execute(""" - INSERT INTO articles (articles_id, title, body, source, published_date) - VALUES (%s, %s, %s, %s, %s)""", (articles_id, row[1], row[2], row[3], row[4])) + INSERT INTO articles (article_id, title, body, source, published_date) + VALUES (%s, %s, %s, %s, %s)""", (article_id, row[1], row[2], row[3], row[4])) # Convert to CSV in-memory csv_buffer = io.StringIO() writer = csv.writer(csv_buffer) # writer.writerow(row.headers) # Write header writer.writerow(row) - s3_path = 'input/' + articles_id + '.csv' + s3_path = 'input/' + article_id + '.csv' s3_uri = 's3://' + bucket_name + '/' + s3_path s3.put_object( Bucket=bucket_name, @@ -55,7 +66,7 @@ OutputDataConfig={'S3Uri': 's3://awstraindata/output/entities/'}, DataAccessRoleArn=role_arn, LanguageCode='en', - JobName='MyEntityDetectionJob_' + str(int(time.time())), + JobName='MyEntityDetectionJob_'+ article_id + '_' + str(int(time.time())) ) result = comprehend.describe_entities_detection_job(JobId=entities_job['JobId']) entities_output = result['EntitiesDetectionJobProperties']['OutputDataConfig']['S3Uri'] @@ -66,7 +77,7 @@ OutputDataConfig={'S3Uri': 's3://awstraindata/output/sentiment/'}, DataAccessRoleArn=role_arn, LanguageCode='en', - JobName='MySentimentDetectionJob_' + str(int(time.time())), + JobName='MySentimentDetectionJob_' + article_id + '_' + str(int(time.time())) ) res = comprehend.describe_sentiment_detection_job(JobId=sentiment_job['JobId']) sentiment_output = res['SentimentDetectionJobProperties']['OutputDataConfig']['S3Uri'] @@ -77,28 +88,17 @@ OutputDataConfig={'S3Uri': 's3://awstraindata/output/keyphrases/'}, 
DataAccessRoleArn=role_arn, LanguageCode='en', - JobName='MyKeyPhrasesDetectionJob_' + str(int(time.time())), + JobName='MyKeyPhrasesDetectionJob_' + article_id + '_' + str(int(time.time())) ) res = comprehend.describe_key_phrases_detection_job(JobId=phrases_job['JobId']) key_phrases_output = res['KeyPhrasesDetectionJobProperties']['OutputDataConfig']['S3Uri'] print("Entities Job Response:", entities_output) print("Sentiment Job Response:", sentiment_output) print("Key Phrases Job Response:", key_phrases_output) - cursor.execute(""" - drop table if exists comprehend_jobs - """) - cursor.execute(""" - CREATE TABLE IF NOT EXISTS comprehend_jobs ( - article_id TEXT, - input_s3_uri TEXT, - entities_path TEXT, - sentiment_path TEXT, - key_phrases_path TEXT - ) - """) + print("Inserting into comprehend_jobs table") cursor.execute(""" INSERT INTO comprehend_jobs (article_id, input_s3_uri, entities_path, sentiment_path, key_phrases_path) - VALUES (%s, %s, %s, %s, %s)""", (articles_id, s3_uri, entities_output.replace('s3://awstraindata/', ''), sentiment_output.replace('s3://awstraindata/', ''), key_phrases_output.replace('s3://awstraindata/', ''))) + VALUES (%s, %s, %s, %s, %s)""", (article_id, s3_uri, entities_output.replace('s3://awstraindata/', ''), sentiment_output.replace('s3://awstraindata/', ''), key_phrases_output.replace('s3://awstraindata/', ''))) conn.commit() cursor.close() conn.close() \ No newline at end of file diff --git a/backend/output_handler/output_handler.py b/backend/output_handler/output_handler.py new file mode 100644 index 0000000..8669089 --- /dev/null +++ b/backend/output_handler/output_handler.py @@ -0,0 +1,148 @@ +import boto3 +import tarfile +import json +import io +from Utils import get_postgresql_connection +import datetime + + + +def lambda_handler(event, context): + try: + for record in event['Records']: + print(f"New record: {record}") + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + print(f"Processing file 
from bucket: {bucket}, key: {key}") + conn = get_postgresql_connection() + s3 = boto3.client('s3') + print(f"Connecting to S3 bucket: {bucket}") + obj = s3.get_object(Bucket=bucket, Key=key) + print(f"Downloaded object from S3: {key}") + tar_bytes = io.BytesIO(obj['Body'].read()) + print(f"Processing file: {key}") + # Extract .json inside the tar.gz + with tarfile.open(fileobj=tar_bytes, mode='r:gz') as tar: + print(f"Extracting files from tar: {key}") + for member in tar.getmembers(): + print(f"Found member: {member.name}") + if member.name == "output" and member.isfile(): + print(f"Extracting JSON file: {member.name}") + file = tar.extractfile(member) + if not file: + print(f"File {member.name} not found in tar.") + continue + result = json.load(file) + print(f"Extracted JSON: {result}") + break + print(f"Results: {result}") + if result: + print(f"Results found in the file: {key}") + folderSplit = key.split('/') + type = folderSplit[1] + cursor = conn.cursor() + query = "SELECT * FROM comprehend_jobs WHERE entities_path = %s or sentiment_path = %s or key_phrases_path = %s" + cursor.execute(query, (key, key, key)) + row = cursor.fetchone() + print(f"Row found: {row}") + print(f"Type of analysis: {type}") + if row: + article_id = row[0] + print(f"Article ID: {article_id}") + if type == 'entities': + entity_array = result['Entities'] + if entity_array: + ## get the entities from the entities table + add_entities_to_article(conn, cursor, article_id, entity_array) + elif type == 'keyphrases': + keyPhrases_array = result['KeyPhrases'] + if keyPhrases_array: + for keyPhrase in keyPhrases_array: + keyPhrase['Type'] = 'KeyPhrase' + add_entities_to_article(conn, cursor, article_id, keyPhrases_array) + elif type == 'sentiment': + sentiment = result.get('Sentiment', 'NEUTRAL') + if sentiment: + cursor.execute("""update articles set sentiment = %s where article_id = %s""", (sentiment, article_id)) + conn.commit() + cursor.close() + ## delete the s3 object + # 
s3.delete_object(Bucket=bucket, Key=result['input_s3_uri']) + conn.close() + except Exception as e: + print(f"Error processing record: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + +def add_entities_to_article(conn, cursor, article_id, entities): + entities_text = [entity['Text'] for entity in entities] + print(f"Entities to be added: {entities_text}") + cursor.execute("SELECT * FROM entities WHERE entity in %s", (tuple(entities_text),)) + entity_db_array = cursor.fetchall() + print(f"Entities in DB: {entity_db_array}") + location_mentions = [] + officials_involved = [] + relevance_category = [] + print(f"article_id: {article_id}") + + print(f"Relevance category: {relevance_category}") + for entity in entities: + print(f"Processing entity: {entity}") + entity_in_db = [db_entity for db_entity in entity_db_array if db_entity[3].lower() == entity['Text'].lower()] + print(f"Entity in DB: {entity_in_db}") + if not entity_in_db: + current_time = datetime.datetime.utcnow() + cursor.execute("INSERT INTO entities (create_time,entity,type) VALUES (%s, %s, %s) RETURNING id", (current_time, entity['Text'], entity['Type'])) + conn.commit() + db_entity = cursor.fetchone() + print(f"Inserted new entity: {db_entity}") + if entity['Type'] == 'LOCATION': + location_mentions.append(db_entity[0]) + elif entity['Type'] == 'PERSON': + officials_involved.append(db_entity[0]) + else: + relevance_category.append(db_entity[0]) + else: + print(f"Entity already exists in DB: {entity_in_db}") + if entity['Type'] == 'LOCATION': + location_mentions.append(entity_in_db[0][0]) + elif entity['Type'] == 'PERSON': + officials_involved.append(entity_in_db[0][0]) + else: + relevance_category.append(entity_in_db[0][0]) + if location_mentions: + location_mentions = ','.join(map(str, location_mentions)) + cursor.execute("""update articles set location_mentions = %s where article_id = %s""", (location_mentions, article_id)) + + if officials_involved: + 
officials_involved = ','.join(map(str, officials_involved)) + cursor.execute("""update articles set officials_involved = %s where article_id = %s""", (officials_involved, article_id)) + + if relevance_category: + cursor.execute("SELECT relevance_category FROM articles WHERE article_id = %s", (article_id,)) + existing = cursor.fetchone() + relevance_category = ','.join(map(str, relevance_category)) + if existing[0] is not None: + print(f"Existing relevance category: {existing[0]}") + relevance_category = relevance_category + ',' + existing[0] + cursor.execute("""update articles set relevance_category = %s where article_id = %s""", (relevance_category, article_id)) + + +# events = [ +# { +# "s3": { +# "bucket": { +# "name": "awstraindata" +# }, +# "object": { +# "key": "output/entities/269854564686-NER-7b5218ec8e556761890504a59e10da02/output/output.tar.gz" +# } +# } +# } +# ] +# obj= { +# "Records": events +# } +# lambda_handler(obj, None) \ No newline at end of file diff --git a/backend/pg_config.json b/backend/pg_config.json index 596e176..64ca955 100644 --- a/backend/pg_config.json +++ b/backend/pg_config.json @@ -1,7 +1,7 @@ { - "host": "ap-ai-hackathon.cluster-cqt08oi8i1b6.us-east-1.rds.amazonaws.com", + "host": "hackathon-ai-ap.cluster-cqt08oi8i1b6.us-east-1.rds.amazonaws.com", "database": "postgres", "user": "postgres", - "password": "AIHackathon", + "password": "3D6[~771pd5|pkF03dBeL.5#IZ5?", "port": 5432 } \ No newline at end of file diff --git a/backend/raw_data_handler/raw_data_handler.py b/backend/raw_data_handler/raw_data_handler.py new file mode 100644 index 0000000..9c13cb3 --- /dev/null +++ b/backend/raw_data_handler/raw_data_handler.py @@ -0,0 +1,150 @@ +import base64 +import datetime +import json +import time +import uuid +from Utils import get_postgresql_connection +from fastapi import FastAPI +from docx import Document +import csv +import io +import re +import boto3 +import traceback +from test_agent import is_relevance + +BUCKET_NAME = 
'awstraindata' +role = 'arn:aws:iam::269854564686:role/hackathon-comprehend-role' +def lambda_handler(event, context): + try: + conn = get_postgresql_connection() + cursor = conn.cursor() + comprehend = boto3.client('comprehend', region_name='us-east-1') + for record in event['Records']: + print(f"New record: {record}") + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + print(f"Processing file from bucket: {bucket}, key: {key}") + s3 = boto3.client('s3') + print(f"Connecting to S3 bucket: {bucket}") + obj = s3.get_object(Bucket=bucket, Key=key) + stream = io.BytesIO(obj['Body'].read()) + articles = extract_articles(stream) + print(f"Extracted {len(articles)} articles from part") + for article in articles: + print(f"Processing article: {article['Title']}") + # Check if article is relevant + is_relevant = is_relevance(article) + if not is_relevant: + print(f"Article {article['Title']} is not relevant, skipping") + continue + output_csv = io.StringIO() + writer = csv.DictWriter(output_csv, fieldnames=["Title", "Source", "Date", "Content"]) + # writer.writeheader() + writer.writerow(article) + article_id = str(uuid.uuid4()) + # Generate unique filename + csv_filename = f"input/articles-{article_id}.csv" + cursor.execute(""" + INSERT INTO articles (article_id, title, body, source, published_date) + VALUES (%s, %s, %s, %s, %s)""", (article_id, article['Title'], article['Content'], article['Source'], article['Date'])) + # Upload to S3 + print(f"Uploading CSV to S3: {csv_filename}") + conn.commit() + get_data_inline(output_csv.getvalue(), article_id, article['Date'], comprehend, cursor, conn) + cursor.close() + conn.close() + lambda_client = boto3.client('lambda') + response = lambda_client.invoke( + FunctionName='clustering_service', + InvocationType='Event' + ) + print(f"Second Lambda function invoked: {response}") + except Exception as e: + traceback.print_exc() + print(f"Error processing event: {e}") + +def get_data_inline(data, 
articles_id, article_date, comprehend, cursor, conn): + print(f"Processing data for article ID: {articles_id}") + entities_response = comprehend.detect_entities( + Text=data, + # DataAccessRoleArn=role_arn, + LanguageCode='en' + ) + print(f"Entities detected: {entities_response['Entities']}") + add_entities_to_article(conn, cursor, articles_id, entities_response['Entities']) + response = comprehend.detect_key_phrases( + Text=data, + # DataAccessRoleArn=role_arn, + LanguageCode='en' + ) + print(f"Key phrases detected: {response['KeyPhrases']}") + for keyPhrase in response['KeyPhrases']: + keyPhrase['Type'] = 'KeyPhrase' + add_keyphrase_to_article(conn, cursor, articles_id, article_date, response['KeyPhrases']) + sentiment_response = comprehend.detect_sentiment( + Text=data, + # DataAccessRoleArn=role_arn, + LanguageCode='en' + ) + print(f"Sentiment detected: {sentiment_response['Sentiment']}") + sentiment = sentiment_response['Sentiment'] + if sentiment: + cursor.execute("""update articles set sentiment = %s where article_id = %s""", (sentiment, articles_id)) + +def extract_articles(file_stream): + print(f"Extracting articles from file stream") + doc = Document(file_stream) + print(f"Document loaded with {len(doc.paragraphs)} paragraphs") + text = "\n".join(p.text for p in doc.paragraphs) + pattern = re.compile( + r'Title:\s*(.*?)\s*Source:\s*(.*?)\s*Date:\s*(.*?)\s*(?=(?:\d{1,2}\)|Title:)|\Z)', + re.DOTALL + ) + matches = pattern.findall(text) + print(f"Found {len(matches)} matches in the document") + articles = [] + for match in matches: + print(f"Processing match: {match}") + title = match[0].strip() + source = match[1].strip() + date_parts = match[2].strip().split("\n", 1) + date = date_parts[0].strip() + content = date_parts[1].strip() if len(date_parts) > 1 else "" + print(f"Extracted article - Title: {title}, Source: {source}, Date: {date}, Content length: {len(content)}") + articles.append({ + "Title": title, + "Source": source, + "Date": date, + "Content": 
content + }) + return articles + + +def add_keyphrase_to_article(conn, cursor, article_id, article_date, entities): + entities_text = [entity['Text'] for entity in entities] + print(f"Entities to be added: {entities_text}") + print(f"article_id: {article_id}") + if entities_text: + relevance_category = ','.join(map(str, entities_text)) + cursor.execute("""update articles set relevance_category = %s where article_id = %s""", (relevance_category, article_id)) + +def add_entities_to_article(conn, cursor, article_id, entities): + entities_text = [entity['Text'] for entity in entities] + print(f"Entities to be added: {entities_text}") + location_mentions = [] + officials_involved = [] + print(f"article_id: {article_id}") + for entity in entities: + if entity['Type'] == 'LOCATION': + location_mentions.append(entity['Text'].lower()) + elif entity['Type'] == 'PERSON' or entity['Type'] == 'ORGANIZATION': + officials_involved.append(entity['Text'].lower()) + print(f"Processing entity: {entity}") + if location_mentions: + location_mentions = ','.join(map(str, location_mentions)) + cursor.execute("""update articles set location_mentions = %s where article_id = %s""", (location_mentions, article_id)) + + if officials_involved: + officials_involved = ','.join(map(str, officials_involved)) + cursor.execute("""update articles set officials_involved = %s where article_id = %s""", (officials_involved, article_id)) \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt index e8f021d..0415b7b 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -1,5 +1,14 @@ -fastapi +numpy +pandas +boto3 +python-docx +fastapi[all] +uvicorn[standard] pydantic -pydantic-core -typing-extensions +python-dotenv +requests_toolbelt +sqlmodel +psycopg2-binary==2.9.9 mangum +# psycopg[binary] # The modern v3 driver +# psycopg-pool # The connection pool for v3 diff --git a/backend/test_agent.py b/backend/test_agent.py new file mode 100644 index 
0000000..c09a98f --- /dev/null +++ b/backend/test_agent.py @@ -0,0 +1,54 @@ +import re +import traceback +import boto3 +import json + +def is_relevance(article): + is_relevant = True + try: + # Initialize Bedrock Agent Runtime client + bedrock_agent = boto3.client("bedrock-agent-runtime", region_name="us-east-1") # replace with your region + + # Agent identifiers (get these from Bedrock console) + agent_id = "IKXDLL0K7W" + agent_alias_id = "DY9KWQNAGM" + + # Your dynamic user message (e.g., relationship analysis prompt) + user_input = article['Content'] + + # Call the agent + response = bedrock_agent.invoke_agent( + agentId=agent_id, + agentAliasId=agent_alias_id, + sessionId="news-analysis-session-001", + inputText=user_input + ) + + # Read the response stream + + print("Response from Bedrock Agent:") + print(response) + for event in response["completion"]: + print("Event:", event) + if "chunk" in event: + print("Processing chunk...") + print("Chunk ID:", event["chunk"]) + payload = event["chunk"]["bytes"] + chunk_str = payload.decode("utf-8") + match = re.search(r"\{.*\}", chunk_str, re.DOTALL) + if match: + print("Found JSON block in chunk") + print("JSON Block:", match.group(0)) + json_block = match.group(0) + parsed_json = json.loads(json_block) + print("Parsed JSON:", parsed_json) + is_relevant = parsed_json.get("relevance_score", 0) > 0.5 + print("Is relevant:", is_relevant) + print(parsed_json.get("relevance_score", "No content found")) + else: + print("āŒ No JSON found in response") + except Exception as e: + print("Error processing response:", e) + pass + # traceback.print_exc() + return is_relevant diff --git a/backend/web_api/bedrock_agent.py b/backend/web_api/bedrock_agent.py new file mode 100644 index 0000000..8967dbf --- /dev/null +++ b/backend/web_api/bedrock_agent.py @@ -0,0 +1,89 @@ +# bedrock_agent.py +import boto3 +import json + +# It's a good practice to create the client once and reuse it. 
+# Ensure your AWS credentials are configured (e.g., via `aws configure`) +# and you have selected a region where the model is available. +bedrock_runtime = boto3.client( + service_name="bedrock-runtime", + region_name="us-east-1" # e.g., us-east-1 +) + +# Choose a model. Claude 3 Sonnet is a great choice for this task. +# You can also use "anthropic.claude-v2:1", "anthropic.claude-instant-v1", etc. +MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0" + +def generate_sql_from_prompt(user_question: str, table_schema: str) -> str: + """ + Uses Bedrock to generate a SQL query from a user's natural language question. + + Args: + user_question: The question from the user. + table_schema: The CREATE TABLE statement for the relevant table. + + Returns: + A SQL query string. + """ + + # This prompt engineering is the most critical part of the process. + # It gives the model context, instructions, and constraints. + prompt = f""" +Human: You are a PostgreSQL expert. Your task is to generate a SQL query based on a user's question. +You will be given the database schema and a question. +You MUST follow these rules: +1. ONLY generate a SQL `SELECT` query. Do not generate any other type of query (INSERT, UPDATE, DELETE, etc.). +2. Do not include any text, explanation, or markdown formatting before or after the SQL query. Your entire response must be only the SQL query itself. +3. The query should be for a PostgreSQL database. 
+ +Here is the table schema: + +{table_schema} + + +Here is the user's question: + +{user_question} + + +Assistant: +""" + + # Prepare the payload for the Bedrock API + body = json.dumps({ + "anthropic_version": "bedrock-2023-05-31", + "max_tokens": 1000, + "temperature": 0.0, # Use 0 for deterministic, factual responses + "messages": [ + { + "role": "user", + "content": [{"type": "text", "text": prompt}], + } + ], + }) + + try: + # Invoke the model + response = bedrock_runtime.invoke_model( + body=body, + modelId=MODEL_ID, + accept='application/json', + contentType='application/json' + ) + + # Parse the response body + response_body = json.loads(response.get('body').read()) + + # Extract the generated text + generated_sql = response_body.get('content')[0].get('text') + + # Clean up the response (remove potential leading/trailing whitespace or markdown) + cleaned_sql = generated_sql.strip().replace("```sql", "").replace("```", "").strip() + + print(f"Bedrock generated SQL: {cleaned_sql}") + return cleaned_sql + + except Exception as e: + print(f"Error invoking Bedrock model: {e}") + # In a real app, you'd want more robust error handling + raise \ No newline at end of file diff --git a/backend/web_api/bedrock_agent_invoke.py b/backend/web_api/bedrock_agent_invoke.py new file mode 100644 index 0000000..7867aa1 --- /dev/null +++ b/backend/web_api/bedrock_agent_invoke.py @@ -0,0 +1,157 @@ +# bedrock_agent_invoke.py +import boto3 +import json +from typing import Optional + +# Use the 'bedrock-agent-runtime' client for invoking agents +bedrock_agent_runtime = boto3.client( + service_name="bedrock-agent-runtime", + region_name="us-east-1" # Use the region where your agent is deployed +) + +def invoke_bedrock_agent_to_get_sql( + question: str, + agent_id: str, + agent_alias_id: str, + session_id: str +) -> Optional[str]: + """ + Invokes a pre-configured Bedrock Agent and extracts the generated SQL query + from its response trace. 
+ + Args: + question: The user's natural language question. + agent_id: The ID of your Bedrock Agent. + agent_alias_id: The alias ID for the agent version you want to use. + session_id: A unique identifier for the conversation session. + + Returns: + The generated SQL query string, or None if not found. + """ + prompt = f""" + convert the natural language query into a pgsql query. Return ONLY the sql command as an answer. + Here is the table schema: + + CREATE TABLE articles( + articles_id text, + title text, + body text, + "source" text, + published_date text, + entities text, + sentiment text + ); + + Here is the user's question: + + {question} + + while creating query ensure that it can ignore case sensitive. + Give me the stream response as json in an API call + + """ + try: + # The invoke_agent API returns a streaming response. + response = bedrock_agent_runtime.invoke_agent( + agentId=agent_id, + agentAliasId=agent_alias_id, + sessionId=session_id, + inputText=prompt, + streamingConfigurations = { + "applyGuardrailInterval" : 20, + "streamFinalResponse" : False + } + ) + + event_stream = response['completion'] + final_sql_query = None + all_events = [] + + # Collect all events for post-processing + # for event in event_stream: + # all_events.append(event) + # if 'trace' in event: + # trace_part = event['trace']['trace'] + # if 'observation' in trace_part: + # observation = trace_part['observation'] + # # If observation is a list, iterate through it + # if isinstance(observation, list): + # for obs in observation: + # if 'finalResponse' in obs: + # final_response = obs['finalResponse'] + # text = final_response.get('text', '') + # # Extract SQL from markdown code block if present + # if '```sql' in text: + # sql_start = text.find('```sql') + len('```sql') + # sql_end = text.find('```', sql_start) + # final_sql_query = text[sql_start:sql_end].strip() + # else: + # final_sql_query = text.strip() + # print(f"Extracted SQL from finalResponse: {final_sql_query}") + 
# break + # # If observation is a dict, handle as before + # elif isinstance(observation, dict): + # if 'finalResponse' in observation: + # final_response = observation['finalResponse'] + # text = final_response.get('text', '') + # if '```sql' in text: + # sql_start = text.find('```sql') + len('```sql') + # sql_end = text.find('```', sql_start) + # final_sql_query = text[sql_start:sql_end].strip() + # else: + # final_sql_query = text.strip() + # print(f"Extracted SQL from finalResponse: {final_sql_query}") + # break + # if 'actionGroupInvocationOutput' in observation: + # output_str = observation['actionGroupInvocationOutput']['text'] + # try: + # output_json = json.loads(output_str) + # if 'generatedQuery' in output_json: + # final_sql_query = output_json['generatedQuery'] + # print(f"Extracted SQL from Agent trace: {final_sql_query}") + # break + # except json.JSONDecodeError: + # print(f"Could not decode observation output: {output_str}") + + # Fallback: check for chunk events if not found + for event in all_events: + if 'chunk' in event: + raw_bytes = event['chunk']['bytes'] + print(f"Raw chunk bytes: {raw_bytes!r}") + if raw_bytes: + try: + # Try to decode as JSON, but if it fails, treat as plain text + try: + data = json.loads(raw_bytes.decode()) + if data.get('type') == 'finalResponse': + text = data.get('text', '') + if '```sql' in text: + sql_start = text.find('```sql') + len('```sql') + sql_end = text.find('```', sql_start) + final_sql_query = text[sql_start:sql_end].strip() + else: + final_sql_query = text.strip() + print(f"Extracted SQL from chunk finalResponse: {final_sql_query}") + break + except json.JSONDecodeError: + # Not JSON, treat as plain text + text = raw_bytes.decode() + if '```sql' in text: + sql_start = text.find('```sql') + len('```sql') + sql_end = text.find('```', sql_start) + final_sql_query = text[sql_start:sql_end].strip() + else: + final_sql_query = text.strip() + print(f"Extracted SQL from plain text chunk: {final_sql_query}") + 
break + except Exception as e: + print(f"Error decoding chunk: {e}") + else: + print("Chunk bytes are empty, skipping.") + # if not final_sql_query: + + return final_sql_query + + except Exception as e: + print(f"Error invoking Bedrock Agent: {e}") + raise \ No newline at end of file diff --git a/backend/web_api/database.py b/backend/web_api/database.py new file mode 100644 index 0000000..4f7b9fa --- /dev/null +++ b/backend/web_api/database.py @@ -0,0 +1,23 @@ +# database.py +import os +from sqlmodel import create_engine, SQLModel, Session + +# Use a real database URL in production +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql://your_user:your_password@your_aurora_endpoint/myappdb" +).replace("postgresql://", "postgresql+psycopg2://") + +# The 'connect_args' is needed for SQLite, but not for PostgreSQL. +# For PostgreSQL, you can remove it. +engine = create_engine(DATABASE_URL, echo=True) + +# def create_db_and_tables(): +# # This function creates all tables defined by SQLModel models +# # that are subclasses of SQLModel. It's good to run this once at startup. 
+# SQLModel.metadata.create_all(engine) + +# Dependency function to get a database session +def get_session(): + with Session(engine) as session: + yield session \ No newline at end of file diff --git a/backend/web_api/models.py b/backend/web_api/models.py new file mode 100644 index 0000000..206906f --- /dev/null +++ b/backend/web_api/models.py @@ -0,0 +1,57 @@ +# models.py +from typing import Optional +from sqlmodel import Field, SQLModel +import datetime + +class Articles(SQLModel, table=True): + article_id: str = Field(primary_key=True) + title: Optional[str] = None + body: Optional[str] = None + source: Optional[str] = Field(default=None, alias="source") + published_date: Optional[str] = None + location_mentions: Optional[str] = None + officials_involved: Optional[str] = None + relevance_category: Optional[str] = None + sentiment: Optional[str] = None + +class ArticleCreate(SQLModel): + title: Optional[str] = None + body: Optional[str] = None + source: Optional[str] = None + published_date: Optional[str] = None + location_mentions: Optional[str] = None + officials_involved: Optional[str] = None + relevance_category: Optional[str] = None + sentiment: Optional[str] = None + +class ArticleRead(SQLModel): + article_id: str + title: Optional[str] = None + body: Optional[str] = None + source: Optional[str] = None + published_date: Optional[str] = None + location_mentions: Optional[str] = None + officials_involved: Optional[str] = None + relevance_category: Optional[str] = None + sentiment: Optional[str] = None + +class Clusters(SQLModel, table=True): + id: int = Field(default=None, primary_key=True) + title: Optional[datetime.date] = None + linkedarticles: Optional[str] = None + startdate: Optional[str] = None + enddate: Optional[str] = None + +class ClusterCreate(SQLModel): + title: Optional[datetime.date] = None + linkedarticles: Optional[str] = None + startdate: Optional[str] = None + enddate: Optional[str] = None + +class ClusterRead(SQLModel): + id: int + title: 
Optional[datetime.date] = None + linkedarticles: Optional[str] = None + startdate: Optional[str] = None + enddate: Optional[str] = None + diff --git a/backend/web_api/web_api.py b/backend/web_api/web_api.py new file mode 100644 index 0000000..4fcab05 --- /dev/null +++ b/backend/web_api/web_api.py @@ -0,0 +1,186 @@ +# main.py +import os +from typing import List +import uuid +from fastapi import FastAPI, Depends, HTTPException, Response +from sqlmodel import Session, select + +from database import get_session +from models import Articles, ArticleCreate, ArticleRead, Clusters, ClusterCreate, ClusterRead +from typing import List, Dict, Any + +from contextlib import asynccontextmanager, closing +# from typing import List, Dict, Any + +# from fastapi import FastAPI, HTTPException, Depends +from pydantic import BaseModel, Field +from psycopg2 import ProgrammingError +import psycopg2 +from contextlib import closing +from mangum import Mangum + +# # Import our new Bedrock agent function +# from bedrock_agent import generate_sql_from_prompt +# namasthe +# Import our new agent invoker function +from bedrock_agent_invoke import invoke_bedrock_agent_to_get_sql + +# Load environment variables from .env file +from dotenv import load_dotenv +load_dotenv() + +# --- Agent Configuration --- +# Load from environment variables for security and flexibility +AGENT_ID = os.environ.get("BEDROCK_AGENT_ID") +AGENT_ALIAS_ID = os.environ.get("BEDROCK_AGENT_ALIAS_ID", "TSTALIASID") # TSTALIASID is a common default + +app = FastAPI( + title="FastAPI with Bedrock Agents", + redirect_slashes=True, +) + +@asynccontextmanager +async def lifespan(app: FastAPI): + print("Application startup...") + yield + print("Application shutdown...") + # pool.close() + + +# This event handler runs once when the application starts. 
+# @app.on_event("startup") +# def on_startup(): +# create_db_and_tables() + +@app.post("/articles/", response_model=ArticleRead) +def create_article(hero: ArticleCreate, session: Session = Depends(get_session)): + db_article = Articles.model_validate(hero) + session.add(db_article) + session.commit() + session.refresh(db_article) + return db_article + +@app.get("/articles/", response_model=List[ArticleRead]) +def read_articles(skip: int = 0, limit: int = 100, session: Session = Depends(get_session)): + heroes = session.exec(select(Articles).offset(skip).limit(limit)).all() + return heroes + +@app.get("/articles/{hero_id}", response_model=ArticleRead) +def read_article(hero_id: int, session: Session = Depends(get_session)): + article = session.get(Articles, hero_id) + if not article: + raise HTTPException(status_code=404, detail="Article not found") + return article + +@app.post("/clusters/", response_model=ClusterRead) +def create_cluster(cluster: ClusterCreate, session: Session = Depends(get_session)): + db_cluster = Clusters.model_validate(cluster) + session.add(db_cluster) + session.commit() + session.refresh(db_cluster) + return db_cluster + +@app.get("/clusters/", response_model=List[ClusterRead]) +def read_clusters(skip: int = 0, limit: int = 100, session: Session = Depends(get_session)): + clusters = session.exec(select(Clusters).offset(skip).limit(limit)).all() + return clusters + +@app.get("/clusters/{cluster_id}", response_model=ClusterRead) +def read_cluster(cluster_id: str, session: Session = Depends(get_session)): + cluster = session.get(Clusters, cluster_id) + if not cluster: + raise HTTPException(status_code=404, detail="Cluster not found") + return cluster + +@app.get("/groupedClusters/") +def grouped_clusters(session: Session = Depends(get_session)): + clusters = session.exec(select(Clusters)).all() + articles = session.exec(select(Articles)).all() + # Build a mapping from cluster id to articles + cluster_map = {cluster.id: [] for cluster in 
clusters} + for article in articles: + # Assuming 'linkedarticles' in Clusters is a comma-separated list of article ids + for cluster in clusters: + if cluster.linkedarticles: + linked_ids = [x.strip() for x in cluster.linkedarticles.split(",") if x.strip()] + if article.articles_id in linked_ids: + cluster_map[cluster.id].append(article) + # Build the response + result = [] + for cluster in clusters: + cluster_dict = cluster.dict() + cluster_dict["articles"] = cluster_map[cluster.id] + result.append(cluster_dict) + return result + + +# --- Pydantic Models --- +class NaturalLanguageQuery(BaseModel): + question: str = Field(..., example="How many heroes are there?") + session_id: str | None = Field(default=None, description="Conversation session ID. A new one is generated if not provided.") + +# --- Database Connection --- +# IMPORTANT: Use a read-only user for the database connection. +DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://your_user:your_password@your_aurora_endpoint/myappdb") + +def get_db_connection(): + conn = psycopg2.connect(DATABASE_URL) + try: + yield conn + finally: + conn.close() + +@app.post("/queryagent", response_model=List[Dict[str, Any]]) +def query_with_bedrock_agent(query: NaturalLanguageQuery, conn=Depends(get_db_connection)): + """ + Takes a natural language question, sends it to a pre-configured Bedrock Agent, + executes the returned SQL, and returns the results. + """ + session_id = query.session_id or str(uuid.uuid4()) + print(f"Invoking agent for question: '{query.question}' with session_id: {session_id}") + + # 1. 
Invoke the agent to get the SQL query + try: + generated_sql = invoke_bedrock_agent_to_get_sql( + question=query.question, + agent_id=AGENT_ID, + agent_alias_id=AGENT_ALIAS_ID, + session_id=session_id + ) + print("Generated SQL from agent:", generated_sql) # Debug print + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to invoke Bedrock Agent: {e}") + + if not generated_sql: + raise HTTPException(status_code=404, detail="Agent did not return a SQL query.") + + # 2. *** CRITICAL SECURITY CHECK *** + if not generated_sql.strip().upper().startswith("SELECT"): + raise HTTPException( + status_code=400, + detail="Agent returned a non-SELECT query. Execution aborted." + ) + + # 3. Execute the SQL from the agent + try: + with conn.cursor() as cur: + cur.execute(generated_sql) + if cur.description is None: + return [] + + column_names = [desc[0] for desc in cur.description] + results = cur.fetchall() + return [dict(zip(column_names, row)) for row in results] + + except ProgrammingError as e: + raise HTTPException(status_code=400, detail=f"Invalid SQL Query from Agent: {e}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Database execution error: {e}") + +handler = Mangum(app) + +def lambda_handler(event, context): + """ + AWS Lambda handler for FastAPI app using Mangum adapter. + """ + return handler(event, context)