// NOTE(review): reconstructed end-state of the Api-side changes in this patch.
// Generic type arguments were stripped by the paste-mangling; the ones below are
// inferred from the visible call sites (JobLists calls doc.ConvertTo<JobsDocument>()
// on each returned document) — confirm against the real sources.

// ----- Api/Abstractions/IFirestoreService.cs -----
using System.Text.Json.Nodes;
using Api.Models;
using Google.Cloud.Firestore;

namespace Api.Abstractions;

public interface IFirestoreService
{
    Task CreateInitAsync(string jobId, string userId, string rawPath, JsonObject? targetSchema, string? prompt, JsonObject? options);
    Task<Dictionary<string, object>?> GetAsync(string jobId);
    Task UpdateStateAsync(string jobId, JobState state, Dictionary<string, object>? extra = null);

    /// <summary>
    /// Returns every document whose <paramref name="fieldName"/> equals
    /// <paramref name="fieldValue"/>. Never null: an empty list means "no match".
    /// </summary>
    Task<IReadOnlyList<DocumentSnapshot>> GetDocumentsByField(string fieldName, string fieldValue);
}

// ----- Api/Abstractions/IRateLimiter.cs -----
namespace Api.Abstractions;

public interface IRateLimiter
{
    /// <summary>True when the user is currently inside the cooldown window (i.e. rate limited).</summary>
    Task<bool> GetUserLimitsAsync(string userId);

    /// <summary>True when the incoming request should be rejected with HTTP 429.</summary>
    Task<bool> CheckUserLimit(string userId);

    /// <summary>Records "now" as the user's last accepted request (upsert).</summary>
    Task UpdateOrCreateUserLimitAsync(string userId);
}

// ----- Api/Api/JobLists.cs -----
using Api.Abstractions;
using Microsoft.AspNetCore.Mvc;

namespace Api.Api;

/// <summary>Read-only listing endpoint: GET /api/jobs?userId=... returns the user's jobs.</summary>
[ApiController]
[Route("api/jobs")]
public class JobLists : ControllerBase
{
    private readonly ILogger<JobLists> _logger;
    private readonly IFirestoreService _firestoreService;

    public JobLists(ILogger<JobLists> logger, IFirestoreService firestoreService)
    {
        _logger = logger;
        _firestoreService = firestoreService;
    }

    [HttpGet]
    public async Task<IActionResult> GetJobByUserId([FromQuery] string userId)
    {
        // A blank userId is not an error from the client's point of view:
        // it simply has no jobs. Return an empty list, never null.
        if (string.IsNullOrWhiteSpace(userId))
            return Ok(Array.Empty<JobsDocument>());

        var documents = await _firestoreService.GetDocumentsByField("userId", userId);
        if (documents is null || documents.Count == 0)
            return Ok(new List<JobsDocument>());

        var jobList = documents.Select(doc => doc.ConvertTo<JobsDocument>()).ToList();
        return Ok(jobList);
    }
}

// ----- Api/Api/Pipelines/Initialize.cs (changed members) -----
public class Initialize : InitializeBase
{
    private readonly IFirestoreService _firestoreService;
    private readonly IGcsSigner _gcsSigner;
    private readonly ApiConfig _apiConfig;
    private readonly IRateLimiter _rateLimiter;

    public Initialize(
        IFirestoreService firestoreService,
        IGcsSigner gcsSigner,
        IOptions<ApiConfig> apiConfig,
        IRateLimiter rateLimiter)
    {
        _firestoreService = firestoreService;
        _gcsSigner = gcsSigner;
        _apiConfig = apiConfig.Value;
        _rateLimiter = rateLimiter;
    }

    [HttpPost("init")]
    public virtual async Task<IActionResult> Init([FromBody] InitRequest request)
        => await Initialize(request, _firestoreService, _gcsSigner, _rateLimiter, _apiConfig);

    [HttpGet("init")]
    public virtual IActionResult GetNotAllowed()
        => StatusCode(StatusCodes.Status405MethodNotAllowed);
}

// ----- Api/Api/Pipelines/InitializeBase.cs (changed method, head) -----
protected async Task<IActionResult> Initialize(
    [FromBody] InitRequest request,
    IFirestoreService firestoreService,
    IGcsSigner gcsSigner,
    IRateLimiter rateLimiter,
    ApiConfig apiConfig)
{
    // Rate-limit gate runs before any validation so abusive clients cannot
    // probe validation behavior for free.
    var rateLimitResult = await rateLimiter.CheckUserLimit(request.UserId);
    if (rateLimitResult)
        return StatusCode(StatusCodes.Status429TooManyRequests,
            new { error = "To prevent abuse, we have rate limited your requests. Please try again later." });

    if (string.IsNullOrWhiteSpace(request.FileName))
        return BadRequest(new { error = "File Name is required" });
    // ... remainder of the original method, unchanged by this patch ...
}

// ----- Api/DependencyInjection/AddApiService.cs (added registration) -----
// services.AddSingleton<IRateLimiter, RateLimiter>();

// ----- Api/Models/ApiConfig.cs -----
public class ApiConfig
{
    public string ProjectId { get; set; }
    public string BucketName { get; set; }
    public string FirestoreCollection { get; set; }
    public string FirestoreRateCollection { get; set; }
    public string ServiceAccountKeyPath { get; set; }
}

// ----- Api/Models/JobsDocument.cs -----
using Google.Cloud.Firestore;

/// <summary>Firestore projection of a job document, used by the jobs listing endpoint.</summary>
[FirestoreData]
public class JobsDocument
{
    [FirestoreProperty("jobId")]
    public string JobId { get; set; }

    [FirestoreProperty("userId")]
    public string UserId { get; set; }

    [FirestoreProperty("state")]
    public string State { get; set; }

    // NOTE(review): Firestore only round-trips UTC DateTimes; writers elsewhere
    // in this patch use DateTime.UtcNow, which is consistent — keep it that way.
    [FirestoreProperty("createdAt")]
    public DateTime CreatedAt { get; set; }

    [FirestoreProperty("updatedAt")]
    public DateTime UpdatedAt { get; set; }

    [FirestoreProperty("result")]
    public object Result { get; set; }

    [FirestoreProperty("error")]
    public string? Error { get; set; }
}

// ----- Api/Services/FirestoreService.cs (added method) -----
/// <summary>
/// Equality query over the jobs collection.
/// FIX: returns an empty snapshot list instead of null — the declared return
/// type is non-nullable and callers should not need a null check (they may
/// still keep one; empty satisfies both styles).
/// </summary>
public async Task<IReadOnlyList<DocumentSnapshot>> GetDocumentsByField(string fieldName, string fieldValue)
{
    var querySnap = await _collection.WhereEqualTo(fieldName, fieldValue).GetSnapshotAsync();
    return querySnap.Documents;
}

// ----- Api/Services/RateLimiter.cs -----
using Api.Abstractions;
using Api.Models;
using Google.Cloud.Firestore;
using Microsoft.Extensions.Options;

namespace Api.Services;

/// <summary>
/// Firestore-backed fixed-window rate limiter: one accepted request per user
/// per <see cref="Window"/>. The read-then-write is not atomic, so two
/// concurrent requests can both slip through one window — acceptable for
/// abuse prevention, not for strict quota enforcement.
/// </summary>
public class RateLimiter : IRateLimiter
{
    private static readonly TimeSpan Window = TimeSpan.FromSeconds(60);

    private readonly CollectionReference _collection;
    private readonly ApiConfig _apiConfig;

    public RateLimiter(IOptions<ApiConfig> apiConfig)
    {
        _apiConfig = apiConfig.Value;
        var db = FirestoreDb.Create(_apiConfig.ProjectId);
        _collection = db.Collection(_apiConfig.FirestoreRateCollection);
    }

    /// <summary>True when the request must be rejected (HTTP 429).</summary>
    public Task<bool> CheckUserLimit(string userId) => GetUserLimitsAsync(userId);

    /// <summary>
    /// Returns true when the user is still inside the cooldown window.
    /// BUG FIX: the original returned true (blocked) when no document existed,
    /// which rejected every user's very first request with 429. A missing or
    /// malformed document now records the request and ALLOWS it.
    /// </summary>
    public async Task<bool> GetUserLimitsAsync(string userId)
    {
        var snap = await _collection.Document(userId).GetSnapshotAsync();
        if (!snap.Exists)
        {
            // First request ever: record it and let it through.
            await UpdateOrCreateUserLimitAsync(userId);
            return false;
        }

        if (snap.TryGetValue("lastUpdate", out Timestamp lastUpdateTimestamp))
        {
            var lastUpdated = lastUpdateTimestamp.ToDateTime().ToUniversalTime();
            var now = DateTime.UtcNow;

            if (now - lastUpdated > Window)
            {
                // Window elapsed: start a new one and allow.
                await UpdateOrCreateUserLimitAsync(userId);
                return false;
            }

            // Still inside the window: block.
            return true;
        }

        // Document exists but is malformed (no lastUpdate): repair it and allow.
        await UpdateOrCreateUserLimitAsync(userId);
        return false;
    }

    /// <summary>Upserts the user's limiter document with the current UTC time.</summary>
    public async Task UpdateOrCreateUserLimitAsync(string userId)
    {
        var data = new Dictionary<string, object>
        {
            ["userId"] = userId,
            ["lastUpdate"] = DateTime.UtcNow, // Firestore requires UTC DateTimes
        };
        await _collection.Document(userId).SetAsync(data);
    }
}

// ----- Parser/Parser/Abstractions/IOutputSink.cs -----
namespace Parser.Abstractions;

/// <summary>Format-specific writer for transformed rows (jsonl/csv/parquet).</summary>
public interface IOutputSink : IAsyncDisposable
{
    /// <summary>Parses one model-output chunk and writes its rows; returns rows written.</summary>
    Task<int> WriteChunkAsync(StreamWriter writer, string transformedChunk);
    int TotalRows { get; }
    Stream GetFinalStream();
    string FileExtension { get; }
    string ContentType { get; }
}

// ----- Parser/Parser/Abstractions/IOutputSinkFactory.cs -----
namespace Parser.Abstractions;

public interface IOutputSinkFactory
{
    /// <summary>Picks a sink from a free-text format hint ("csv", "json", "parquet").</summary>
    IOutputSink Create(string formatHint);
}

// ----- Parser/Parser/Abstractions/ITransformationService.cs -----
public interface ITransformationService
{
    Task<string> CallGeminiForChunkAsync(List<string> jsonlChunk, string userPrompt);
    Task<int> WriteJsonlAsync(StreamWriter writer, string modelOutput);
    Task<string?> GetValueFromGeminiAsync(string systemInstruction, string userPrompt);
}
// ----- Parser/Parser/Api/ApplyTransformation.cs (end state) -----
// Eventarc-triggered controller: downloads the parsed JSONL for a job, pushes it
// through Gemini chunk-by-chunk, writes the result via a format-specific sink,
// uploads it, and stores a signed download URL on the job document.
public class ApplyTransformation : ControllerBase
{
    private readonly ILogger<ApplyTransformation> _logger;
    private readonly StorageClient _storage;
    private readonly CollectionReference _collection;
    private readonly ParsingConfig _config;
    private readonly ITransformationService _service;
    private readonly IGoogleAuth _googleAuth;
    private readonly IOutputSinkFactory _sinkFactory;

    // Chunk size sent to Gemini per call. The original wrote
    // Math.Clamp(500, 50, 2000), which is always 500 — spelled as a constant now.
    private const int LinesPerChunk = 500;

    public ApplyTransformation(
        ILogger<ApplyTransformation> logger,
        StorageClient storageClient,
        ITransformationService service,
        IGoogleAuth googleAuth,
        IOutputSinkFactory sinkFactory,
        IOptions<ParsingConfig> config)
    {
        _logger = logger;
        _storage = storageClient;
        _config = config.Value;
        _service = service;
        _googleAuth = googleAuth;
        _sinkFactory = sinkFactory;

        var db = FirestoreDb.Create(_config.GcpProjectId);
        _collection = db.Collection(_config.FirestoreCollection);
    }

    public async Task<IActionResult> Apply([FromBody] EventArcStoragePayload body)
    {
        // ... jobId extraction unchanged from the original (not visible in this hunk) ...
        var jobRef = _collection.Document(jobId);
        var snap = await jobRef.GetSnapshotAsync();
        if (!snap.Exists) return NotFound($"Job {jobId} not found.");

        try
        {
            // Idempotency guard: Eventarc redelivers; a finished job is a no-op.
            var state = snap.TryGetValue<string>("state", out var s) ? s : null;
            if (state == "DONE")
                return NoContent();

            var parsedPath = snap.TryGetValue<string>("parsedPath", out var p) ? p : null;
            var bucket = snap.TryGetValue<string>("bucket", out var b) ? b : null;
            var prompt = snap.TryGetValue<string>("prompt", out var pr) ? pr : null;

            if (string.IsNullOrWhiteSpace(parsedPath))
                return BadRequest("Job missing parsedPath.");
            if (string.IsNullOrWhiteSpace(bucket))
                bucket = _config.OutputBucket;

            await jobRef.SetAsync(new Dictionary<string, object>
            {
                ["state"] = "APPLYING GEMINI TRANSFORMATION",
                ["updatedAt"] = DateTime.UtcNow,
                ["transformPrompt"] = prompt,
            }, SetOptions.MergeAll);

            await using var inStream = new MemoryStream();
            await _storage.DownloadObjectAsync(bucket, parsedPath, inStream);
            inStream.Position = 0;
            using var reader = new StreamReader(inStream, Encoding.UTF8,
                detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: false);

            // Ask the model which output format the user's prompt implies; default to json.
            var analyzeTarget = await _service.GetValueFromGeminiAsync(
                "Analyze the user prompt to determine the best output format. Allowed formats are csv, json and parquet. Simply return the value for format",
                prompt);
            await using var sink = _sinkFactory.Create(analyzeTarget ?? "json");

            // BUG FIX: the object name was hard-coded to "data.jsonl" even when the
            // sink produced CSV/Parquet, while ContentType came from the sink — the
            // uploaded name and MIME type disagreed. Use the sink's extension.
            var outObj = $"jobs/{jobId}/transformed/data{sink.FileExtension}";
            await using var outMem = new MemoryStream();
            await using var outWriter = new StreamWriter(outMem, new UTF8Encoding(false));

            var buffer = new List<string>(LinesPerChunk);
            var totalIn = 0;
            var totalOut = 0;

            string? line;
            while ((line = await reader.ReadLineAsync()) is not null)
            {
                if (string.IsNullOrWhiteSpace(line)) continue;
                buffer.Add(line);
                if (buffer.Count >= LinesPerChunk)
                {
                    var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt);
                    var written = await sink.WriteChunkAsync(outWriter, transformed);
                    totalIn += buffer.Count;
                    totalOut += written;
                    buffer.Clear();
                }
            }

            // Flush the final partial chunk.
            if (buffer.Count > 0)
            {
                var transformed = await _service.CallGeminiForChunkAsync(buffer, prompt);
                var written = await sink.WriteChunkAsync(outWriter, transformed);
                totalIn += buffer.Count;
                totalOut += written;
            }

            await outWriter.FlushAsync();
            outMem.Position = 0;
            // NOTE(review): rows are written into outMem via outWriter; the sink's
            // internal buffer / GetFinalStream() is unused on this path — confirm
            // that is intentional before relying on GetFinalStream elsewhere.
            await _storage.UploadObjectAsync(_config.OutputBucket, outObj, sink.ContentType, outMem);

            var credential = await _googleAuth.GetAsync();
            var urlSigner = UrlSigner.FromCredential(credential);
            var downloadUrl = await urlSigner.SignAsync(
                bucket: _config.OutputBucket,   // bucket NAME, not a gs:// URI
                objectName: outObj,
                duration: TimeSpan.FromDays(7),
                httpMethod: HttpMethod.Get);

            await jobRef.SetAsync(new Dictionary<string, object>
            {
                ["state"] = "DONE",
                ["updatedAt"] = DateTime.UtcNow,
                ["result"] = new Dictionary<string, object>
                {
                    // Key kept as "transformedJsonl" for client compatibility even
                    // though the payload may now be csv/parquet.
                    ["transformedJsonl"] = downloadUrl,
                    ["inputLines"] = totalIn,
                    ["outputLines"] = totalOut,
                },
            }, SetOptions.MergeAll);

            _logger.LogInformation("Transform complete for {Job}. In:{In} Out:{Out} -> gs://{Bucket}/{Obj}",
                jobId, totalIn, totalOut, _config.OutputBucket, outObj);

            return NoContent();
        }
        catch (Exception e)
        {
            // BUG FIX: pass the exception object so the stack trace is logged,
            // instead of interpolating only e.Message into the template.
            _logger.LogError(e, "Exception during transformation for {Job}", jobId);
            await jobRef.SetAsync(new Dictionary<string, object>
            {
                ["state"] = "FAILED",
                ["error"] = e.Message,
                ["updatedAt"] = DateTime.UtcNow,
            }, SetOptions.MergeAll);

            // 4xx tells Eventarc not to retry; transient failures are surfaced via
            // the FAILED state rather than redelivery.
            return BadRequest();
        }
    }
}
// ----- Parser/Parser/Api/ParsingApi.cs (changed members) -----
// Constructor now takes an ILogger; the catch block logs structurally.
public ParsingApi(
    IParsingService parsingService,
    IOptions<ParsingConfig> config,
    StorageClient storageClient,
    PublisherClient publisher,
    ILogger<ParsingApi> logger)
{
    _logger = logger;
    _parsingService = parsingService;
    _config = config.Value;
    _storageClient = storageClient;
    _publisher = publisher;
}

// Inside the parse action's catch block:
// BUG FIX: the original used string concatenation with no exception object
// ("Error occurred while parse" + e.Message), losing the stack trace and
// defeating structured logging. Pass the exception and a plain template.
catch (Exception e)
{
    _logger.LogError(e, "Error occurred while parsing");
    return BadRequest(new { error = e.Message });
}

// ----- Parser/Parser/Program.cs (added registrations) -----
// builder.Services.AddSingleton<IOutputSinkFactory, OutputSinkFactory>();
// builder.Services.AddTransient<JsonOutputSink>();
// builder.Services.AddTransient<CsvOutputSink>();
// builder.Services.AddTransient<ParquetOutputSink>();

// ----- Parser/Parser/Services/OutputSinks/CsvOutputSink.cs -----
using System.Text;
using System.Text.Json;
using Parser.Abstractions;

namespace Parser.Services;

/// <summary>
/// Converts JSONL rows (one JSON object per line) into CSV.
/// The header row is frozen at the first chunk; keys that first appear in
/// later chunks are dropped (they cannot be added once the header is emitted).
/// </summary>
public class CsvOutputSink : IOutputSink
{
    private readonly MemoryStream _buffer = new();
    private readonly StreamWriter _writer;
    private readonly HashSet<string> _allColumns = new(StringComparer.OrdinalIgnoreCase);
    private List<string>? _headerColumns;   // column order, fixed when the header is written
    private int _totalRows;

    public CsvOutputSink()
    {
        _writer = new StreamWriter(_buffer, new UTF8Encoding(false), leaveOpen: true);
    }

    public string FileExtension => ".csv";
    public string ContentType => "text/csv";
    public int TotalRows => _totalRows;

    public async Task<int> WriteChunkAsync(StreamWriter writer, string transformedChunk)
    {
        var rows = ParseRows(transformedChunk);
        if (rows.Count == 0)
            return 0;

        // BUG FIX: the original wrote the header from the first chunk's columns
        // but rebuilt the column list from the ever-growing _allColumns set for
        // every chunk — a new key in chunk 2 produced rows with more fields than
        // the header, misaligning all subsequent data. The column list is now
        // frozen at header time.
        if (_headerColumns is null)
        {
            _headerColumns = _allColumns.ToList();
            await writer.WriteLineAsync(string.Join(",", _headerColumns.Select(EscapeCsv)));
        }

        foreach (var row in rows)
        {
            var values = _headerColumns.Select(c =>
            {
                row.TryGetValue(c, out var val);
                return EscapeCsv(val?.ToString() ?? "");
            });

            await writer.WriteLineAsync(string.Join(",", values));
            _totalRows++;
        }

        return rows.Count;
    }

    // Deserializes each non-empty line of the chunk and records its keys.
    private List<Dictionary<string, object>> ParseRows(string transformedChunk)
    {
        var rows = new List<Dictionary<string, object>>();
        foreach (var raw in transformedChunk.Split('\n'))
        {
            var line = raw.Trim();
            if (string.IsNullOrWhiteSpace(line)) continue;

            var dict = JsonSerializer.Deserialize<Dictionary<string, object>>(line);
            if (dict is null) continue;

            rows.Add(dict);
            foreach (var key in dict.Keys)
                _allColumns.Add(key);
        }
        return rows;
    }

    // RFC 4180-style quoting: quote when the value contains a quote, comma, or newline.
    private static string EscapeCsv(string input)
    {
        if (input.Contains('"') || input.Contains(',') || input.Contains('\n') || input.Contains('\r'))
        {
            var escaped = input.Replace("\"", "\"\"");
            return $"\"{escaped}\"";
        }
        return input;
    }

    public Stream GetFinalStream()
    {
        _writer.Flush();
        _buffer.Position = 0;
        return _buffer;
    }

    public ValueTask DisposeAsync()
    {
        _writer.Dispose();
        _buffer.Dispose();
        return ValueTask.CompletedTask;
    }
}

// ----- Parser/Parser/Services/OutputSinks/JsonOutputSink.cs -----
using System.Text;
using Parser.Abstractions;

namespace Parser.Services;

/// <summary>
/// Passes model output through as JSONL: strips markdown code fences and keeps
/// only lines that look like complete JSON objects.
/// </summary>
public class JsonOutputSink : IOutputSink
{
    private readonly MemoryStream _buffer = new();
    private readonly StreamWriter _writer;
    private int _totalRows;

    public JsonOutputSink()
    {
        _writer = new StreamWriter(_buffer, new UTF8Encoding(false), leaveOpen: true);
    }

    public string FileExtension => ".jsonl";
    public string ContentType => "application/json";
    public int TotalRows => _totalRows;

    public async Task<int> WriteChunkAsync(StreamWriter writer, string transformedChunk)
    {
        // Gemini frequently wraps output in ```json fences; remove them first.
        var clean = transformedChunk
            .Replace("```json", string.Empty)
            .Replace("```", string.Empty)
            .Trim();

        var lines = clean.Split('\n')
            .Select(l => l.Trim())
            .Where(l => !string.IsNullOrWhiteSpace(l));

        int count = 0;
        foreach (var l in lines)
        {
            // Keep only single-line JSON objects; anything else (prose, partial
            // objects) is silently dropped by design.
            if (!(l.StartsWith("{") && l.EndsWith("}"))) continue;
            await writer.WriteLineAsync(l.AsMemory());
            count++;
        }
        _totalRows += count;
        return count;
    }

    public Stream GetFinalStream()
    {
        _writer.Flush();
        _buffer.Position = 0;
        return _buffer;
    }

    public ValueTask DisposeAsync()
    {
        _writer.Dispose();
        _buffer.Dispose();
        return ValueTask.CompletedTask;
    }
}

// ----- Parser/Parser/Services/OutputSinks/OutputSinkFactory.cs -----
using Parser.Abstractions;

namespace Parser.Services;

/// <summary>Maps a free-text format hint onto a concrete sink; json is the default.</summary>
public class OutputSinkFactory : IOutputSinkFactory
{
    public IOutputSink Create(string formatHint)
    {
        if (string.IsNullOrEmpty(formatHint))
            return new JsonOutputSink();

        // "json" is checked first so a hint like "jsonl" never falls through.
        if (formatHint.Contains("json", StringComparison.OrdinalIgnoreCase))
            return new JsonOutputSink();
        if (formatHint.Contains("csv", StringComparison.OrdinalIgnoreCase))
            return new CsvOutputSink();
        if (formatHint.Contains("parquet", StringComparison.OrdinalIgnoreCase))
            return new ParquetOutputSink();

        return new JsonOutputSink();
    }
}
b/Parser/Parser/Services/OutputSinks/ParquetOutputSink.cs @@ -0,0 +1,108 @@ +using System.Text; +using System.Text.Json; +using System.Linq; +using Parser.Abstractions; +using Parquet; +using Parquet.Data; +using Parquet.Schema; + +namespace Parser.Services; + +public class ParquetOutputSink : IOutputSink +{ + private readonly MemoryStream _buffer = new(); + private readonly Dictionary> _columnsData = new(StringComparer.OrdinalIgnoreCase); + private readonly HashSet _allColumns = new(StringComparer.OrdinalIgnoreCase); + private int _totalRows; + + public ParquetOutputSink() + { + } + + public string FileExtension => ".parquet"; + public string ContentType => "application/octet-stream"; + public int TotalRows => _totalRows; + + public async Task WriteChunkAsync(StreamWriter writer, string transformedChunk) + { + var rows = new List>(); + + foreach (var raw in transformedChunk.Split('\n')) + { + var line = raw.Trim(); + if (string.IsNullOrWhiteSpace(line)) continue; + + var dict = JsonSerializer.Deserialize>(line); + if (dict is null) continue; + + rows.Add(dict); + + foreach (var key in dict.Keys) + _allColumns.Add(key); + } + + if (rows.Count == 0) + return 0; + + foreach (var col in _allColumns) + { + if (!_columnsData.ContainsKey(col)) + _columnsData[col] = new List(); + } + + foreach (var row in rows) + { + foreach (var col in _allColumns) + { + row.TryGetValue(col, out var val); + _columnsData[col].Add(val); + } + + _totalRows++; + } + + return rows.Count; + } + + public Stream GetFinalStream() + { + _buffer.SetLength(0); + + // stable column order + var orderedCols = _allColumns.OrderBy(c => c, StringComparer.OrdinalIgnoreCase).ToList(); + + var dataFields = orderedCols + .Select(c => (DataField)new DataField(c)) + .ToArray(); + + var schema = new ParquetSchema(dataFields); + + using (ParquetWriter parquetWriter = ParquetWriter.CreateAsync(schema, _buffer).GetAwaiter().GetResult()) + { + using (ParquetRowGroupWriter groupWriter = 
parquetWriter.CreateRowGroup()) + { + foreach (var colName in orderedCols) + { + var field = (DataField)schema.DataFields.First(f => f.Name.Equals(colName, StringComparison.OrdinalIgnoreCase)); + + var values = _columnsData[colName] + .Select(v => v?.ToString()) + .ToArray(); + + var dataColumn = new DataColumn(field, values); + + groupWriter.WriteColumnAsync(dataColumn).GetAwaiter().GetResult(); + } + } + } + + _buffer.Position = 0; + return _buffer; + } + + public ValueTask DisposeAsync() + { + _buffer.Dispose(); + return ValueTask.CompletedTask; + } +} diff --git a/Parser/Parser/Services/TransformationService.cs b/Parser/Parser/Services/TransformationService.cs index 3a8359d..cf85b55 100644 --- a/Parser/Parser/Services/TransformationService.cs +++ b/Parser/Parser/Services/TransformationService.cs @@ -114,4 +114,62 @@ public async Task WriteJsonlAsync(StreamWriter writer, string modelOutput) } return count; } + + // Duplicate code, need to refactor later + public async Task GetValueFromGeminiAsync(string systemInstruction, string userPrompt) + { + var accessToken = await GetAccessTokenAsync(); + using var http = new HttpClient + { BaseAddress = new Uri($"https://{_config.GeminiLocation}-aiplatform.googleapis.com/") }; + http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken); + + + var body = new + { + systemInstruction = new + { + role = "system", + parts = new[] { new { text = systemInstruction } } + }, + contents = new[] + { + new + { + role = "user", + parts = new[] + { + new { text = $"User request:\n{userPrompt}" } + } + } + }, + generationConfig = new + { + temperature = 0.1, + maxOutputTokens = 4096, + responseMimeType = "text/plain" + } + }; + + var model = _config.GeminiModel; + var url = + $"v1/projects/{_config.GcpProjectId}/locations/global/publishers/google/models/{model}:generateContent"; + + using var req = new HttpRequestMessage(HttpMethod.Post, url) + { + Content = new 
StringContent(JsonSerializer.Serialize(body), Encoding.UTF8, "application/json") + }; + + var resp = await http.SendAsync(req); + var respBody = await resp.Content.ReadAsStringAsync(); + if (!resp.IsSuccessStatusCode) + throw new HttpRequestException($"Vertex {resp.StatusCode}: {respBody}"); + + using var doc = JsonDocument.Parse(respBody); + var text = doc.RootElement + .GetProperty("candidates")[0] + .GetProperty("content").GetProperty("parts")[0] + .GetProperty("text").GetString(); + + return text ?? string.Empty; + } } \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6b290b6 --- /dev/null +++ b/README.md @@ -0,0 +1,109 @@ +# DataMorph — AI‑Driven ETL on Google Cloud Run + +Turn messy CSV/JSON into clean, structured outputs with an AI‑assisted pipeline. DataMorph handles upload → profiling → canonicalization → transformation → export, with a simple API and a lightweight React UI. + +> **Stack**: .NET 9, React + TypeScript, Docker, Google Cloud - Google Cloud Run, Firestore, Firebase Auth, Cloud Storage, Pub/Sub, Eventarc + +--- + +## How does this work? + +The purpose of this tool is that people can upload any messy datasets - CSV, JSON and then have Gemini perform necessary ETL on it. Gemini can also answer any question you may have on the dataset +and also clean it up for you or even generate another dataset based on what you've provided as an input - Anything pretty much! Sky is the limit on what you can do with DataMorph. + +## DataMorph vs LLM Chat UI + +You're probably wondering why use DataMorph, when you can probably use any other LLM directly, by uploading a file onto it - asking for a response. And you're probably right! However, raw LLM UI falls over when you need reliability, scale, governance and integration. Here's what happens - + +- You can enforce exactly what you need and reject/flag violations. +- DataMorph emits a strict JSONL with errors per row; no prose, no markdown. 
+- DataMorph is capable of handling large files (Currently disabled, to avoid huge gemini bills) via GCS uploads, chunked processing and idempotency keys. LLM UIs cap out pretty quickly +- Every run ties to a hash, so that the exact same prompt could be re-run, giving you the exact same results. +- Costs - We use a preflight profiler for estimating tokens/costs and do cheap rule-based transformations before reaching an LLM, avoiding unnecessary LLM calls. + +In a nutshell, we can turn something messy, clean it up, versioned transform plans, scalable chunked processing, cost control, something an ad-hoc chat with LLM cannot operationalize. + +## Behind the scenes + +1. **Init a job** → `POST /pipelines/init` with your **prompt** to get `jobId` and a **signed upload URL**. +2. **Upload file** → the site then automatically uploads CSV/JSON directly to GCS using the signed URL. +3. **Auto‑parse** → Eventarc triggers the `DataMorph-Parser` → emits **canonical JSONL** + **profile**. +4. **Transform** → `Transformer` (pulls from Pub/Sub) applies the AI transform plan → writes **file** to GCS. +5. **Done** → Firestore updates state to `DONE`, if it's all successful, allowing user to download the processed file. + +> Firebase state is constantly updated so that the user can keep track of what file is being processed with what status. 
+ +## Architecture + +```mermaid +flowchart LR +subgraph React + Typescript UI +UI[DataMorph UI] +end + +UI -->|POST /pipelines/init - targetSchema & prompt| API[(Cloud Run .NET API)] +API -->|Create Firestore job state INIT| FS[(Firestore)] +API -->|Return jobId + signedUploadUrl| UI +UI -->|PUT file to GCS via signed URL| RAW[(GCS raw-bucket)] + + +RAW -- finalize --> EA[Eventarc] +EA --> PARSER[Parser Cloud Run] +PARSER -->|canonical.jsonl and profile.json| STAGE[(GCS staging-bucket)] +PARSER -->|Publish transform.requests| PS[(Pub/Sub)] + + +PS --> XFORM[Transformer Run] +XFORM -->|data.json and data.jsonl| OUT[(GCS output-bucket)] + +PARSER -->|state PARSING| FS +XFORM -->|state TRANSFORMING| FS +XFORM -->|state DONE + download URLs| FS + + +FS --> UI +OUT --> UI +``` + +## Buckets & Topics + +* **GCS Buckets** + + * `raw-bucket-gcs` — client uploads land here (via signed URL) + * `staging-bucket-gcs` — `canonical.jsonl` + `profile.json` + * `raw-bucket-gcs-output` — final json files + +* **PubSub Topic** + * `datamorph-pubsub` - The PubSub model on GCP which uses EventArc to trigger `/parse` + +**Notes** + +* Signed URL is **time‑limited** and **scope‑limited** to the job path. There is a 7-day expiry on the download signed URL, after which the user will not be able to download the file. +* Rate limiting is set to 60 seconds per user, per file upload. This is to prevent abuse of the system, and to avoid the massive server and Gemini bill that DataMorph racks up! + +## Configuration + +A sample skeleton of the configuration can be found in the `appsettings.json` file in both `Api` and `Parser` + + +## Local Development + +- To run the .NET apis locally, please install the latest version of .NET 9. You can use `Visual Studio` on Windows as IDE. On macOS, `Jetbrains Rider` would be the preferred IDE. +- To run the frontend, ensure that you have `yarn` installed. 
+ - Run `npm install` in the `datamorph-ui` directory to install `npm` modules + - Do `yarn dev` to kick off local development. + - A sample `.env` file for the frontend would look like: + + ``` + VITE_FIREBASE_API_KEY=API_KEY + VITE_FIREBASE_AUTH_DOMAIN=FIREBASE_AUTH_DOMAIN + VITE_FIREBASE_PROJECT_ID=FIREBASE_PROJECT_ID + VITE_FIREBASE_APP_ID=FIREBASE_APP_ID + VITE_GOOGLE_OAUTH_ID=GOOGLE_OAUTH_ID + VITE_API_URL=API_URL + ``` + + +## Contributing + +PRs are welcome! Please open an issue describing the change before any contributions. Follow the branching convention as: `feature/issue-number` for features, `bug/issue-number` for bugs, `security/issue-number` for security issues. All PRs will be opened towards `dev` branch and not directly to `master`. Any PRs to `master` will be automatically rejected. PR will be merged from `dev` to `master` periodically by me. diff --git a/datamorph-ui/src/App.scss b/datamorph-ui/src/App.scss index 082e6a5..6c43508 100644 --- a/datamorph-ui/src/App.scss +++ b/datamorph-ui/src/App.scss @@ -199,14 +199,20 @@ body { margin: 0 auto; padding: 0.42rem 1.3rem; border-radius: 999px; - background: rgba(255, 255, 255, 0.12); - color: $text-muted; + background: #5b63b7; + color: #fff; text-transform: uppercase; letter-spacing: 0.12em; font-size: 0.68rem; font-weight: 600; } + .shell__tag:hover { + cursor: pointer; + background: #fff; + color: #5b63b7; + } + h1 { margin: 0; font-size: clamp(2.2rem, 4.8vw, 3rem); @@ -221,6 +227,14 @@ body { } } +.shell__footer { + width: 100%; + padding-top: 10px; + display: flex; + justify-content: center; +} + + .deck { position: relative; border-radius: 32px; @@ -449,6 +463,227 @@ body { } } +.shell--jobs { + text-align: left; + + .shell__heading { + position: relative; + text-align: center; + width: 100%; + + .shell__tag { + position: absolute; + top: 50%; + right: 0; + transform: translateY(-50%); + margin: 0; + } + } +} + +.jobs { + display: grid; + gap: clamp(1.8rem, 3vw, 2.4rem); + text-align: left; + + &__header 
{ + display: grid; + gap: 0.4rem; + + h2 { + margin: 0; + font-size: clamp(1.6rem, 3.6vw, 2rem); + font-weight: 600; + letter-spacing: -0.01em; + } + } + + &__subtitle { + margin: 0; + color: $text-soft; + font-size: 1rem; + } + + &__state { + padding: 1.2rem 1.6rem; + border-radius: 24px; + border: 1px solid rgba(255, 255, 255, 0.08); + background: rgba(18, 18, 18, 0.65); + box-shadow: inset 0 0 0 1px rgba(255, 255, 255, 0.05); + color: $text-muted; + text-align: center; + + &--error { + color: #ffb6b6; + border-color: rgba(255, 77, 77, 0.38); + background: rgba(48, 16, 16, 0.65); + box-shadow: inset 0 0 0 1px rgba(255, 77, 77, 0.22); + } + } + + &__grid { + display: grid; + gap: clamp(1.4rem, 2.8vw, 1.9rem); + grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); + } +} + +.job-card { + position: relative; + display: grid; + gap: 1.2rem; + padding: 1.6rem; + border-radius: 28px; + background: rgba(18, 18, 18, 0.78); + border: 1px solid rgba(255, 255, 255, 0.08); + box-shadow: + 0 24px 50px rgba(0, 0, 0, 0.45), + inset 0 0 0 1px rgba(255, 255, 255, 0.04); + backdrop-filter: blur(18px); + transition: transform 180ms ease, box-shadow 180ms ease, border-color 180ms ease; + + &:hover { + transform: translateY(-4px); + box-shadow: + 0 30px 60px rgba(0, 0, 0, 0.55), + inset 0 0 0 1px rgba(255, 255, 255, 0.08); + border-color: rgba(255, 255, 255, 0.12); + } + + &__header { + display: flex; + align-items: center; + justify-content: space-between; + gap: 0.75rem; + } + + &__title { + margin: 0; + font-size: 1.15rem; + font-weight: 600; + color: #fff; + } + + &__status { + display: inline-flex; + align-items: center; + justify-content: center; + padding: 0.35rem 0.9rem; + border-radius: 999px; + font-size: 0.78rem; + letter-spacing: 0.08em; + font-weight: 600; + text-transform: uppercase; + background: rgba(255, 255, 255, 0.16); + color: #fff; + box-shadow: inset 0 0 0 1px rgba(255, 255, 255, 0.14); + + &--done, + &--success, + &--completed { + background: rgba(88, 
210, 144, 0.24); + color: #8cf5c4; + box-shadow: inset 0 0 0 1px rgba(88, 210, 144, 0.4); + } + + &--processing, + &--running, + &--in_progress { + background: rgba(255, 191, 72, 0.24); + color: #ffd992; + box-shadow: inset 0 0 0 1px rgba(255, 191, 72, 0.4); + } + + &--failed, + &--error, + &--cancelled { + background: rgba(255, 82, 82, 0.22); + color: #ff9c9c; + box-shadow: inset 0 0 0 1px rgba(255, 82, 82, 0.38); + } + } + + &__meta { + display: grid; + gap: 0.9rem; + + div { + display: grid; + gap: 0.25rem; + } + + dt { + font-size: 0.72rem; + text-transform: uppercase; + letter-spacing: 0.1em; + color: $text-soft; + } + + dd { + margin: 0; + font-size: 0.94rem; + color: rgba(255, 255, 255, 0.88); + word-break: break-all; + } + } + + &__stats { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 0.9rem; + padding: 1rem; + border-radius: 20px; + background: rgba(255, 255, 255, 0.05); + border: 1px solid rgba(255, 255, 255, 0.08); + } + + &__stat-value { + display: block; + font-size: 1.35rem; + font-weight: 600; + color: #fff; + } + + &__stat-label { + font-size: 0.72rem; + letter-spacing: 0.08em; + text-transform: uppercase; + color: $text-soft; + } + + &__footer { + display: flex; + justify-content: flex-end; + } + + &__download { + display: inline-flex; + align-items: center; + justify-content: center; + gap: 0.45rem; + padding: 0.65rem 1.2rem; + border-radius: 999px; + text-decoration: none; + font-weight: 600; + letter-spacing: 0.04em; + color: #f1f1f1; + background: linear-gradient(135deg, rgba(255, 255, 255, 0.18), rgba(255, 255, 255, 0.08)); + box-shadow: 0 14px 30px rgba(0, 0, 0, 0.45); + transition: transform 160ms ease, box-shadow 160ms ease, filter 160ms ease; + + &:hover { + transform: translateY(-2px); + filter: brightness(1.06); + box-shadow: 0 18px 38px rgba(0, 0, 0, 0.55); + } + + &:focus-visible { + outline: 2px solid rgba(255, 255, 255, 0.6); + outline-offset: 3px; + } + } +} + @media (max-width: 720px) { .shell { 
padding: 2.4rem 1.9rem; @@ -463,6 +698,14 @@ body { justify-content: center; } } + + .jobs { + gap: 1.6rem; + + &__grid { + grid-template-columns: 1fr; + } + } } @keyframes bob { diff --git a/datamorph-ui/src/App.tsx b/datamorph-ui/src/App.tsx index e4136a8..67d61db 100644 --- a/datamorph-ui/src/App.tsx +++ b/datamorph-ui/src/App.tsx @@ -1,11 +1,18 @@ -import './App.scss' -import { MainPage } from './components/MainPage' -import { AuthProvider } from './context/AuthContext' +import "./App.scss" +import { BrowserRouter, Routes, Route } from "react-router-dom" +import { MainPage } from "./components/MainPage" +import { Jobs } from "./components/Jobs" +import { AuthProvider } from "./context/AuthContext" function App() { return ( - + + + } /> + } /> + + ) } diff --git a/datamorph-ui/src/components/Footer.tsx b/datamorph-ui/src/components/Footer.tsx new file mode 100644 index 0000000..ce02386 --- /dev/null +++ b/datamorph-ui/src/components/Footer.tsx @@ -0,0 +1,15 @@ +const GITHUB_ISSUES_URL = "https://github.com/ameysunu/DataMorph/issues" + +export function Footer() { + return ( + + ) +} diff --git a/datamorph-ui/src/components/Jobs.tsx b/datamorph-ui/src/components/Jobs.tsx new file mode 100644 index 0000000..f9a722a --- /dev/null +++ b/datamorph-ui/src/components/Jobs.tsx @@ -0,0 +1,200 @@ +import { useEffect, useMemo, useState } from "react" +import { useNavigate } from "react-router-dom" +import { useAuth } from "../context/AuthContext" +import { GetJobByUserId } from "../helpers/Api" +import type { Job } from "../models/JobModel" +import { formatDate } from "../helpers/JobHelpers" +import { Footer } from "./Footer" + + +export function Jobs() { + const navigate = useNavigate() + const { isAuthenticated, user, signOut } = useAuth() + const [jobs, setJobs] = useState([]) + const [isLoading, setIsLoading] = useState(true) + const [error, setError] = useState(null) + + function LoadJobs(){ + + const userId = localStorage.getItem("userId") ?? 
"" + if (!userId) { + setError("Missing user information. Please sign in again.") + setIsLoading(false) + return + } + + setIsLoading(true) + setError(null) + + GetJobByUserId(userId) + .then((jobsResponse) => { + setJobs(Array.isArray(jobsResponse) ? jobsResponse : []) + }) + .catch(() => { + setError("Unable to load your jobs right now. Please try again later.") + }) + .finally(() => setIsLoading(false)) + } + + useEffect(() => { + if (!isAuthenticated) { + navigate("/") + return + } + + LoadJobs() + + }, [isAuthenticated, navigate]) + + const orderedJobs = useMemo( + () => + [...jobs].sort((left, right) => { + return new Date(right.updatedAt).getTime() - new Date(left.updatedAt).getTime() + }), + [jobs] + ) + + if (!isAuthenticated) { + return null + } + + return ( +
+
+
+
+ {isAuthenticated ? `You're signed in as ${user?.name ?? ""}` : ""} +
+
+ + +
+
+ +
+

Job Runs

+
+ Refresh +
+
+ +
+
+

Recent jobs

+

Newest updates appear first.

+

All download links have a 7 day expiry. Please download your file in 7 days, from creation time.

+
+ + {isLoading && ( +
+

Loading your jobs…

+
+ )} + + {!isLoading && error && ( +
+

{error}

+
+ )} + + {!isLoading && !error && orderedJobs.length === 0 && ( +
+

No jobs yet. Submit a dataset to see it appear here.

+
+ )} + + {orderedJobs.length > 0 && ( +
+ {orderedJobs.map((job) => { + const jobResult = job.result ?? undefined + const statusModifier = (job.state || "unknown") + .toLowerCase() + .replace(/[^a-z0-9]+/g, "_") + + return ( +
+
+

Job Status

+ + {job.state} + +
+ +
+
+
Job ID
+
{job.jobId}
+
+
+
Created
+
{formatDate(job.createdAt)}
+
+
+
Last Updated
+
{formatDate(job.updatedAt)}
+
+ + {job.state === "FAILED" && ( +
+
Error
+
{job.error}
+
+ )} + +
+ + {jobResult && ( +
+
+ {jobResult.inputLines ?? "—"} + Input Lines +
+
+ {jobResult.outputLines ?? "—"} + Output Lines +
+
+ )} + + {jobResult?.transformedJsonl && ( + + )} +
+ ) + })} +
+ )} +
+
+
+
+ ) +} diff --git a/datamorph-ui/src/components/MainPage.tsx b/datamorph-ui/src/components/MainPage.tsx index ef4e475..93cb09c 100644 --- a/datamorph-ui/src/components/MainPage.tsx +++ b/datamorph-ui/src/components/MainPage.tsx @@ -5,9 +5,11 @@ import { type DragEvent, type KeyboardEvent, } from "react" +import { useNavigate } from "react-router-dom" import type { Status } from "../models/Status" import { ImageUploader } from "../helpers/Api" import { useAuth } from "../context/AuthContext" +import { Footer } from "./Footer" export function MainPage() { const [file, setFile] = useState(null) @@ -17,6 +19,7 @@ export function MainPage() { const [isDragging, setIsDragging] = useState(false) const [isAuthenticating, setIsAuthenticating] = useState(false) const fileInputRef = useRef(null) + const navigate = useNavigate() const { isAuthenticated, user, requestGoogleSignIn, signOut } = useAuth() const fileSummary = useMemo(() => { @@ -150,12 +153,21 @@ export function MainPage() { isAuthenticated ? " shell__auth-status--active" : "" }`} > - {isAuthenticated ? `Your signed in as ${user?.name ?? ""}` : ""} + {isAuthenticated ? `You're signed in as ${user?.name ?? user?.email ?? ""}` : ""}
- + ) : null} + {isAuthenticated ? ( +
) diff --git a/datamorph-ui/src/context/AuthContext.tsx b/datamorph-ui/src/context/AuthContext.tsx index 65b5b51..35c054a 100644 --- a/datamorph-ui/src/context/AuthContext.tsx +++ b/datamorph-ui/src/context/AuthContext.tsx @@ -157,6 +157,7 @@ export function AuthProvider({ children }: { children: ReactNode }) { unsubscribe = onAuthStateChanged(auth, async (firebaseUser: any) => { const mapped = await mapFirebaseUser(firebaseUser) setUser(mapped) + setUserId(auth.currentUser?.uid ?? ''); }) })() diff --git a/datamorph-ui/src/helpers/Api.ts b/datamorph-ui/src/helpers/Api.ts index f8a2d41..3cfd734 100644 --- a/datamorph-ui/src/helpers/Api.ts +++ b/datamorph-ui/src/helpers/Api.ts @@ -18,13 +18,36 @@ export async function ImageUploader (file: File, prompt: string): Promise<[statu }) if (!response.ok) { - throw new Error('Request failed') + const errorMessage = await extractResponseError(response, 'Request failed'); + throw new Error(errorMessage); } - await SendFileToSignedUri(file, (await response.json()).signedUploadUrl); + const { signedUploadUrl } = await response.json() as { signedUploadUrl: string }; + await SendFileToSignedUri(file, signedUploadUrl); + + return ['success', 'Done! Your data request is on its way. You can view the status of your job in the Jobs Dashboard.']; + } catch (error) { + return ['error', getErrorMessage(error)]; + } +} + +export async function GetJobByUserId(userId: string): Promise { + + try { + var apiUrl = import.meta.env.VITE_API_URL; + + const response = await fetch(`${apiUrl}/jobs?userId=${userId}`, { + method: 'GET', + headers: { "Content-Type": "application/json" }, + }) - return ['success', 'Done! Your data request is on its way.']; + if (!response.ok) { + throw new Error('Request failed') + } + + const jobs = await response.json(); + return jobs; } catch { - return ['error', 'Something went wrong. 
Please try again.']; + return []; } } @@ -39,7 +62,8 @@ async function SendFileToSignedUri(file: File, signedUrl: string): Promise }) if (!response.ok) { - throw new Error('Upload failed') + const errorMessage = await extractResponseError(response, 'Upload failed'); + throw new Error(errorMessage); } } catch (error) { throw error @@ -47,5 +71,88 @@ async function SendFileToSignedUri(file: File, signedUrl: string): Promise } function GetUserId() : string | null { - return localStorage.getItem('userId'); -} \ No newline at end of file + var uid = localStorage.getItem('userId'); + + if(!uid) { + uid = getFirebaseUidFromLocalStorage(); + } + + return uid; +} + +function getFirebaseUidFromLocalStorage(): string | null { + const PREFIX = 'firebase:authUser:'; + const SUFFIX = ':[DEFAULT]'; + + for (let i = 0; i < localStorage.length; i++) { + const key = localStorage.key(i); + if (!key) continue; + + if (key.startsWith(PREFIX) && key.endsWith(SUFFIX)) { + const raw = localStorage.getItem(key); + if (!raw) continue; + + try { + const parsed = JSON.parse(raw); + if (parsed && parsed.uid) { + return parsed.uid; + } + } catch (err) { + console.warn('Failed to parse firebase auth user from localStorage:', err); + } + } + } + + return null; +} + + +async function extractResponseError(response: Response, defaultMessage: string): Promise { + const fallback = defaultMessage || `Request failed with status ${response.status}`; + + try { + const rawBody = await response.text(); + + if (!rawBody) { + return `${fallback} (${response.status})`; + } + + try { + const parsed = JSON.parse(rawBody); + + if (typeof parsed === 'string' && parsed.trim().length > 0) { + return parsed; + } + + if (parsed && typeof parsed === 'object') { + const candidateKeys = ['message', 'error', 'detail', 'description']; + for (const key of candidateKeys) { + const value = (parsed as Record)[key]; + if (typeof value === 'string' && value.trim().length > 0) { + return value; + } + } + } + } catch { + if 
(rawBody.trim().length > 0) { + return rawBody; + } + } + + return fallback; + } catch { + return fallback; + } +} + +function getErrorMessage(error: unknown): string { + if (error instanceof Error && error.message) { + return error.message; + } + + if (typeof error === 'string' && error.trim().length > 0) { + return error; + } + + return 'An unexpected error occurred. Please try again.'; +} diff --git a/datamorph-ui/src/helpers/JobHelpers.ts b/datamorph-ui/src/helpers/JobHelpers.ts new file mode 100644 index 0000000..f8ddabe --- /dev/null +++ b/datamorph-ui/src/helpers/JobHelpers.ts @@ -0,0 +1,14 @@ +export function formatDate(isoDate: string): string { + const parsedDate = new Date(isoDate) + if (Number.isNaN(parsedDate.getTime())) { + return "—" + } + + return parsedDate.toLocaleString(undefined, { + year: "numeric", + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + }) +} \ No newline at end of file diff --git a/datamorph-ui/src/models/JobModel.tsx b/datamorph-ui/src/models/JobModel.tsx new file mode 100644 index 0000000..15a2cb8 --- /dev/null +++ b/datamorph-ui/src/models/JobModel.tsx @@ -0,0 +1,15 @@ +export type JobResult = { + inputLines?: number + outputLines?: number + transformedJsonl?: string +} + +export type Job = { + jobId: string + userId: string + state: string + createdAt: string + updatedAt: string + result?: JobResult | null + error: string | null +} \ No newline at end of file diff --git a/datamorph-ui/vercel.json b/datamorph-ui/vercel.json new file mode 100644 index 0000000..0f32683 --- /dev/null +++ b/datamorph-ui/vercel.json @@ -0,0 +1,3 @@ +{ + "rewrites": [{ "source": "/(.*)", "destination": "/index.html" }] +}