diff --git a/backend/.env.example b/backend/.env.example index 280fad3..afe210b 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -1,5 +1,5 @@ # Database -DATABASE_URL="file:./dev.db" +DATABASE_URL="postgresql://user:password@localhost:5432/flowfi?schema=public" # Server PORT=3001 @@ -14,4 +14,4 @@ SANDBOX_MODE_ENABLED=true # Optional: Use a separate database for sandbox # If not set, it will use {DATABASE_URL}_sandbox -SANDBOX_DATABASE_URL=file:./sandbox.db +SANDBOX_DATABASE_URL="postgresql://user:password@localhost:5432/flowfi_sandbox?schema=public" diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index dc47752..9b6b832 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -7,7 +7,8 @@ generator client { } datasource db { - provider = "sqlite" + provider = "postgresql" + url = env("DATABASE_URL") } // User model - represents Stellar wallet addresses interacting with the protocol diff --git a/backend/src/app.ts b/backend/src/app.ts index efe04cf..ac558b0 100644 --- a/backend/src/app.ts +++ b/backend/src/app.ts @@ -141,7 +141,7 @@ app.get('/', (req: Request, res: Response) => { app.get('/health', async (req: Request, res: Response) => { const { getSandboxConfig } = await import('./config/sandbox.js'); const sandboxConfig = getSandboxConfig(); - + res.json({ status: 'healthy', timestamp: new Date().toISOString(), @@ -158,4 +158,8 @@ app.get('/health', async (req: Request, res: Response) => { }); }); +import { errorHandler } from './middleware/error.middleware.js'; + +app.use(errorHandler); + export default app; diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index 250d37b..632b646 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -1,62 +1,114 @@ -import type { Response } from 'express'; -import { createStreamSchema } from '../validators/stream.validator.js'; -import { sseService } from '../services/sse.service.js'; -import type { SandboxRequest } from '../middleware/sandbox.middleware.js'; -import { isSandboxRequest } from '../middleware/sandbox.middleware.js'; +import type { Request, Response } from 'express'; +import { prisma } from '../lib/prisma.js'; +import logger from '../logger.js'; /** - * Helper to add sandbox metadata to response + * Create a new stream (stub for on-chain indexing) */ -function addSandboxMetadata(data: any, isSandbox: boolean): any { - if (!isSandbox) { - return data; +export const createStream = async (req: Request, res: Response) => { + // This would typically involve validating the stream already exists on-chain + // or preparing metadata for the frontend to submit the transaction. + // For now, let's allow "registering" a stream if it doesn't exist. + try { + const { streamId, sender, recipient, tokenAddress, ratePerSecond, depositedAmount, startTime } = req.body; + + const stream = await prisma.stream.upsert({ + where: { streamId: parseInt(streamId) }, + update: { + isActive: true, + lastUpdateTime: Math.floor(Date.now() / 1000) + }, + create: { + streamId: parseInt(streamId), + sender, + recipient, + tokenAddress, + ratePerSecond, + depositedAmount, + withdrawnAmount: "0", + startTime: parseInt(startTime), + lastUpdateTime: parseInt(startTime) + } + }); + + return res.status(201).json(stream); + } catch (error) { + logger.error('Error creating/upserting stream:', error); + return res.status(500).json({ error: 'Internal server error' }); } +}; - return { - ...data, - _sandbox: { - mode: true, - warning: 'This is sandbox data and does not affect production', - timestamp: new Date().toISOString(), - }, - }; -} - -export const createStream = async (req: SandboxRequest, res: Response) => { +/** + * List streams by sender or recipient + */ +export const listStreams = async (req: Request, res: Response) => { try { - const validatedData = createStreamSchema.parse(req.body); - const isSandbox = isSandboxRequest(req); - - // Log sandbox mode - if (isSandbox) { - console.log('[SANDBOX] Indexing new stream intention:', validatedData); - } else { - console.log('Indexing new stream intention:', validatedData); - } + const { sender, recipient } = req.query; - const mockStream = { - id: '123', - status: 'pending', - ...validatedData - }; - - // Broadcast to SSE clients (sandbox events are also broadcasted but clearly marked) - const streamData = addSandboxMetadata(mockStream, isSandbox); - sseService.broadcastToStream(mockStream.id, 'stream.created', streamData); - sseService.broadcastToUser(validatedData.sender, 'stream.created', streamData); - sseService.broadcastToUser(validatedData.recipient, 'stream.created', streamData); - - return res.status(201).json(addSandboxMetadata(mockStream, isSandbox)); - } catch (error: any) { - if (error.name === 'ZodError' || error.issues) { - return res.status(400).json({ - message: 'Validation failed', - errors: error.errors || error.issues - }); + const where: any = {}; + if (sender) where.sender = sender as string; + if (recipient) where.recipient = recipient as string; + + const streams = await prisma.stream.findMany({ + where, + orderBy: { createdAt: 'desc' }, + include: { + senderUser: true, + recipientUser: true + } + }); + + return res.status(200).json(streams); + } catch (error) { + logger.error('Error listing streams:', error); + return res.status(500).json({ error: 'Internal server error' }); + } +}; + +/** + * Get a single stream by ID + */ +export const getStream = async (req: Request, res: Response) => { + try { + const { streamId } = req.params; + + const stream = await prisma.stream.findUnique({ + where: { streamId: parseInt(streamId) }, + include: { + senderUser: true, + recipientUser: true, + events: { + orderBy: { timestamp: 'desc' } + } + } + }); + + if (!stream) { + return res.status(404).json({ error: 'Stream not found' }); } - - return res.status(500).json({ - message: 'Internal server error' + + return res.status(200).json(stream); + } catch (error) { + logger.error('Error fetching stream:', error); + return res.status(500).json({ error: 'Internal server error' }); + } +}; + +/** + * List events for a stream + */ +export const getStreamEvents = async (req: Request, res: Response) => { + try { + const { streamId } = req.params; + + const events = await prisma.streamEvent.findMany({ + where: { streamId: parseInt(streamId) }, + orderBy: { timestamp: 'desc' } }); + + return res.status(200).json(events); + } catch (error) { + logger.error('Error fetching stream events:', error); + return res.status(500).json({ error: 'Internal server error' }); } }; diff --git a/backend/src/controllers/user.controller.ts b/backend/src/controllers/user.controller.ts new file mode 100644 index 0000000..e68c121 --- /dev/null +++ b/backend/src/controllers/user.controller.ts @@ -0,0 +1,64 @@ +import type { Request, Response, NextFunction } from 'express'; +import { prisma } from '../lib/prisma.js'; +import logger from '../logger.js'; +import { registerUserSchema } from '../validators/user.validator.js'; + +/** + * Register a new wallet public key + */ +export const registerUser = async (req: Request, res: Response, next: NextFunction) => { + try { + const validated = registerUserSchema.parse(req.body); + const { publicKey } = validated; + + // Check if user already exists + let user = await prisma.user.findUnique({ + where: { publicKey } + }); + + if (user) { + return res.status(200).json(user); + } + + // Create new user + user = await prisma.user.create({ + data: { publicKey } + }); + + logger.info(`User registered: ${publicKey}`); + return res.status(201).json(user); + } catch (error) { + next(error); + } +}; + +/** + * Get user by public key + */ +export const getUser = async (req: Request, res: Response, next: NextFunction) => { + try { + const { publicKey } = req.params; + + const user = await prisma.user.findUnique({ + where: { publicKey }, + include: { + sentStreams: { + take: 10, + orderBy: { createdAt: 'desc' } + }, + receivedStreams: { + take: 10, + orderBy: { createdAt: 'desc' } + } + } + }); + + if (!user) { + return res.status(404).json({ error: 'User not found' }); + } + + return res.status(200).json(user); + } catch (error) { + next(error); + } +}; diff --git a/backend/src/lib/prisma.ts b/backend/src/lib/prisma.ts index 30b93a9..a1a9c3b 100644 --- a/backend/src/lib/prisma.ts +++ b/backend/src/lib/prisma.ts @@ -1,13 +1,13 @@ -import { PrismaClient } from '../generated/prisma/index.js'; +import { PrismaClient } from '@prisma/client'; -const globalForPrisma = globalThis as unknown as { - prisma: PrismaClient | undefined; -}; +const globalForPrisma = global as unknown as { prisma: PrismaClient }; export const prisma = - globalForPrisma.prisma ?? + globalForPrisma.prisma || new PrismaClient({ log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'], }); if (process.env.NODE_ENV !== 'production') globalForPrisma.prisma = prisma; + +export default prisma; diff --git a/backend/src/middleware/error.middleware.ts b/backend/src/middleware/error.middleware.ts new file mode 100644 index 0000000..5360030 --- /dev/null +++ b/backend/src/middleware/error.middleware.ts @@ -0,0 +1,56 @@ +import type { Request, Response, NextFunction } from 'express'; +import { Prisma } from '@prisma/client'; +import { ZodError } from 'zod'; +import logger from '../logger.js'; + +/** + * Global error handler middleware + */ +export const errorHandler = ( + err: any, + req: Request, + res: Response, + next: NextFunction +) => { + logger.error('Unhandled error:', err); + + // Handle Zod Validation Errors + if (err instanceof ZodError) { + return res.status(400).json({ + error: 'Validation Error', + details: err.errors.map(e => ({ + path: e.path.join('.'), + message: e.message + })) + }); + } + + // Handle Prisma Errors + if (err instanceof Prisma.PrismaClientKnownRequestError) { + // Unique constraint violation + if (err.code === 'P2002') { + const target = (err.meta?.target as string[])?.join(', ') || 'field'; + return res.status(409).json({ + error: 'Conflict Error', + message: `Record with this ${target} already exists.` + }); + } + + // Record not found + if (err.code === 'P2025') { + return res.status(404).json({ + error: 'Not Found', + message: err.message || 'The requested record was not found.' + }); + } + } + + // Default Error + const statusCode = err.status || err.statusCode || 500; + const message = err.message || 'Internal Server Error'; + + res.status(statusCode).json({ + error: statusCode === 500 ? 'Internal Server Error' : 'Error', + message: statusCode === 500 ? 'A technical error occurred. Please try again later.' : message + }); +}; diff --git a/backend/src/routes/v1/index.ts b/backend/src/routes/v1/index.ts index ecdb81e..1dade6a 100644 --- a/backend/src/routes/v1/index.ts +++ b/backend/src/routes/v1/index.ts @@ -1,11 +1,13 @@ import { Router } from 'express'; import streamRoutes from './stream.routes.js'; import eventsRoutes from './events.routes.js'; +import userRoutes from './user.routes.js'; const router = Router(); // V1 API Routes router.use('/streams', streamRoutes); router.use('/events', eventsRoutes); +router.use('/users', userRoutes); export default router; diff --git a/backend/src/routes/v1/stream.routes.ts b/backend/src/routes/v1/stream.routes.ts index fee90e7..02a564f 100644 --- a/backend/src/routes/v1/stream.routes.ts +++ b/backend/src/routes/v1/stream.routes.ts @@ -1,5 +1,5 @@ import { Router } from 'express'; -import { createStream } from '../../controllers/stream.controller.js'; +import { createStream, listStreams, getStream, getStreamEvents } from '../../controllers/stream.controller.js'; const router = Router(); @@ -82,4 +82,89 @@ const router = Router(); */ router.post('/', createStream); +/** + * @openapi + * /v1/streams: + * get: + * tags: + * - Streams + * summary: List streams + * description: Retrieve a list of payment streams, optionally filtered by sender or recipient. + * parameters: + * - in: query + * name: sender + * schema: + * type: string + * description: Filter by sender public key + * - in: query + * name: recipient + * schema: + * type: string + * description: Filter by recipient public key + * responses: + * 200: + * description: List of streams + * content: + * application/json: + * schema: + * type: array + * items: + * $ref: '#/components/schemas/Stream' + */ +router.get('/', listStreams); + +/** + * @openapi + * /v1/streams/{streamId}: + * get: + * tags: + * - Streams + * summary: Get a single stream + * description: Retrieve detailed information about a specific stream by its on-chain ID. + * parameters: + * - in: path + * name: streamId + * required: true + * schema: + * type: integer + * description: On-chain stream ID + * responses: + * 200: + * description: Stream details + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/Stream' + * 404: + * description: Stream not found + */ +router.get('/:streamId', getStream); + +/** + * @openapi + * /v1/streams/{streamId}/events: + * get: + * tags: + * - Streams + * summary: List stream events + * description: Retrieve all events associated with a specific stream. + * parameters: + * - in: path + * name: streamId + * required: true + * schema: + * type: integer + * description: On-chain stream ID + * responses: + * 200: + * description: List of stream events + * content: + * application/json: + * schema: + * type: array + * items: + * $ref: '#/components/schemas/StreamEvent' + */ +router.get('/:streamId/events', getStreamEvents); + export default router; diff --git a/backend/src/routes/v1/user.routes.ts b/backend/src/routes/v1/user.routes.ts new file mode 100644 index 0000000..405995d --- /dev/null +++ b/backend/src/routes/v1/user.routes.ts @@ -0,0 +1,69 @@ +import { Router } from 'express'; +import { registerUser, getUser } from '../../controllers/user.controller.js'; + +const router = Router(); + +/** + * @openapi + * /v1/users: + * post: + * tags: + * - Users + * summary: Register a wallet public key + * description: Registers a new Stellar wallet public key or returns the existing user if already registered. + * requestBody: + * required: true + * content: + * application/json: + * schema: + * type: object + * required: + * - publicKey + * properties: + * publicKey: + * type: string + * description: Stellar public key (G...) + * example: "GABC123XYZ456DEF789GHI012JKL345MNO678PQR901STU234VWX567YZA" + * responses: + * 201: + * description: User registered successfully + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/User' + * 200: + * description: User already exists + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/User' + * 400: + * description: Invalid request body + * + * /v1/users/{publicKey}: + * get: + * tags: + * - Users + * summary: Fetch a user by public key + * description: Returns user details along with recent sent and received streams. + * parameters: + * - in: path + * name: publicKey + * required: true + * schema: + * type: string + * description: Stellar public key + * responses: + * 200: + * description: User found + * content: + * application/json: + * schema: + * $ref: '#/components/schemas/User' + * 404: + * description: User not found + */ +router.post('/', registerUser); +router.get('/:publicKey', getUser); + +export default router; diff --git a/backend/src/validators/stream.validator.ts b/backend/src/validators/stream.validator.ts index 0e6d6cd..7a16538 100644 --- a/backend/src/validators/stream.validator.ts +++ b/backend/src/validators/stream.validator.ts @@ -1,11 +1,13 @@ import { z } from 'zod'; export const createStreamSchema = z.object({ + streamId: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/).transform(v => parseInt(v))]), sender: z.string().min(1, 'Sender address is required'), recipient: z.string().min(1, 'Recipient address is required'), tokenAddress: z.string().min(1, 'Token address is required'), - amount: z.string().regex(/^\d+$/, 'Amount must be a positive integer as string'), - duration: z.number().int().positive('Duration must be a positive integer in seconds'), + ratePerSecond: z.string().regex(/^\d+$/, 'Rate must be a positive integer as string'), + depositedAmount: z.string().regex(/^\d+$/, 'Amount must be a positive integer as string'), + startTime: z.union([z.number().int().nonnegative(), z.string().regex(/^\d+$/).transform(v => parseInt(v))]), }); export type CreateStreamInput = z.infer; diff --git a/backend/src/validators/user.validator.ts b/backend/src/validators/user.validator.ts new file mode 100644 index 0000000..ce9a6c3 --- /dev/null +++ b/backend/src/validators/user.validator.ts @@ -0,0 +1,7 @@ +import { z } from 'zod'; + +export const registerUserSchema = z.object({ + publicKey: z.string().min(50, 'Invalid Stellar public key').regex(/^G[A-Z2-7]{55}$/, 'Invalid Stellar public key format'), +}); + +export type RegisterUserInput = z.infer;