From f6af1ebf69aac3d6b662c032ed4456a781b605d1 Mon Sep 17 00:00:00 2001 From: ameysunu Date: Thu, 23 Oct 2025 21:24:12 +0100 Subject: [PATCH 01/23] if state DONE then return NoContent added --- Parser/Parser/Api/ApplyTransformation.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Parser/Parser/Api/ApplyTransformation.cs b/Parser/Parser/Api/ApplyTransformation.cs index dddefed..8a625c0 100644 --- a/Parser/Parser/Api/ApplyTransformation.cs +++ b/Parser/Parser/Api/ApplyTransformation.cs @@ -50,6 +50,11 @@ public async Task Apply([FromBody] EventArcStoragePayload body) if (!snap.Exists) return NotFound($"Job {jobId} not found."); + var state = snap.TryGetValue("state", out var s) ? s : null; + + if(state == "DONE") + return NoContent(); + var parsedPath = snap.TryGetValue("parsedPath", out var p) ? p : null; var bucket = snap.TryGetValue("bucket", out var b) ? b : null; var prompt = snap.TryGetValue("prompt", out var pr) ? pr : null; From 3892751fffe9e94be2c3e78a3f551849591c1bff Mon Sep 17 00:00:00 2001 From: ameysunu Date: Fri, 24 Oct 2025 23:57:47 +0100 Subject: [PATCH 02/23] try catch and google auth added for url signing --- Parser/Parser/Api/ApplyTransformation.cs | 172 +++++++++++++---------- 1 file changed, 98 insertions(+), 74 deletions(-) diff --git a/Parser/Parser/Api/ApplyTransformation.cs b/Parser/Parser/Api/ApplyTransformation.cs index 8a625c0..0a5e710 100644 --- a/Parser/Parser/Api/ApplyTransformation.cs +++ b/Parser/Parser/Api/ApplyTransformation.cs @@ -21,17 +21,20 @@ public class ApplyTransformation : ControllerBase private readonly CollectionReference _collection; private readonly ParsingConfig _config; private readonly ITransformationService _service; + private readonly IGoogleAuth _googleAuth; public ApplyTransformation( ILogger logger, StorageClient storageClient, ITransformationService service, + IGoogleAuth googleAuth, IOptions config) { _logger = logger; _storage = storageClient; _config = config.Value; _service = service; + _googleAuth = googleAuth; var db = FirestoreDb.Create(_config.GcpProjectId); _collection = db.Collection(_config.FirestoreCollection); @@ -49,89 +52,110 @@ public async Task Apply([FromBody] EventArcStoragePayload body) var snap = await jobRef.GetSnapshotAsync(); if (!snap.Exists) return NotFound($"Job {jobId} not found."); - - var state = snap.TryGetValue("state", out var s) ? s : null; - - if(state == "DONE") - return NoContent(); - - var parsedPath = snap.TryGetValue("parsedPath", out var p) ? p : null; - var bucket = snap.TryGetValue("bucket", out var b) ? b : null; - var prompt = snap.TryGetValue("prompt", out var pr) ? pr : null; - - if (string.IsNullOrWhiteSpace(parsedPath)) - return BadRequest("Job missing parsedPath."); - if (string.IsNullOrWhiteSpace(bucket)) - bucket = _config.OutputBucket; - - await jobRef.SetAsync(new Dictionary { - ["state"] = "APPLYING GEMINI TRANSFORMATION", - ["updatedAt"] = DateTime.UtcNow, - ["transformPrompt"] = prompt, - }, SetOptions.MergeAll); - - await using var inStream = new MemoryStream(); - await _storage.DownloadObjectAsync(bucket, parsedPath, inStream); - inStream.Position = 0; - using var reader = new StreamReader(inStream, Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: false); - - var outObj = $"jobs/{jobId}/transformed/data.jsonl"; - await using var outMem = new MemoryStream(); - await using var outWriter = new StreamWriter(outMem, new UTF8Encoding(false)); - - int linesPerChunk = Math.Clamp(500, 50, 2000); - var buffer = new List(linesPerChunk); - var totalIn = 0; - var totalOut = 0; - - string? line; - while ((line = await reader.ReadLineAsync()) is not null) + + try { - if (string.IsNullOrWhiteSpace(line)) continue; - buffer.Add(line); - if (buffer.Count >= linesPerChunk) + + var state = snap.TryGetValue("state", out var s) ? s : null; + + if (state == "DONE") + return NoContent(); + + var parsedPath = snap.TryGetValue("parsedPath", out var p) ? p : null; + var bucket = snap.TryGetValue("bucket", out var b) ? b : null; + var prompt = snap.TryGetValue("prompt", out var pr) ? pr : null; + + if (string.IsNullOrWhiteSpace(parsedPath)) + return BadRequest("Job missing parsedPath."); + if (string.IsNullOrWhiteSpace(bucket)) + bucket = _config.OutputBucket; + + await jobRef.SetAsync(new Dictionary + { + ["state"] = "APPLYING GEMINI TRANSFORMATION", + ["updatedAt"] = DateTime.UtcNow, + ["transformPrompt"] = prompt, + }, SetOptions.MergeAll); + + await using var inStream = new MemoryStream(); + await _storage.DownloadObjectAsync(bucket, parsedPath, inStream); + inStream.Position = 0; + using var reader = new StreamReader(inStream, Encoding.UTF8, detectEncodingFromByteOrderMarks: true, + bufferSize: 1024, leaveOpen: false); + + var outObj = $"jobs/{jobId}/transformed/data.jsonl"; + await using var outMem = new MemoryStream(); + await using var outWriter = new StreamWriter(outMem, new UTF8Encoding(false)); + + int linesPerChunk = Math.Clamp(500, 50, 2000); + var buffer = new List(linesPerChunk); + var totalIn = 0; + var totalOut = 0; + + string? line; + while ((line = await reader.ReadLineAsync()) is not null) + { + if (string.IsNullOrWhiteSpace(line)) continue; + buffer.Add(line); + if (buffer.Count >= linesPerChunk) + { + var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt); + var written = await _service.WriteJsonlAsync(outWriter, transformed); + totalIn += buffer.Count; + totalOut += written; + buffer.Clear(); + } + } + + + if (buffer.Count > 0) { var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt); var written = await _service.WriteJsonlAsync(outWriter, transformed); - totalIn += buffer.Count; totalOut += written; - buffer.Clear(); + totalIn += buffer.Count; + totalOut += written; } - } + await outWriter.FlushAsync(); + outMem.Position = 0; + await _storage.UploadObjectAsync(_config.OutputBucket, outObj, "application/json", outMem); - if (buffer.Count > 0) - { - var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt); - var written = await _service.WriteJsonlAsync(outWriter, transformed); - totalIn += buffer.Count; totalOut += written; - } - - await outWriter.FlushAsync(); - outMem.Position = 0; - await _storage.UploadObjectAsync(_config.OutputBucket, outObj, "application/json", outMem); - - var credential = await GoogleCredential.GetApplicationDefaultAsync(); - var urlSigner = UrlSigner.FromCredential(credential); - var downloadUrl = await urlSigner.SignAsync( - bucket: $"gs://{_config.OutputBucket}/{outObj}", - objectName: outObj, - duration: TimeSpan.FromDays(7), - httpMethod: HttpMethod.Get); - - await jobRef.SetAsync(new Dictionary { - ["state"] = "DONE", - ["updatedAt"] = DateTime.UtcNow, - ["result"] = new Dictionary { - ["transformedJsonl"] = downloadUrl, - ["inputLines"] = totalIn, - ["outputLines"] = totalOut - } - }, SetOptions.MergeAll); + var credential = await _googleAuth.GetAsync(); + var urlSigner = UrlSigner.FromCredential(credential); + var downloadUrl = await urlSigner.SignAsync( + bucket: _config.OutputBucket, + objectName: outObj, + duration: TimeSpan.FromDays(7), + httpMethod: HttpMethod.Get); - _logger.LogInformation("Transform complete for {Job}. In:{In} Out:{Out} -> gs://{Bucket}/{Obj}", - jobId, totalIn, totalOut, _config.OutputBucket, outObj); + await jobRef.SetAsync(new Dictionary + { + ["state"] = "DONE", + ["updatedAt"] = DateTime.UtcNow, + ["result"] = new Dictionary + { + ["transformedJsonl"] = downloadUrl, + ["inputLines"] = totalIn, + ["outputLines"] = totalOut + } + }, SetOptions.MergeAll); + + _logger.LogInformation("Transform complete for {Job}. In:{In} Out:{Out} -> gs://{Bucket}/{Obj}", + jobId, totalIn, totalOut, _config.OutputBucket, outObj); - return NoContent(); + return NoContent(); + } + catch (Exception e) + { + _logger.LogError("Exception during transformation for {Job}: {Error}", jobId, e.Message); + await jobRef.SetAsync(new Dictionary + { + ["state"] = "FAILED", + ["updatedAt"] = DateTime.UtcNow, + }, SetOptions.MergeAll); + + return BadRequest(); + } } } From 2165df96c5afff0773484f9a489493e03a26a454 Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sat, 25 Oct 2025 13:21:32 +0100 Subject: [PATCH 03/23] get job state by user id added --- Api/Abstractions/IFirestoreService.cs | 2 ++ Api/Api/JobLists.cs | 44 +++++++++++++++++++++++++++ Api/Models/JobsDocument.cs | 23 ++++++++++++++ Api/Services/FirestoreService.cs | 9 ++++++ 4 files changed, 78 insertions(+) create mode 100644 Api/Api/JobLists.cs create mode 100644 Api/Models/JobsDocument.cs diff --git a/Api/Abstractions/IFirestoreService.cs b/Api/Abstractions/IFirestoreService.cs index 1bd7e35..593d295 100644 --- a/Api/Abstractions/IFirestoreService.cs +++ b/Api/Abstractions/IFirestoreService.cs @@ -1,5 +1,6 @@ using System.Text.Json.Nodes; using Api.Models; +using Google.Cloud.Firestore; namespace Api.Abstractions; @@ -8,4 +9,5 @@ public interface IFirestoreService Task CreateInitAsync(string jobId, string userId, string rawPath, JsonObject? targetSchema, string? prompt, JsonObject? options); Task?> GetAsync(string jobId); Task UpdateStateAsync(string jobId, JobState state, Dictionary? extra = null); + Task> GetDocumentsByField(string fieldName, string fieldValue); } \ No newline at end of file diff --git a/Api/Api/JobLists.cs b/Api/Api/JobLists.cs new file mode 100644 index 0000000..0467a27 --- /dev/null +++ b/Api/Api/JobLists.cs @@ -0,0 +1,44 @@ +using Api.Abstractions; +using Api.Services; +using Google.Cloud.Firestore; +using Microsoft.AspNetCore.Mvc; + +namespace Api.Api; + +[ApiController] +[Route("api/jobs")] +public class JobLists: ControllerBase +{ + private readonly ILogger _logger; + private readonly IFirestoreService _firestoreService; + + public JobLists( + ILogger logger, + IFirestoreService firestoreService + ) + { + _logger = logger; + _firestoreService = firestoreService; + } + + [HttpGet] + public async Task GetJobByUserId([FromQuery] string userId) + { + if (string.IsNullOrWhiteSpace(userId)) + return Ok(Array.Empty()); + + List jobList = new(); + var documents = await _firestoreService.GetDocumentsByField("userId", userId); + + if (documents is null || documents.Count == 0) + return Ok(jobList); + + foreach (var doc in documents) + { + var job = doc.ConvertTo(); + jobList.Add(job); + } + + return Ok(jobList); + } +} \ No newline at end of file diff --git a/Api/Models/JobsDocument.cs b/Api/Models/JobsDocument.cs new file mode 100644 index 0000000..b050eba --- /dev/null +++ b/Api/Models/JobsDocument.cs @@ -0,0 +1,23 @@ +using Google.Cloud.Firestore; + +[FirestoreData] +public class JobsDocument +{ + [FirestoreProperty("jobId")] + public string JobId { get; set; } + + [FirestoreProperty("userId")] + public string UserId { get; set; } + + [FirestoreProperty("state")] + public string State { get; set; } + + [FirestoreProperty("createdAt")] + public DateTime CreatedAt { get; set; } + + [FirestoreProperty("updatedAt")] + public DateTime UpdatedAt { get; set; } + + [FirestoreProperty("result")] + public object Result { get; set; } +} \ No newline at end of file diff --git a/Api/Services/FirestoreService.cs b/Api/Services/FirestoreService.cs index bf7d327..436b0ad 100644 --- a/Api/Services/FirestoreService.cs +++ b/Api/Services/FirestoreService.cs @@ -49,6 +49,15 @@ public async Task CreateInitAsync( string jobId, return snap.Exists ? snap.ToDictionary() : null; } + public async Task> GetDocumentsByField(string fieldName, string fieldValue) + { + var querySnap = await _collection.WhereEqualTo(fieldName, fieldValue).GetSnapshotAsync(); + if(querySnap == null || querySnap.Documents.Count == 0) + return null; + return querySnap.Documents; + + } + public async Task UpdateStateAsync(string jobId, JobState state, Dictionary? extra = null) { var updates = new Dictionary From c507d0efe9377719ec1db123329e30941b78d29f Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sat, 25 Oct 2025 13:22:10 +0100 Subject: [PATCH 04/23] ui changes --- datamorph-ui/src/App.scss | 219 +++++++++++++++++++++++ datamorph-ui/src/App.tsx | 15 +- datamorph-ui/src/components/Jobs.tsx | 181 +++++++++++++++++++ datamorph-ui/src/components/MainPage.tsx | 10 +- datamorph-ui/src/helpers/Api.ts | 23 ++- datamorph-ui/src/helpers/JobHelpers.ts | 14 ++ datamorph-ui/src/models/JobModel.tsx | 14 ++ 7 files changed, 470 insertions(+), 6 deletions(-) create mode 100644 datamorph-ui/src/components/Jobs.tsx create mode 100644 datamorph-ui/src/helpers/JobHelpers.ts create mode 100644 datamorph-ui/src/models/JobModel.tsx diff --git a/datamorph-ui/src/App.scss b/datamorph-ui/src/App.scss index 082e6a5..c931d40 100644 --- a/datamorph-ui/src/App.scss +++ b/datamorph-ui/src/App.scss @@ -449,6 +449,217 @@ body { } } +.shell--jobs { + text-align: left; + + .shell__heading { + text-align: center; + } +} + +.jobs { + display: grid; + gap: clamp(1.8rem, 3vw, 2.4rem); + text-align: left; + + &__header { + display: grid; + gap: 0.4rem; + + h2 { + margin: 0; + font-size: clamp(1.6rem, 3.6vw, 2rem); + font-weight: 600; + letter-spacing: -0.01em; + } + } + + &__subtitle { + margin: 0; + color: $text-soft; + font-size: 1rem; + } + + &__state { + padding: 1.2rem 1.6rem; + border-radius: 24px; + border: 1px solid rgba(255, 255, 255, 0.08); + background: rgba(18, 18, 18, 0.65); + box-shadow: inset 0 0 0 1px rgba(255, 255, 255, 0.05); + color: $text-muted; + text-align: center; + + &--error { + color: #ffb6b6; + border-color: rgba(255, 77, 77, 0.38); + background: rgba(48, 16, 16, 0.65); + box-shadow: inset 0 0 0 1px rgba(255, 77, 77, 0.22); + } + } + + &__grid { + display: grid; + gap: clamp(1.4rem, 2.8vw, 1.9rem); + grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); + } +} + +.job-card { + position: relative; + display: grid; + gap: 1.2rem; + padding: 1.6rem; + border-radius: 28px; + background: rgba(18, 18, 18, 0.78); + border: 1px solid rgba(255, 255, 255, 0.08); + box-shadow: + 0 24px 50px rgba(0, 0, 0, 0.45), + inset 0 0 0 1px rgba(255, 255, 255, 0.04); + backdrop-filter: blur(18px); + transition: transform 180ms ease, box-shadow 180ms ease, border-color 180ms ease; + + &:hover { + transform: translateY(-4px); + box-shadow: + 0 30px 60px rgba(0, 0, 0, 0.55), + inset 0 0 0 1px rgba(255, 255, 255, 0.08); + border-color: rgba(255, 255, 255, 0.12); + } + + &__header { + display: flex; + align-items: center; + justify-content: space-between; + gap: 0.75rem; + } + + &__title { + margin: 0; + font-size: 1.15rem; + font-weight: 600; + color: #fff; + } + + &__status { + display: inline-flex; + align-items: center; + justify-content: center; + padding: 0.35rem 0.9rem; + border-radius: 999px; + font-size: 0.78rem; + letter-spacing: 0.08em; + font-weight: 600; + text-transform: uppercase; + background: rgba(255, 255, 255, 0.16); + color: #fff; + box-shadow: inset 0 0 0 1px rgba(255, 255, 255, 0.14); + + &--done, + &--success, + &--completed { + background: rgba(88, 210, 144, 0.24); + color: #8cf5c4; + box-shadow: inset 0 0 0 1px rgba(88, 210, 144, 0.4); + } + + &--processing, + &--running, + &--in_progress { + background: rgba(255, 191, 72, 0.24); + color: #ffd992; + box-shadow: inset 0 0 0 1px rgba(255, 191, 72, 0.4); + } + + &--failed, + &--error, + &--cancelled { + background: rgba(255, 82, 82, 0.22); + color: #ff9c9c; + box-shadow: inset 0 0 0 1px rgba(255, 82, 82, 0.38); + } + } + + &__meta { + display: grid; + gap: 0.9rem; + + div { + display: grid; + gap: 0.25rem; + } + + dt { + font-size: 0.72rem; + text-transform: uppercase; + letter-spacing: 0.1em; + color: $text-soft; + } + + dd { + margin: 0; + font-size: 0.94rem; + color: rgba(255, 255, 255, 0.88); + word-break: break-all; + } + } + + &__stats { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 0.9rem; + padding: 1rem; + border-radius: 20px; + background: rgba(255, 255, 255, 0.05); + border: 1px solid rgba(255, 255, 255, 0.08); + } + + &__stat-value { + display: block; + font-size: 1.35rem; + font-weight: 600; + color: #fff; + } + + &__stat-label { + font-size: 0.72rem; + letter-spacing: 0.08em; + text-transform: uppercase; + color: $text-soft; + } + + &__footer { + display: flex; + justify-content: flex-end; + } + + &__download { + display: inline-flex; + align-items: center; + justify-content: center; + gap: 0.45rem; + padding: 0.65rem 1.2rem; + border-radius: 999px; + text-decoration: none; + font-weight: 600; + letter-spacing: 0.04em; + color: #f1f1f1; + background: linear-gradient(135deg, rgba(255, 255, 255, 0.18), rgba(255, 255, 255, 0.08)); + box-shadow: 0 14px 30px rgba(0, 0, 0, 0.45); + transition: transform 160ms ease, box-shadow 160ms ease, filter 160ms ease; + + &:hover { + transform: translateY(-2px); + filter: brightness(1.06); + box-shadow: 0 18px 38px rgba(0, 0, 0, 0.55); + } + + &:focus-visible { + outline: 2px solid rgba(255, 255, 255, 0.6); + outline-offset: 3px; + } + } +} + @media (max-width: 720px) { .shell { padding: 2.4rem 1.9rem; @@ -463,6 +674,14 @@ body { justify-content: center; } } + + .jobs { + gap: 1.6rem; + + &__grid { + grid-template-columns: 1fr; + } + } } @keyframes bob { diff --git a/datamorph-ui/src/App.tsx b/datamorph-ui/src/App.tsx index e4136a8..67d61db 100644 --- a/datamorph-ui/src/App.tsx +++ b/datamorph-ui/src/App.tsx @@ -1,11 +1,18 @@ -import './App.scss' -import { MainPage } from './components/MainPage' -import { AuthProvider } from './context/AuthContext' +import "./App.scss" +import { BrowserRouter, Routes, Route } from "react-router-dom" +import { MainPage } from "./components/MainPage" +import { Jobs } from "./components/Jobs" +import { AuthProvider } from "./context/AuthContext" function App() { return ( - + + + } /> + } /> + + ) } diff --git a/datamorph-ui/src/components/Jobs.tsx b/datamorph-ui/src/components/Jobs.tsx new file mode 100644 index 0000000..82e18b0 --- /dev/null +++ b/datamorph-ui/src/components/Jobs.tsx @@ -0,0 +1,181 @@ +import { useEffect, useMemo, useState } from "react" +import { useNavigate } from "react-router-dom" +import { useAuth } from "../context/AuthContext" +import { GetJobByUserId } from "../helpers/Api" +import type { Job } from "../models/JobModel" +import { formatDate } from "../helpers/JobHelpers" + + +export function Jobs() { + const navigate = useNavigate() + const { isAuthenticated, user, signOut } = useAuth() + const [jobs, setJobs] = useState([]) + const [isLoading, setIsLoading] = useState(true) + const [error, setError] = useState(null) + + useEffect(() => { + if (!isAuthenticated) { + navigate("/") + return + } + + const userId = localStorage.getItem("userId") ?? "" + if (!userId) { + setError("Missing user information. Please sign in again.") + setIsLoading(false) + return + } + + setIsLoading(true) + setError(null) + + GetJobByUserId(userId) + .then((jobsResponse) => { + setJobs(Array.isArray(jobsResponse) ? jobsResponse : []) + }) + .catch(() => { + setError("Unable to load your jobs right now. Please try again later.") + }) + .finally(() => setIsLoading(false)) + }, [isAuthenticated, navigate]) + + const orderedJobs = useMemo( + () => + [...jobs].sort((left, right) => { + return new Date(right.updatedAt).getTime() - new Date(left.updatedAt).getTime() + }), + [jobs] + ) + + if (!isAuthenticated) { + return null + } + + return ( +
+
+
+
+ {isAuthenticated ? `You're signed in as ${user?.name ?? ""}` : ""} +
+
+ + +
+
+ +
+

Job Runs

+
+ +
+
+

Recent jobs

+

Newest updates appear first.

+

All download links have a 7 day expiry. Please download your file in 7 days, from creation time.

+
+ + {isLoading && ( +
+

Loading your jobs…

+
+ )} + + {!isLoading && error && ( +
+

{error}

+
+ )} + + {!isLoading && !error && orderedJobs.length === 0 && ( +
+

No jobs yet. Submit a dataset to see it appear here.

+
+ )} + + {orderedJobs.length > 0 && ( +
+ {orderedJobs.map((job) => { + const jobResult = job.result ?? undefined + const statusModifier = (job.state || "unknown") + .toLowerCase() + .replace(/[^a-z0-9]+/g, "_") + + return ( +
+
+

Job Status

+ + {job.state} + +
+ +
+
+
Job ID
+
{job.jobId}
+
+
+
Created
+
{formatDate(job.createdAt)}
+
+
+
Last Updated
+
{formatDate(job.updatedAt)}
+
+
+ + {jobResult && ( +
+
+ {jobResult.inputLines ?? "—"} + Input Lines +
+
+ {jobResult.outputLines ?? "—"} + Output Lines +
+
+ )} + + {jobResult?.transformedJsonl && ( + + )} +
+ ) + })} +
+ )} +
+
+
+ ) +} diff --git a/datamorph-ui/src/components/MainPage.tsx b/datamorph-ui/src/components/MainPage.tsx index ef4e475..ccbf6e3 100644 --- a/datamorph-ui/src/components/MainPage.tsx +++ b/datamorph-ui/src/components/MainPage.tsx @@ -5,6 +5,7 @@ import { type DragEvent, type KeyboardEvent, } from "react" +import { useNavigate } from "react-router-dom" import type { Status } from "../models/Status" import { ImageUploader } from "../helpers/Api" import { useAuth } from "../context/AuthContext" @@ -17,6 +18,7 @@ export function MainPage() { const [isDragging, setIsDragging] = useState(false) const [isAuthenticating, setIsAuthenticating] = useState(false) const fileInputRef = useRef(null) + const navigate = useNavigate() const { isAuthenticated, user, requestGoogleSignIn, signOut } = useAuth() const fileSummary = useMemo(() => { @@ -153,7 +155,13 @@ export function MainPage() { {isAuthenticated ? `Your signed in as ${user?.name ?? ""}` : ""}
- {isAuthenticated ? ( diff --git a/datamorph-ui/src/helpers/Api.ts b/datamorph-ui/src/helpers/Api.ts index f8a2d41..3306f5f 100644 --- a/datamorph-ui/src/helpers/Api.ts +++ b/datamorph-ui/src/helpers/Api.ts @@ -22,12 +22,33 @@ export async function ImageUploader (file: File, prompt: string): Promise<[statu } await SendFileToSignedUri(file, (await response.json()).signedUploadUrl); - return ['success', 'Done! Your data request is on its way.']; + return ['success', 'Done! Your data request is on its way. You can view the status of your job in the Jobs Dashboard.']; } catch { return ['error', 'Something went wrong. Please try again.']; } } +export async function GetJobByUserId(userId: string): Promise { + + try { + var apiUrl = import.meta.env.VITE_API_URL; + + const response = await fetch(`${apiUrl}/jobs?userId=${userId}`, { + method: 'GET', + headers: { "Content-Type": "application/json" }, + }) + + if (!response.ok) { + throw new Error('Request failed') + } + + const jobs = await response.json(); + return jobs; + } catch { + return []; + } +} + async function SendFileToSignedUri(file: File, signedUrl: string): Promise { try { const response = await fetch(signedUrl, { diff --git a/datamorph-ui/src/helpers/JobHelpers.ts b/datamorph-ui/src/helpers/JobHelpers.ts new file mode 100644 index 0000000..f8ddabe --- /dev/null +++ b/datamorph-ui/src/helpers/JobHelpers.ts @@ -0,0 +1,14 @@ +export function formatDate(isoDate: string): string { + const parsedDate = new Date(isoDate) + if (Number.isNaN(parsedDate.getTime())) { + return "—" + } + + return parsedDate.toLocaleString(undefined, { + year: "numeric", + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} \ No newline at end of file diff --git a/datamorph-ui/src/models/JobModel.tsx b/datamorph-ui/src/models/JobModel.tsx new file mode 100644 index 0000000..7aa87e3 --- /dev/null +++ b/datamorph-ui/src/models/JobModel.tsx @@ -0,0 +1,14 @@ +export type JobResult = { + inputLines?: number + outputLines?: number + transformedJsonl?: string +} + +export type Job = { + jobId: string + userId: string + state: string + createdAt: string + updatedAt: string + result?: JobResult | null +} \ No newline at end of file From e5c6a65192b9f0b1500ff6616bb158e044e04217 Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sat, 25 Oct 2025 15:19:18 +0100 Subject: [PATCH 05/23] vercel added for routing --- datamorph-ui/vercel.json | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 datamorph-ui/vercel.json diff --git a/datamorph-ui/vercel.json b/datamorph-ui/vercel.json new file mode 100644 index 0000000..0f32683 --- /dev/null +++ b/datamorph-ui/vercel.json @@ -0,0 +1,3 @@ +{ + "rewrites": [{ "source": "/(.*)", "destination": "/index.html" }] +} From 4486621ba9f6243b89231ff51f9049440d1dcf87 Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sat, 25 Oct 2025 21:30:23 +0100 Subject: [PATCH 06/23] new sinks created --- Parser/Parser/Abstractions/IOutputSink.cs | 10 ++ .../Parser/Abstractions/IOutputSinkFactory.cs | 6 ++ .../Abstractions/ITransformationService.cs | 1 + Parser/Parser/Api/ApplyTransformation.cs | 16 +++- Parser/Parser/Program.cs | 3 + .../Services/OutputSinks/CsvOutputSink.cs | 93 +++++++++++++++++++ .../Services/OutputSinks/JsonOutputSink.cs | 55 +++++++++++ .../Services/OutputSinks/OutputSinkFactory.cs | 29 ++++++ .../Parser/Services/TransformationService.cs | 58 ++++++++++++ 9 files changed, 266 insertions(+), 5 deletions(-) create mode 100644 Parser/Parser/Abstractions/IOutputSink.cs create mode 100644 Parser/Parser/Abstractions/IOutputSinkFactory.cs create mode 100644 Parser/Parser/Services/OutputSinks/CsvOutputSink.cs create mode 100644 Parser/Parser/Services/OutputSinks/JsonOutputSink.cs create mode 100644 Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs diff --git a/Parser/Parser/Abstractions/IOutputSink.cs b/Parser/Parser/Abstractions/IOutputSink.cs new file mode 100644 index 0000000..9fa73e0 --- /dev/null +++ b/Parser/Parser/Abstractions/IOutputSink.cs @@ -0,0 +1,10 @@ +namespace Parser.Abstractions; + +public interface IOutputSink: IAsyncDisposable +{ + Task WriteChunkAsync(StreamWriter writer, string transformedChunk); + int TotalRows { get; } + Stream GetFinalStream(); + string FileExtension { get; } + string ContentType { get; } +} \ No newline at end of file diff --git a/Parser/Parser/Abstractions/IOutputSinkFactory.cs b/Parser/Parser/Abstractions/IOutputSinkFactory.cs new file mode 100644 index 0000000..6dbabbf --- /dev/null +++ b/Parser/Parser/Abstractions/IOutputSinkFactory.cs @@ -0,0 +1,6 @@ +namespace Parser.Abstractions; + +public interface IOutputSinkFactory +{ + IOutputSink Create(string formatHint); +} \ No newline at end of file diff --git a/Parser/Parser/Abstractions/ITransformationService.cs b/Parser/Parser/Abstractions/ITransformationService.cs index d63d553..47b9b2e 100644 --- a/Parser/Parser/Abstractions/ITransformationService.cs +++ b/Parser/Parser/Abstractions/ITransformationService.cs @@ -4,4 +4,5 @@ public interface ITransformationService { Task CallGeminiForChunkAsync(List jsonlChunk, string userPrompt); Task WriteJsonlAsync(StreamWriter writer, string modelOutput); + Task GetValueFromGeminiAsync(string systemInstruction, string userPrompt); } \ No newline at end of file diff --git a/Parser/Parser/Api/ApplyTransformation.cs b/Parser/Parser/Api/ApplyTransformation.cs index 0a5e710..7d5e6cc 100644 --- a/Parser/Parser/Api/ApplyTransformation.cs +++ b/Parser/Parser/Api/ApplyTransformation.cs @@ -22,12 +22,14 @@ public class ApplyTransformation : ControllerBase private readonly ParsingConfig _config; private readonly ITransformationService _service; private readonly IGoogleAuth _googleAuth; + private readonly IOutputSinkFactory _sinkFactory; public ApplyTransformation( ILogger logger, StorageClient storageClient, ITransformationService service, IGoogleAuth googleAuth, + IOutputSinkFactory sinkFactory, IOptions config) { _logger = logger; @@ -35,6 +37,7 @@ public ApplyTransformation( _config = config.Value; _service = service; _googleAuth = googleAuth; + _sinkFactory = sinkFactory; var db = FirestoreDb.Create(_config.GcpProjectId); _collection = db.Collection(_config.FirestoreCollection); @@ -58,8 +61,8 @@ public async Task Apply([FromBody] EventArcStoragePayload body) var state = snap.TryGetValue("state", out var s) ? s : null; - if (state == "DONE") - return NoContent(); + //if (state == "DONE") + //return NoContent(); var parsedPath = snap.TryGetValue("parsedPath", out var p) ? p : null; var bucket = snap.TryGetValue("bucket", out var b) ? b : null; @@ -86,6 +89,9 @@ await jobRef.SetAsync(new Dictionary var outObj = $"jobs/{jobId}/transformed/data.jsonl"; await using var outMem = new MemoryStream(); await using var outWriter = new StreamWriter(outMem, new UTF8Encoding(false)); + + var analyzeTarget = await _service.GetValueFromGeminiAsync("Analyze the user prompt to determine the best output format. Allowed formats are csv and json. Simply return the value for format", prompt); + await using var sink = _sinkFactory.Create(analyzeTarget ?? "json"); int linesPerChunk = Math.Clamp(500, 50, 2000); var buffer = new List(linesPerChunk); @@ -100,7 +106,7 @@ await jobRef.SetAsync(new Dictionary if (buffer.Count >= linesPerChunk) { var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt); - var written = await _service.WriteJsonlAsync(outWriter, transformed); + var written = await sink.WriteChunkAsync(outWriter, transformed); totalIn += buffer.Count; totalOut += written; buffer.Clear(); @@ -111,14 +117,14 @@ await jobRef.SetAsync(new Dictionary if (buffer.Count > 0) { var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt); - var written = await _service.WriteJsonlAsync(outWriter, transformed); + var written = await sink.WriteChunkAsync(outWriter, transformed); totalIn += buffer.Count; totalOut += written; } await outWriter.FlushAsync(); outMem.Position = 0; - await _storage.UploadObjectAsync(_config.OutputBucket, outObj, "application/json", outMem); + await _storage.UploadObjectAsync(_config.OutputBucket, outObj, sink.ContentType, outMem); var credential = await _googleAuth.GetAsync(); var urlSigner = UrlSigner.FromCredential(credential); diff --git a/Parser/Parser/Program.cs b/Parser/Parser/Program.cs index 3af0d9d..7a78a22 100644 --- a/Parser/Parser/Program.cs +++ b/Parser/Parser/Program.cs @@ -65,6 +65,9 @@ builder.Services.AddSingleton(); builder.Services.AddTransient(); builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddTransient(); +builder.Services.AddTransient(); var app = builder.Build(); diff --git a/Parser/Parser/Services/OutputSinks/CsvOutputSink.cs b/Parser/Parser/Services/OutputSinks/CsvOutputSink.cs new file mode 100644 index 0000000..e0d65a1 --- /dev/null +++ b/Parser/Parser/Services/OutputSinks/CsvOutputSink.cs @@ -0,0 +1,93 @@ +using System.Text; +using Parser.Abstractions; + +namespace Parser.Services; + +using System.Text.Json; + +public class CsvOutputSink : IOutputSink +{ + private readonly MemoryStream _buffer = new(); + private readonly StreamWriter _writer; + private readonly HashSet _allColumns = new(StringComparer.OrdinalIgnoreCase); + private bool _headerWritten; + private int _totalRows; + + public CsvOutputSink() + { + _writer = new StreamWriter(_buffer, new UTF8Encoding(false), leaveOpen: true); + } + + public string FileExtension => ".csv"; + public string ContentType => "text/csv"; + public int TotalRows => _totalRows; + + public async Task WriteChunkAsync(StreamWriter writer, string transformedChunk) + { + var rows = new List>(); + + foreach (var raw in transformedChunk.Split('\n')) + { + var line = raw.Trim(); + if (string.IsNullOrWhiteSpace(line)) continue; + + var dict = JsonSerializer.Deserialize>(line); + if (dict is null) continue; + + rows.Add(dict); + + foreach (var key in dict.Keys) + _allColumns.Add(key); + } + + if (rows.Count == 0) + return 0; + + var cols = _allColumns.ToList(); + + if (!_headerWritten) + { + await writer.WriteLineAsync(string.Join(",", cols.Select(EscapeCsv))); + _headerWritten = true; + } + + foreach (var row in rows) + { + var values = cols.Select(c => + { + row.TryGetValue(c, out var val); + return EscapeCsv(val?.ToString() ?? ""); + }); + + await writer.WriteLineAsync(string.Join(",", values)); + _totalRows++; + } + + return rows.Count; + } + + + private static string EscapeCsv(string input) + { + if (input.Contains('"') || input.Contains(',') || input.Contains('\n') || input.Contains('\r')) + { + var escaped = input.Replace("\"", "\"\""); + return $"\"{escaped}\""; + } + return input; + } + + public Stream GetFinalStream() + { + _writer.Flush(); + _buffer.Position = 0; + return _buffer; + } + + public ValueTask DisposeAsync() + { + _writer.Dispose(); + _buffer.Dispose(); + return ValueTask.CompletedTask; + } +} diff --git a/Parser/Parser/Services/OutputSinks/JsonOutputSink.cs b/Parser/Parser/Services/OutputSinks/JsonOutputSink.cs new file mode 100644 index 0000000..e04d128 --- /dev/null +++ b/Parser/Parser/Services/OutputSinks/JsonOutputSink.cs @@ -0,0 +1,55 @@ +using System.Text; +using Parser.Abstractions; + +namespace Parser.Services; + +public class JsonOutputSink : IOutputSink +{ + private readonly MemoryStream _buffer = new(); + private readonly StreamWriter _writer; + private int _totalRows; + + public JsonOutputSink() + { + _writer = new StreamWriter(_buffer, new UTF8Encoding(false), leaveOpen: true); + } + + public string FileExtension => ".jsonl"; + public string ContentType => "application/json"; + public int TotalRows => _totalRows; + + public async Task WriteChunkAsync(StreamWriter writer, string transformedChunk) + { + var clean = transformedChunk + .Replace("```json", string.Empty) + .Replace("```", string.Empty) + .Trim(); + + var lines = clean.Split('\n') + .Select(l => l.Trim()) + .Where(l => !string.IsNullOrWhiteSpace(l)); + + int count = 0; + foreach (var l in lines) + { + if (!(l.StartsWith("{") && l.EndsWith("}"))) continue; + await writer.WriteLineAsync(l.AsMemory()); + count++; + } + return count; + } + + public Stream GetFinalStream() + { + _writer.Flush(); + _buffer.Position = 0; + return _buffer; + } + + public ValueTask DisposeAsync() + { + _writer.Dispose(); + _buffer.Dispose(); + return ValueTask.CompletedTask; + } +} diff --git a/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs b/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs new file mode 100644 index 0000000..9826c79 --- /dev/null +++ b/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs @@ -0,0 +1,29 @@ +using Parser.Abstractions; + +namespace Parser.Services; + +public class OutputSinkFactory: IOutputSinkFactory +{ + public IOutputSink Create(string formatHint) + { + if (string.IsNullOrEmpty(formatHint)) + { + return new JsonOutputSink(); + } + + if (formatHint.Contains("json", StringComparison.OrdinalIgnoreCase)) + { + return new JsonOutputSink(); + } + + if (formatHint.Contains("csv", StringComparison.OrdinalIgnoreCase)) + { + return new CsvOutputSink(); + } + + // TODO: Add for Parquet + + return new JsonOutputSink(); + } + +} \ No newline at end of file diff --git a/Parser/Parser/Services/TransformationService.cs b/Parser/Parser/Services/TransformationService.cs index 3a8359d..cf85b55 100644 --- a/Parser/Parser/Services/TransformationService.cs +++ b/Parser/Parser/Services/TransformationService.cs @@ -114,4 +114,62 @@ public async Task WriteJsonlAsync(StreamWriter writer, string modelOutput) } return count; } + + // Duplicate code, need to refactor later + public async Task GetValueFromGeminiAsync(string systemInstruction, string userPrompt) + { + var accessToken = await GetAccessTokenAsync(); + using var http = new HttpClient + { BaseAddress = new Uri($"https://{_config.GeminiLocation}-aiplatform.googleapis.com/") }; + http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken); + + + var body = new + { + systemInstruction = new + { + role = "system", + parts = new[] { new { text = systemInstruction } } + }, + contents = new[] + { + new + { + role = "user", + parts = new[] + { + new { text = $"User request:\n{userPrompt}" } + } + } + }, + generationConfig = new + { + temperature = 0.1, + maxOutputTokens = 4096, + responseMimeType = "text/plain" + } + }; + + var model = _config.GeminiModel; + var url = + $"v1/projects/{_config.GcpProjectId}/locations/global/publishers/google/models/{model}:generateContent"; + + using var req = new HttpRequestMessage(HttpMethod.Post, url) + { + Content = new StringContent(JsonSerializer.Serialize(body), Encoding.UTF8, "application/json") + }; + + var resp = await http.SendAsync(req); + var respBody = await resp.Content.ReadAsStringAsync(); + if (!resp.IsSuccessStatusCode) + throw new HttpRequestException($"Vertex {resp.StatusCode}: {respBody}"); + + using var doc = JsonDocument.Parse(respBody); + var text = doc.RootElement + .GetProperty("candidates")[0] + .GetProperty("content").GetProperty("parts")[0] + .GetProperty("text").GetString(); + + return text ?? string.Empty; + } } \ No newline at end of file From f508a15e06a3e657e0bb3789eacaa780797353f3 Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sat, 25 Oct 2025 22:34:45 +0100 Subject: [PATCH 07/23] parquet added to factory --- Parser/Parser/Api/ApplyTransformation.cs | 6 +- Parser/Parser/Parser.csproj | 1 + Parser/Parser/Program.cs | 1 + .../Services/OutputSinks/OutputSinkFactory.cs | 5 +- .../Services/OutputSinks/ParquetOutputSink.cs | 108 ++++++++++++++++++ 5 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 Parser/Parser/Services/OutputSinks/ParquetOutputSink.cs diff --git a/Parser/Parser/Api/ApplyTransformation.cs b/Parser/Parser/Api/ApplyTransformation.cs index 7d5e6cc..63e7af6 100644 --- a/Parser/Parser/Api/ApplyTransformation.cs +++ b/Parser/Parser/Api/ApplyTransformation.cs @@ -61,8 +61,8 @@ public async Task Apply([FromBody] EventArcStoragePayload body) var state = snap.TryGetValue("state", out var s) ? s : null; - //if (state == "DONE") - //return NoContent(); + if (state == "DONE") + return NoContent(); var parsedPath = snap.TryGetValue("parsedPath", out var p) ? p : null; var bucket = snap.TryGetValue("bucket", out var b) ? b : null; @@ -90,7 +90,7 @@ await jobRef.SetAsync(new Dictionary await using var outMem = new MemoryStream(); await using var outWriter = new StreamWriter(outMem, new UTF8Encoding(false)); - var analyzeTarget = await _service.GetValueFromGeminiAsync("Analyze the user prompt to determine the best output format. Allowed formats are csv and json. Simply return the value for format", prompt); + var analyzeTarget = await _service.GetValueFromGeminiAsync("Analyze the user prompt to determine the best output format. Allowed formats are csv, json and parquet. Simply return the value for format", prompt); await using var sink = _sinkFactory.Create(analyzeTarget ?? "json"); int linesPerChunk = Math.Clamp(500, 50, 2000); diff --git a/Parser/Parser/Parser.csproj b/Parser/Parser/Parser.csproj index 7471bab..1d641df 100644 --- a/Parser/Parser/Parser.csproj +++ b/Parser/Parser/Parser.csproj @@ -13,6 +13,7 @@ + diff --git a/Parser/Parser/Program.cs b/Parser/Parser/Program.cs index 7a78a22..c4f2770 100644 --- a/Parser/Parser/Program.cs +++ b/Parser/Parser/Program.cs @@ -68,6 +68,7 @@ builder.Services.AddSingleton(); builder.Services.AddTransient(); builder.Services.AddTransient(); +builder.Services.AddTransient(); var app = builder.Build(); diff --git a/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs b/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs index 9826c79..23a2525 100644 --- a/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs +++ b/Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs @@ -21,7 +21,10 @@ public IOutputSink Create(string formatHint) return new CsvOutputSink(); } - // TODO: Add for Parquet + if (formatHint.Contains("parquet", StringComparison.OrdinalIgnoreCase)) + { + return new ParquetOutputSink(); + } return new JsonOutputSink(); } diff --git a/Parser/Parser/Services/OutputSinks/ParquetOutputSink.cs b/Parser/Parser/Services/OutputSinks/ParquetOutputSink.cs new file mode 100644 index 0000000..b2445dc --- /dev/null +++ b/Parser/Parser/Services/OutputSinks/ParquetOutputSink.cs @@ -0,0 +1,108 @@ +using System.Text; +using System.Text.Json; +using System.Linq; +using Parser.Abstractions; +using Parquet; +using Parquet.Data; +using Parquet.Schema; + +namespace Parser.Services; + +public class ParquetOutputSink : IOutputSink +{ + private readonly MemoryStream _buffer = new(); + private readonly Dictionary> _columnsData = new(StringComparer.OrdinalIgnoreCase); + private readonly HashSet _allColumns = new(StringComparer.OrdinalIgnoreCase); + private int _totalRows; + + public ParquetOutputSink() + { + } + + public string FileExtension => ".parquet"; + public string ContentType => "application/octet-stream"; + public int TotalRows => _totalRows; + + public async Task WriteChunkAsync(StreamWriter writer, string transformedChunk) + { + var rows = new List>(); + + foreach (var raw in transformedChunk.Split('\n')) + { + var line = raw.Trim(); + if (string.IsNullOrWhiteSpace(line)) continue; + + var dict = JsonSerializer.Deserialize>(line); + if (dict is null) continue; + + rows.Add(dict); + + foreach (var key in dict.Keys) + _allColumns.Add(key); + } + + if (rows.Count == 0) + return 0; + + foreach (var col in _allColumns) + { + if (!_columnsData.ContainsKey(col)) + _columnsData[col] = new List(); + } + + foreach (var row in rows) + { + foreach (var col in _allColumns) + { + row.TryGetValue(col, out var val); + _columnsData[col].Add(val); + } + + _totalRows++; + } + + return rows.Count; + } + + public Stream GetFinalStream() + { + _buffer.SetLength(0); + + // stable column order + var orderedCols = _allColumns.OrderBy(c => c, StringComparer.OrdinalIgnoreCase).ToList(); + + var dataFields = orderedCols + .Select(c => (DataField)new DataField(c)) + .ToArray(); + + var schema = new ParquetSchema(dataFields); + + using (ParquetWriter parquetWriter = ParquetWriter.CreateAsync(schema, _buffer).GetAwaiter().GetResult()) + { + using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup()) + { + foreach (var colName in orderedCols) + { + var field = (DataField)schema.DataFields.First(f => f.Name.Equals(colName, StringComparison.OrdinalIgnoreCase)); + + var values = _columnsData[colName] + .Select(v => v?.ToString()) + .ToArray(); + + var dataColumn = new DataColumn(field, values); + + groupWriter.WriteColumnAsync(dataColumn).GetAwaiter().GetResult(); + } + } + } + + _buffer.Position = 0; + return _buffer; + } + + public ValueTask DisposeAsync() + { + _buffer.Dispose(); + return ValueTask.CompletedTask; + } +} From f8011a2e7ff93efb9db3727edb5fe485420bdddd Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sun, 26 Oct 2025 12:43:53 +0000 Subject: [PATCH 08/23] rate limiter added --- Api/Abstractions/IRateLimiter.cs | 8 +++ Api/Api/Pipelines/Initialize.cs | 7 ++- Api/Api/Pipelines/InitializeBase.cs | 5 ++ Api/DependencyInjection/AddApiService.cs | 1 + Api/Models/ApiConfig.cs | 1 + Api/Services/RateLimiter.cs | 72 ++++++++++++++++++++++++ 6 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 Api/Abstractions/IRateLimiter.cs create mode 100644 Api/Services/RateLimiter.cs diff --git a/Api/Abstractions/IRateLimiter.cs b/Api/Abstractions/IRateLimiter.cs new file mode 100644 index 0000000..55d46a1 --- /dev/null +++ b/Api/Abstractions/IRateLimiter.cs @@ -0,0 +1,8 @@ +namespace Api.Abstractions; + +public interface IRateLimiter +{ + Task GetUserLimitsAsync(string userId); + Task CheckUserLimit(string userId); + Task UpdateOrCreateUserLimitAsync(string userId); +} \ No newline at end of file diff --git a/Api/Api/Pipelines/Initialize.cs b/Api/Api/Pipelines/Initialize.cs index 7ff021e..686e808 100644 --- a/Api/Api/Pipelines/Initialize.cs +++ b/Api/Api/Pipelines/Initialize.cs @@ -14,21 +14,24 @@ public class Initialize: private readonly IFirestoreService _firestoreService; private readonly IGcsSigner _gcsSigner; private readonly ApiConfig _apiConfig; + private readonly IRateLimiter _rateLimiter; public Initialize( IFirestoreService firestoreService, IGcsSigner gcsSigner, - IOptions apiConfig + IOptions apiConfig, + IRateLimiter rateLimiter ) { _firestoreService = firestoreService; _gcsSigner = gcsSigner; _apiConfig = apiConfig.Value; + _rateLimiter = rateLimiter; } [HttpPost("init")] public virtual async Task Init([FromBody] InitRequest request) - => await Initialize(request, _firestoreService, _gcsSigner, _apiConfig); + => await Initialize(request, _firestoreService, _gcsSigner, _rateLimiter, _apiConfig); [HttpGet("init")] public virtual IActionResult GetNotAllowed() diff --git a/Api/Api/Pipelines/InitializeBase.cs b/Api/Api/Pipelines/InitializeBase.cs index 5dafaa5..ba18732 100644 --- a/Api/Api/Pipelines/InitializeBase.cs +++ b/Api/Api/Pipelines/InitializeBase.cs @@ -10,8 +10,13 @@ protected async Task Initialize( [FromBody] InitRequest request, IFirestoreService firestoreService, IGcsSigner gcsSigner, + IRateLimiter rateLimiter, ApiConfig apiConfig) { + var rateLimitResult = await rateLimiter.CheckUserLimit(request.UserId); + if (rateLimitResult) + return StatusCode(StatusCodes.Status429TooManyRequests, new {error = "To prevent abuse, we have rate limited your requests. Please try again later."}); + if (string.IsNullOrWhiteSpace(request.FileName)) return BadRequest(new {error = "File Name is required"}); diff --git a/Api/DependencyInjection/AddApiService.cs b/Api/DependencyInjection/AddApiService.cs index 851edf0..fd69fae 100644 --- a/Api/DependencyInjection/AddApiService.cs +++ b/Api/DependencyInjection/AddApiService.cs @@ -16,6 +16,7 @@ public static IServiceCollection AddApiServices( this IServiceCollection service services.AddSingleton, Initialize>(); services.AddSingleton, JobStatus>(); services.AddSingleton(); + services.AddSingleton(); return services; } diff --git a/Api/Models/ApiConfig.cs b/Api/Models/ApiConfig.cs index 1043aa5..5447018 100644 --- a/Api/Models/ApiConfig.cs +++ b/Api/Models/ApiConfig.cs @@ -5,5 +5,6 @@ public class ApiConfig public string ProjectId { get; set; } public string BucketName { get; set; } public string FirestoreCollection { get; set; } + public string FirestoreRateCollection { get; set; } public string ServiceAccountKeyPath { get; set; } } \ No newline at end of file diff --git a/Api/Services/RateLimiter.cs b/Api/Services/RateLimiter.cs new file mode 100644 index 0000000..99d2bd7 --- /dev/null +++ b/Api/Services/RateLimiter.cs @@ -0,0 +1,72 @@ +using Api.Abstractions; +using Api.Models; +using Google.Cloud.Firestore; +using Microsoft.Extensions.Options; + +namespace Api.Services; + +public class RateLimiter: IRateLimiter +{ + private readonly CollectionReference _collection; + private readonly ApiConfig _apiConfig; + + public RateLimiter(IOptions apiConfig) + { + _apiConfig = apiConfig.Value; + var db = FirestoreDb.Create(_apiConfig.ProjectId); + _collection = db.Collection(_apiConfig.FirestoreRateCollection); + } + + public async Task CheckUserLimit(string userId) + { + var userLimits = await GetUserLimitsAsync(userId); + if (userLimits) + { + return true; + } + + return false; + } + + public async Task GetUserLimitsAsync(string userId) + { + var snap = await _collection.Document(userId).GetSnapshotAsync(); + if (!snap.Exists) + { + await UpdateOrCreateUserLimitAsync(userId); + return true; + } + + + if (snap.TryGetValue("lastUpdate", out Timestamp lastUpdateTimestamp)) + { + var lastUpdated = lastUpdateTimestamp.ToDateTime().ToUniversalTime(); + var now = DateTime.UtcNow; + + if (now - lastUpdated > TimeSpan.FromMinutes(15)) + { + await UpdateOrCreateUserLimitAsync(userId); + return false; + } + + return true; + } + + await UpdateOrCreateUserLimitAsync(userId); + return true; + } + + public async Task UpdateOrCreateUserLimitAsync(string userId) + { + var doc = _collection.Document(userId); + var now = DateTime.UtcNow; + var data = new Dictionary + { + ["userId"] = userId, + ["lastUpdate"] = now + }; + + await doc.SetAsync(data); + } + +} \ No newline at end of file From cc0e69cf183938c188d0827bd8da16835026c1eb Mon Sep 17 00:00:00 2001 From: ameysunu Date: Sun, 26 Oct 2025 12:44:17 +0000 Subject: [PATCH 09/23] display error messages on ui --- datamorph-ui/src/helpers/Api.ts | 67 +++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/datamorph-ui/src/helpers/Api.ts b/datamorph-ui/src/helpers/Api.ts index 3306f5f..596dc5c 100644 --- a/datamorph-ui/src/helpers/Api.ts +++ b/datamorph-ui/src/helpers/Api.ts @@ -18,13 +18,15 @@ export async function ImageUploader (file: File, prompt: string): Promise<[statu }) if (!response.ok) { - throw new Error('Request failed') + const errorMessage = await extractResponseError(response, 'Request failed'); + throw new Error(errorMessage); } - await SendFileToSignedUri(file, (await response.json()).signedUploadUrl); + const { signedUploadUrl } = await response.json() as { signedUploadUrl: string }; + await SendFileToSignedUri(file, signedUploadUrl); - return ['success', 'Done! Your data request is on its way. You can view the status of your job in the Jobs Dashboard.']; - } catch { - return ['error', 'Something went wrong. Please try again.']; + return ['success', 'Done! Your data request is on its way. You can view the status of your job in the Jobs Dashboard.']; + } catch (error) { + return ['error', getErrorMessage(error)]; } } @@ -60,7 +62,8 @@ async function SendFileToSignedUri(file: File, signedUrl: string): Promise }) if (!response.ok) { - throw new Error('Upload failed') + const errorMessage = await extractResponseError(response, 'Upload failed'); + throw new Error(errorMessage); } } catch (error) { throw error @@ -69,4 +72,54 @@ async function SendFileToSignedUri(file: File, signedUrl: string): Promise function GetUserId() : string | null { return localStorage.getItem('userId'); -} \ No newline at end of file +} + +async function extractResponseError(response: Response, defaultMessage: string): Promise { + const fallback = defaultMessage || `Request failed with status ${response.status}`; + + try { + const rawBody = await response.text(); + + if (!rawBody) { + return `${fallback} (${response.status})`; + } + + try { + const parsed = JSON.parse(rawBody); + + if (typeof parsed === 'string' && parsed.trim().length > 0) { + return parsed; + } + + if (parsed && typeof parsed === 'object') { + const candidateKeys = ['message', 'error', 'detail', 'description']; + for (const key of candidateKeys) { + const value = (parsed as Record)[key]; + if (typeof value === 'string' && value.trim().length > 0) { + return value; + } + } + } + } catch { + if (rawBody.trim().length > 0) { + return rawBody; + } + } + + return fallback; + } catch { + return fallback; + } +} + +function getErrorMessage(error: unknown): string { + if (error instanceof Error && error.message) { + return error.message; + } + + if (typeof error === 'string' && error.trim().length > 0) { + return error; + } + + return 'An unexpected error occurred. Please try again.'; +} From 095f19b771d7c0a8dae255889966e1265a448287 Mon Sep 17 00:00:00 2001 From: ameysunu Date: Mon, 27 Oct 2025 11:56:32 +0000 Subject: [PATCH 10/23] fallback to firebase uid added on ui --- datamorph-ui/src/helpers/Api.ts | 35 ++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/datamorph-ui/src/helpers/Api.ts b/datamorph-ui/src/helpers/Api.ts index 596dc5c..3cfd734 100644 --- a/datamorph-ui/src/helpers/Api.ts +++ b/datamorph-ui/src/helpers/Api.ts @@ -71,9 +71,42 @@ async function SendFileToSignedUri(file: File, signedUrl: string): Promise } function GetUserId() : string | null { - return localStorage.getItem('userId'); + var uid = localStorage.getItem('userId'); + + if(!uid) { + uid = getFirebaseUidFromLocalStorage(); + } + + return uid; +} + +function getFirebaseUidFromLocalStorage(): string | null { + const PREFIX = 'firebase:authUser:'; + const SUFFIX = ':[DEFAULT]'; + + for (let i = 0; i < localStorage.length; i++) { + const key = localStorage.key(i); + if (!key) continue; + + if (key.startsWith(PREFIX) && key.endsWith(SUFFIX)) { + const raw = localStorage.getItem(key); + if (!raw) continue; + + try { + const parsed = JSON.parse(raw); + if (parsed && parsed.uid) { + return parsed.uid; + } + } catch (err) { + console.warn('Failed to parse firebase auth user from localStorage:', err); + } + } + } + + return null; } + async function extractResponseError(response: Response, defaultMessage: string): Promise { const fallback = defaultMessage || `Request failed with status ${response.status}`; From b285aff72321a79bc4ffc453e787a91dba252618 Mon Sep 17 00:00:00 2001 From: Amey Sunu Date: Thu, 30 Oct 2025 10:58:20 +0000 Subject: [PATCH 11/23] refresh button added --- datamorph-ui/src/App.scss | 20 ++++++++++++++++++-- datamorph-ui/src/components/Jobs.tsx | 19 ++++++++++++++----- datamorph-ui/src/components/MainPage.tsx | 2 +- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/datamorph-ui/src/App.scss b/datamorph-ui/src/App.scss index c931d40..757bb0e 100644 --- a/datamorph-ui/src/App.scss +++ b/datamorph-ui/src/App.scss @@ -199,14 +199,20 @@ body { margin: 0 auto; padding: 0.42rem 1.3rem; border-radius: 999px; - background: rgba(255, 255, 255, 0.12); - color: $text-muted; + background: #5b63b7; + color: #fff; text-transform: uppercase; letter-spacing: 0.12em; font-size: 0.68rem; font-weight: 600; } + .shell__tag:hover { + cursor: pointer; + background: #fff; + color: #5b63b7; + } + h1 { margin: 0; font-size: clamp(2.2rem, 4.8vw, 3rem); @@ -453,7 +459,17 @@ body { text-align: left; .shell__heading { + position: relative; text-align: center; + width: 100%; + + .shell__tag { + position: absolute; + top: 50%; + right: 0; + transform: translateY(-50%); + margin: 0; + } } } diff --git a/datamorph-ui/src/components/Jobs.tsx b/datamorph-ui/src/components/Jobs.tsx index 82e18b0..97541ff 100644 --- a/datamorph-ui/src/components/Jobs.tsx +++ b/datamorph-ui/src/components/Jobs.tsx @@ -13,11 +13,7 @@ export function Jobs() { const [isLoading, setIsLoading] = useState(true) const [error, setError] = useState(null) - useEffect(() => { - if (!isAuthenticated) { - navigate("/") - return - } + function LoadJobs(){ const userId = localStorage.getItem("userId") ?? "" if (!userId) { @@ -37,6 +33,16 @@ export function Jobs() { setError("Unable to load your jobs right now. Please try again later.") }) .finally(() => setIsLoading(false)) + } + + useEffect(() => { + if (!isAuthenticated) { + navigate("/") + return + } + + LoadJobs() + }, [isAuthenticated, navigate]) const orderedJobs = useMemo( @@ -85,6 +91,9 @@ export function Jobs() {

Job Runs

+
+ Refresh +
diff --git a/datamorph-ui/src/components/MainPage.tsx b/datamorph-ui/src/components/MainPage.tsx index ccbf6e3..1257690 100644 --- a/datamorph-ui/src/components/MainPage.tsx +++ b/datamorph-ui/src/components/MainPage.tsx @@ -152,7 +152,7 @@ export function MainPage() { isAuthenticated ? " shell__auth-status--active" : "" }`} > - {isAuthenticated ? `Your signed in as ${user?.name ?? ""}` : ""} + {isAuthenticated ? `You're signed in as ${user?.name ?? user?.email ?? ""}` : ""}
+ {isAuthenticated ? ( + ) : null} + {isAuthenticated ? (
+ + {job.state === "FAILED" && ( +
+
Error
+
{job.error}
+
+ )} + {jobResult && ( diff --git a/datamorph-ui/src/models/JobModel.tsx b/datamorph-ui/src/models/JobModel.tsx index 7aa87e3..15a2cb8 100644 --- a/datamorph-ui/src/models/JobModel.tsx +++ b/datamorph-ui/src/models/JobModel.tsx @@ -11,4 +11,5 @@ export type Job = { createdAt: string updatedAt: string result?: JobResult | null + error: string | null } \ No newline at end of file From 466b8b7470bf6372b98fac583605f86383d93d31 Mon Sep 17 00:00:00 2001 From: Amey Sunu Date: Thu, 30 Oct 2025 11:32:45 +0000 Subject: [PATCH 15/23] error message added to job state on fail --- Parser/Parser/Api/ApplyTransformation.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Parser/Parser/Api/ApplyTransformation.cs b/Parser/Parser/Api/ApplyTransformation.cs index 63e7af6..c283197 100644 --- a/Parser/Parser/Api/ApplyTransformation.cs +++ b/Parser/Parser/Api/ApplyTransformation.cs @@ -157,6 +157,7 @@ await jobRef.SetAsync(new Dictionary await jobRef.SetAsync(new Dictionary { ["state"] = "FAILED", + ["error"] = e.Message, ["updatedAt"] = DateTime.UtcNow, }, SetOptions.MergeAll); From 0204098ba6e1125b451a2f7d012a0418aeabae2c Mon Sep 17 00:00:00 2001 From: Amey Sunu Date: Thu, 30 Oct 2025 12:01:42 +0000 Subject: [PATCH 16/23] _logger added in ParsingApi Added logging to the ParsingApi class to capture errors during parsing. --- Parser/Parser/Api/ParsingApi.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Parser/Parser/Api/ParsingApi.cs b/Parser/Parser/Api/ParsingApi.cs index 49e36ea..2da8ac6 100644 --- a/Parser/Parser/Api/ParsingApi.cs +++ b/Parser/Parser/Api/ParsingApi.cs @@ -13,6 +13,7 @@ namespace Parser.Api; [Route("parse")] public class ParsingApi: ControllerBase, IParsingApi { + private readonly ILogger _logger; private readonly IParsingService _parsingService; private readonly ParsingConfig _config; private readonly StorageClient _storageClient; @@ -22,9 +23,11 @@ public ParsingApi( IParsingService parsingService, IOptions config, StorageClient storageClient, - PublisherClient publisher + PublisherClient publisher, + ILogger logger ) { + _logger = logger; _parsingService = parsingService; _config = config.Value; _storageClient = storageClient; @@ -78,6 +81,7 @@ async Task ReadText(StorageClient s, string b, string o) } catch (Exception e) { + _logger.Log(LogLevel.Error, "Error occurred while parse" + e.Message); return BadRequest(new { error = e.Message }); } @@ -107,4 +111,4 @@ await _storageClient.UploadObjectAsync(writeBucket, profilePath, "application/js await _publisher.PublishAsync(new PubsubMessage { Data = Google.Protobuf.ByteString.CopyFrom(data) }); } -} \ No newline at end of file +} From b55a6154f59219b18e24ec9771dd1b0eab77fa3a Mon Sep 17 00:00:00 2001 From: Amey Sunu Date: Thu, 30 Oct 2025 12:29:12 +0000 Subject: [PATCH 17/23] footer added --- datamorph-ui/src/App.scss | 8 ++++++++ datamorph-ui/src/components/Footer.tsx | 15 +++++++++++++++ datamorph-ui/src/components/Jobs.tsx | 2 ++ datamorph-ui/src/components/MainPage.tsx | 2 ++ 4 files changed, 27 insertions(+) create mode 100644 datamorph-ui/src/components/Footer.tsx diff --git a/datamorph-ui/src/App.scss b/datamorph-ui/src/App.scss index 757bb0e..6c43508 100644 --- a/datamorph-ui/src/App.scss +++ b/datamorph-ui/src/App.scss @@ -227,6 +227,14 @@ body { } } +.shell__footer { + width: 100%; + padding-top: 10px; + display: flex; + justify-content: center; +} + + .deck { position: relative; border-radius: 32px; diff --git a/datamorph-ui/src/components/Footer.tsx b/datamorph-ui/src/components/Footer.tsx new file mode 100644 index 0000000..ce02386 --- /dev/null +++ b/datamorph-ui/src/components/Footer.tsx @@ -0,0 +1,15 @@ +const GITHUB_ISSUES_URL = "https://github.com/ameysunu/DataMorph/issues" + +export function Footer() { + return ( + + ) +} diff --git a/datamorph-ui/src/components/Jobs.tsx b/datamorph-ui/src/components/Jobs.tsx index c33b6c0..f9a722a 100644 --- a/datamorph-ui/src/components/Jobs.tsx +++ b/datamorph-ui/src/components/Jobs.tsx @@ -4,6 +4,7 @@ import { useAuth } from "../context/AuthContext" import { GetJobByUserId } from "../helpers/Api" import type { Job } from "../models/JobModel" import { formatDate } from "../helpers/JobHelpers" +import { Footer } from "./Footer" export function Jobs() { @@ -192,6 +193,7 @@ export function Jobs() { )} +
) diff --git a/datamorph-ui/src/components/MainPage.tsx b/datamorph-ui/src/components/MainPage.tsx index 0113f8a..93cb09c 100644 --- a/datamorph-ui/src/components/MainPage.tsx +++ b/datamorph-ui/src/components/MainPage.tsx @@ -9,6 +9,7 @@ import { useNavigate } from "react-router-dom" import type { Status } from "../models/Status" import { ImageUploader } from "../helpers/Api" import { useAuth } from "../context/AuthContext" +import { Footer } from "./Footer" export function MainPage() { const [file, setFile] = useState(null) @@ -281,6 +282,7 @@ export function MainPage() {
+