Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/.env.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Database
DATABASE_URL="file:./dev.db"
DATABASE_URL="postgresql://user:password@localhost:5432/flowfi?schema=public"

# Server
PORT=3001
Expand All @@ -14,4 +14,4 @@ SANDBOX_MODE_ENABLED=true

# Optional: Use a separate database for sandbox
# If not set, it will use {DATABASE_URL}_sandbox
SANDBOX_DATABASE_URL=file:./sandbox.db
SANDBOX_DATABASE_URL="postgresql://user:password@localhost:5432/flowfi_sandbox?schema=public"
3 changes: 2 additions & 1 deletion backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ generator client {
}

datasource db {
provider = "sqlite"
provider = "postgresql"
url = env("DATABASE_URL")
}

// User model - represents Stellar wallet addresses interacting with the protocol
Expand Down
6 changes: 5 additions & 1 deletion backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ app.get('/', (req: Request, res: Response) => {
app.get('/health', async (req: Request, res: Response) => {
const { getSandboxConfig } = await import('./config/sandbox.js');
const sandboxConfig = getSandboxConfig();

res.json({
status: 'healthy',
timestamp: new Date().toISOString(),
Expand All @@ -158,4 +158,8 @@ app.get('/health', async (req: Request, res: Response) => {
});
});

import { errorHandler } from './middleware/error.middleware.js';

app.use(errorHandler);

export default app;
154 changes: 103 additions & 51 deletions backend/src/controllers/stream.controller.ts
Original file line number Diff line number Diff line change
@@ -1,62 +1,114 @@
import type { Response } from 'express';
import { createStreamSchema } from '../validators/stream.validator.js';
import { sseService } from '../services/sse.service.js';
import type { SandboxRequest } from '../middleware/sandbox.middleware.js';
import { isSandboxRequest } from '../middleware/sandbox.middleware.js';
import type { Request, Response } from 'express';
import { prisma } from '../lib/prisma.js';
import logger from '../logger.js';

/**
* Helper to add sandbox metadata to response
* Create a new stream (stub for on-chain indexing)
*/
function addSandboxMetadata(data: any, isSandbox: boolean): any {
if (!isSandbox) {
return data;
export const createStream = async (req: Request, res: Response) => {
// This would typically involve validating the stream already exists on-chain
// or preparing metadata for the frontend to submit the transaction.
// For now, let's allow "registering" a stream if it doesn't exist.
try {
const { streamId, sender, recipient, tokenAddress, ratePerSecond, depositedAmount, startTime } = req.body;

const stream = await prisma.stream.upsert({
where: { streamId: parseInt(streamId) },
update: {
isActive: true,
lastUpdateTime: Math.floor(Date.now() / 1000)
},
create: {
streamId: parseInt(streamId),
sender,
recipient,
tokenAddress,
ratePerSecond,
depositedAmount,
withdrawnAmount: "0",
startTime: parseInt(startTime),
lastUpdateTime: parseInt(startTime)
}
});

return res.status(201).json(stream);
} catch (error) {
logger.error('Error creating/upserting stream:', error);
return res.status(500).json({ error: 'Internal server error' });
}
};

return {
...data,
_sandbox: {
mode: true,
warning: 'This is sandbox data and does not affect production',
timestamp: new Date().toISOString(),
},
};
}

export const createStream = async (req: SandboxRequest, res: Response) => {
/**
* List streams by sender or recipient
*/
export const listStreams = async (req: Request, res: Response) => {
try {
const validatedData = createStreamSchema.parse(req.body);
const isSandbox = isSandboxRequest(req);

// Log sandbox mode
if (isSandbox) {
console.log('[SANDBOX] Indexing new stream intention:', validatedData);
} else {
console.log('Indexing new stream intention:', validatedData);
}
const { sender, recipient } = req.query;

const mockStream = {
id: '123',
status: 'pending',
...validatedData
};

// Broadcast to SSE clients (sandbox events are also broadcasted but clearly marked)
const streamData = addSandboxMetadata(mockStream, isSandbox);
sseService.broadcastToStream(mockStream.id, 'stream.created', streamData);
sseService.broadcastToUser(validatedData.sender, 'stream.created', streamData);
sseService.broadcastToUser(validatedData.recipient, 'stream.created', streamData);

return res.status(201).json(addSandboxMetadata(mockStream, isSandbox));
} catch (error: any) {
if (error.name === 'ZodError' || error.issues) {
return res.status(400).json({
message: 'Validation failed',
errors: error.errors || error.issues
});
const where: any = {};
if (sender) where.sender = sender as string;
if (recipient) where.recipient = recipient as string;

const streams = await prisma.stream.findMany({
where,
orderBy: { createdAt: 'desc' },
include: {
senderUser: true,
recipientUser: true
}
});

return res.status(200).json(streams);
} catch (error) {
logger.error('Error listing streams:', error);
return res.status(500).json({ error: 'Internal server error' });
}
};

/**
* Get a single stream by ID
*/
export const getStream = async (req: Request, res: Response) => {
try {
const { streamId } = req.params;

const stream = await prisma.stream.findUnique({
where: { streamId: parseInt(streamId) },
include: {
senderUser: true,
recipientUser: true,
events: {
orderBy: { timestamp: 'desc' }
}
}
});

if (!stream) {
return res.status(404).json({ error: 'Stream not found' });
}

return res.status(500).json({
message: 'Internal server error'

return res.status(200).json(stream);
} catch (error) {
logger.error('Error fetching stream:', error);
return res.status(500).json({ error: 'Internal server error' });
}
};

/**
* List events for a stream
*/
export const getStreamEvents = async (req: Request, res: Response) => {
try {
const { streamId } = req.params;

const events = await prisma.streamEvent.findMany({
where: { streamId: parseInt(streamId) },
orderBy: { timestamp: 'desc' }
});

return res.status(200).json(events);
} catch (error) {
logger.error('Error fetching stream events:', error);
return res.status(500).json({ error: 'Internal server error' });
}
};
64 changes: 64 additions & 0 deletions backend/src/controllers/user.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { Request, Response, NextFunction } from 'express';
import { prisma } from '../lib/prisma.js';
import logger from '../logger.js';
import { registerUserSchema } from '../validators/user.validator.js';

/**
* Register a new wallet public key
*/
export const registerUser = async (req: Request, res: Response, next: NextFunction) => {
try {
const validated = registerUserSchema.parse(req.body);
const { publicKey } = validated;

// Check if user already exists
let user = await prisma.user.findUnique({
where: { publicKey }
});

if (user) {
return res.status(200).json(user);
}

// Create new user
user = await prisma.user.create({
data: { publicKey }
});

logger.info(`User registered: ${publicKey}`);
return res.status(201).json(user);
} catch (error) {
next(error);
}
};

/**
* Get user by public key
*/
export const getUser = async (req: Request, res: Response, next: NextFunction) => {
try {
const { publicKey } = req.params;

const user = await prisma.user.findUnique({
where: { publicKey },
include: {
sentStreams: {
take: 10,
orderBy: { createdAt: 'desc' }
},
receivedStreams: {
take: 10,
orderBy: { createdAt: 'desc' }
}
}
});

if (!user) {
return res.status(404).json({ error: 'User not found' });
}

return res.status(200).json(user);
} catch (error) {
next(error);
}
};
10 changes: 5 additions & 5 deletions backend/src/lib/prisma.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { PrismaClient } from '../generated/prisma/index.js';
import { PrismaClient } from '@prisma/client';

const globalForPrisma = globalThis as unknown as {
prisma: PrismaClient | undefined;
};
const globalForPrisma = global as unknown as { prisma: PrismaClient };

export const prisma =
globalForPrisma.prisma ??
globalForPrisma.prisma ||
new PrismaClient({
log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'],
});

if (process.env.NODE_ENV !== 'production') globalForPrisma.prisma = prisma;

export default prisma;
56 changes: 56 additions & 0 deletions backend/src/middleware/error.middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import type { Request, Response, NextFunction } from 'express';
import { Prisma } from '@prisma/client';
import { ZodError } from 'zod';
import logger from '../logger.js';

/**
* Global error handler middleware
*/
export const errorHandler = (
err: any,
req: Request,
res: Response,
next: NextFunction
) => {
logger.error('Unhandled error:', err);

// Handle Zod Validation Errors
if (err instanceof ZodError) {
return res.status(400).json({
error: 'Validation Error',
details: err.errors.map(e => ({
path: e.path.join('.'),
message: e.message
}))
});
}

// Handle Prisma Errors
if (err instanceof Prisma.PrismaClientKnownRequestError) {
// Unique constraint violation
if (err.code === 'P2002') {
const target = (err.meta?.target as string[])?.join(', ') || 'field';
return res.status(409).json({
error: 'Conflict Error',
message: `Record with this ${target} already exists.`
});
}

// Record not found
if (err.code === 'P2025') {
return res.status(404).json({
error: 'Not Found',
message: err.message || 'The requested record was not found.'
});
}
}

// Default Error
const statusCode = err.status || err.statusCode || 500;
const message = err.message || 'Internal Server Error';

res.status(statusCode).json({
error: statusCode === 500 ? 'Internal Server Error' : 'Error',
message: statusCode === 500 ? 'A technical error occurred. Please try again later.' : message
});
};
2 changes: 2 additions & 0 deletions backend/src/routes/v1/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Router } from 'express';
import streamRoutes from './stream.routes.js';
import eventsRoutes from './events.routes.js';
import userRoutes from './user.routes.js';

const router = Router();

// V1 API Routes
router.use('/streams', streamRoutes);
router.use('/events', eventsRoutes);
router.use('/users', userRoutes);

export default router;
Loading
Loading