diff --git a/packages/global/common/file/image/constants.ts b/packages/global/common/file/image/constants.ts
index 5e511e4b26f5..3aa07836cef7 100644
--- a/packages/global/common/file/image/constants.ts
+++ b/packages/global/common/file/image/constants.ts
@@ -4,3 +4,4 @@ export const FolderIcon = 'file/fill/folder';
 export const FolderImgUrl = '/imgs/files/folder.svg';
 export const HttpPluginImgUrl = '/imgs/app/httpPluginFill.svg';
 export const HttpImgUrl = '/imgs/workflow/http.png';
+export const TempFileURL = '/api/file/temp';
diff --git a/packages/global/core/dataset/training/type.d.ts b/packages/global/core/dataset/training/type.d.ts
index 183bd0c1beb3..89071fc8ab34 100644
--- a/packages/global/core/dataset/training/type.d.ts
+++ b/packages/global/core/dataset/training/type.d.ts
@@ -9,7 +9,6 @@ export type PushDataToTrainingQueueProps = {
   data: PushDatasetDataChunkProps[];
   mode?: TrainingModeEnum;
-  data: PushDatasetDataChunkProps[];

   agentModel: string;
   vectorModel: string;
diff --git a/packages/global/core/dataset/type.d.ts b/packages/global/core/dataset/type.d.ts
index d854bd274229..e3fee578158e 100644
--- a/packages/global/core/dataset/type.d.ts
+++ b/packages/global/core/dataset/type.d.ts
@@ -118,6 +118,7 @@ export type DatasetCollectionSchemaType = ChunkSettingsType & {

   rawTextLength?: number;
   hashRawText?: string;
+
   metadata?: {
     webPageSelector?: string;
     relatedImgId?: string; // The id of the associated image collections
@@ -250,7 +251,10 @@ export type TagUsageType = {
 export type DatasetCollectionItemType = CollectionWithDatasetType & {
   sourceName: string;
   sourceId?: string;
-  file?: DatasetFileSchema;
+  file?: {
+    filename?: string;
+    contentLength?: number;
+  };
   permission: DatasetPermission;
   indexAmount: number;
   errorCount?: number;
diff --git a/packages/global/core/dataset/v2/api.ts b/packages/global/core/dataset/v2/api.ts
new file mode 100644
index 000000000000..fdcd3540528c
--- /dev/null
+++ b/packages/global/core/dataset/v2/api.ts
@@ -0,0 +1,34 @@
+import { ObjectIdSchema } from '../../../common/type/mongo';
+import z from 'zod';
+
+export const PresignDatasetFileGetUrlSchema = z.union([
+  z.object({
+    key: z
+      .string()
+      .nonempty()
+      .refine((key) => key.startsWith('dataset/'), {
+        message: 'Invalid key format: must start with "dataset/"'
+      })
+      .transform((k) => decodeURIComponent(k)),
+    preview: z.boolean().optional()
+  }),
+  z.object({
+    collectionId: ObjectIdSchema
+    // datasetId: ObjectIdSchema
+  })
+]);
+export type PresignDatasetFileGetUrlParams = z.infer<typeof PresignDatasetFileGetUrlSchema>;
+
+export const PresignDatasetFilePostUrlSchema = z.object({
+  filename: z.string().min(1),
+  datasetId: ObjectIdSchema
+});
+export type PresignDatasetFilePostUrlParams = z.infer<typeof PresignDatasetFilePostUrlSchema>;
+
+export const ShortPreviewLinkSchema = z.object({
+  k: z
+    .string()
+    .nonempty()
+    .transform((k) => `chat:temp_file:${decodeURIComponent(k)}`)
+});
+export type ShortPreviewLinkParams = z.infer<typeof ShortPreviewLinkSchema>;
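A quick usage sketch of the new union schema (the ids and keys below are made up): the first branch validates the `dataset/` prefix before the `decodeURIComponent` transform runs, while the second branch only needs a Mongo ObjectId.

```ts
import { PresignDatasetFileGetUrlSchema } from '@fastgpt/global/core/dataset/v2/api';

// Key-based request: refine checks the raw value, then the transform decodes it.
PresignDatasetFileGetUrlSchema.parse({
  key: 'dataset/68fee42e1d416bb5ddc85b19/aZos7D-report.pdf',
  preview: true
});

// Collection-based request: only a Mongo ObjectId is required.
PresignDatasetFileGetUrlSchema.parse({ collectionId: '68fee42e1d416bb5ddc85b19' });

// A key outside the dataset/ prefix matches neither branch.
PresignDatasetFileGetUrlSchema.safeParse({ key: 'chat/x.png' }); // { success: false, ... }
```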
diff --git a/packages/service/common/file/gridfs/controller.ts b/packages/service/common/file/gridfs/controller.ts
index d1e707816c6c..734074d4dcaf 100644
--- a/packages/service/common/file/gridfs/controller.ts
+++ b/packages/service/common/file/gridfs/controller.ts
@@ -4,16 +4,12 @@ import fsp from 'fs/promises';
 import fs from 'fs';
 import { type DatasetFileSchema } from '@fastgpt/global/core/dataset/type';
 import { MongoChatFileSchema, MongoDatasetFileSchema } from './schema';
-import { detectFileEncoding, detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
-import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
-import { readRawContentByFileBuffer } from '../read/utils';
-import { computeGridFsChunSize, gridFsStream2Buffer, stream2Encoding } from './utils';
+import { detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
+import { computeGridFsChunSize, stream2Encoding } from './utils';
 import { addLog } from '../../system/log';
-import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { Readable } from 'stream';
-import { addRawTextBuffer, getRawTextBuffer } from '../../buffer/rawText/controller';
-import { addMinutes } from 'date-fns';
 import { retryFn } from '@fastgpt/global/common/system/utils';
+import { getS3DatasetSource } from '../../s3/sources/dataset';

 export function getGFSCollection(bucket: `${BucketNameEnum}`) {
   MongoDatasetFileSchema;
@@ -162,11 +158,17 @@ export async function delFileByFileIdList({
   fileIdList: string[];
 }): Promise<void> {
   return retryFn(async () => {
+    const s3DatasetSource = getS3DatasetSource();
+
     const bucket = getGridBucket(bucketName);

     for await (const fileId of fileIdList) {
       try {
-        await bucket.delete(new Types.ObjectId(String(fileId)));
+        if (s3DatasetSource.isDatasetObjectKey(fileId)) {
+          await s3DatasetSource.deleteDatasetFileByKey(fileId);
+        } else {
+          await bucket.delete(new Types.ObjectId(String(fileId)));
+        }
       } catch (error: any) {
         if (typeof error?.message === 'string' && error.message.includes('File not found')) {
           addLog.warn('File not found', { fileId });
@@ -189,78 +191,3 @@ export async function getDownloadStream({

   return bucket.openDownloadStream(new Types.ObjectId(fileId));
 }
-
-export const readFileContentFromMongo = async ({
-  teamId,
-  tmbId,
-  bucketName,
-  fileId,
-  customPdfParse = false,
-  getFormatText,
-  usageId
-}: {
-  teamId: string;
-  tmbId: string;
-  bucketName: `${BucketNameEnum}`;
-  fileId: string;
-  customPdfParse?: boolean;
-  getFormatText?: boolean; // Convert data to markdown format where possible
-  usageId?: string;
-}): Promise<{
-  rawText: string;
-  filename: string;
-}> => {
-  const bufferId = `${String(fileId)}-${customPdfParse}`;
-  // read buffer
-  const fileBuffer = await getRawTextBuffer(bufferId);
-  if (fileBuffer) {
-    return {
-      rawText: fileBuffer.text,
-      filename: fileBuffer?.sourceName
-    };
-  }
-
-  const [file, fileStream] = await Promise.all([
-    getFileById({ bucketName, fileId }),
-    getDownloadStream({ bucketName, fileId })
-  ]);
-  if (!file) {
-    return Promise.reject(CommonErrEnum.fileNotFound);
-  }
-
-  const extension = parseFileExtensionFromUrl(file?.filename);
-
-  const start = Date.now();
-  const fileBuffers = await gridFsStream2Buffer(fileStream);
-  addLog.debug('get file buffer', { time: Date.now() - start });
-
-  const encoding = file?.metadata?.encoding || detectFileEncoding(fileBuffers);
-
-  // Get raw text
-  const { rawText } = await readRawContentByFileBuffer({
-    customPdfParse,
-    usageId,
-    getFormatText,
-    extension,
-    teamId,
-    tmbId,
-    buffer: fileBuffers,
-    encoding,
-    metadata: {
-      relatedId: fileId
-    }
-  });
-
-  // Add buffer
-  addRawTextBuffer({
-    sourceId: bufferId,
-    sourceName: file.filename,
-    text: rawText,
-    expiredTime: addMinutes(new Date(), 20)
-  });
-
-  return {
-    rawText,
-    filename: file.filename
-  };
-};
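The deletion path above routes on the shape of `fileId`. A standalone restatement of the rule (not the exported helper), assuming `dataset/`-prefixed S3 keys and 24-hex GridFS ids:

```ts
type FileBackend = 's3' | 'gridfs';

// S3 dataset objects carry a "dataset/" key prefix; legacy GridFS files are
// referenced by Mongo ObjectId strings, so a prefix test is enough to route.
function resolveFileBackend(fileId: string): FileBackend {
  return fileId.startsWith('dataset/') ? 's3' : 'gridfs';
}

resolveFileBackend('dataset/68fee42e1d416bb5ddc85b19/aZos7D-report.pdf'); // 's3'
resolveFileBackend('665f1b2c8e4d3a0012345678'); // 'gridfs'
```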
diff --git a/packages/service/common/file/image/controller.ts b/packages/service/common/file/image/controller.ts
index 700d41631f89..0fe6ebc9a3cb 100644
--- a/packages/service/common/file/image/controller.ts
+++ b/packages/service/common/file/image/controller.ts
@@ -64,23 +64,28 @@ export async function uploadMongoImg({
 export const copyAvatarImage = async ({
   teamId,
   imageUrl,
-  ttl,
+  temporary,
   session
 }: {
   teamId: string;
   imageUrl: string;
-  ttl: boolean;
+  temporary: boolean;
   session?: ClientSession;
 }) => {
   if (!imageUrl) return;

-  // S3
-  if (imageUrl.startsWith(`${imageBaseUrl}/${S3Sources.avatar}`)) {
-    const extendName = path.extname(imageUrl);
+  const avatarSource = getS3AvatarSource();
+  if (avatarSource.isAvatarKey(imageUrl)) {
+    const filename = (() => {
+      // Keys look like `<nanoid>-<original name>`; keep everything after the first dash
+      const last = imageUrl.split('/').pop()?.split('-').slice(1).join('-');
+      if (!last) return getNanoid(6).concat(path.extname(imageUrl));
+      return `${getNanoid(6)}-${last}`;
+    })();
     const key = await getS3AvatarSource().copyAvatar({
-      sourceKey: imageUrl.slice(imageBaseUrl.length),
-      targetKey: `${S3Sources.avatar}/${teamId}/${getNanoid(6)}${extendName}`,
-      ttl
+      key: imageUrl,
+      teamId,
+      filename,
+      temporary
     });
     return key;
   }
@@ -130,9 +135,13 @@ export const removeImageByPath = (path?: string, session?: ClientSession) => {
   if (!name) return;

   const id = name.split('.')[0];
-  if (!id || !Types.ObjectId.isValid(id)) return;
+  if (!id) return;

-  return MongoImage.deleteOne({ _id: id }, { session });
+  if (Types.ObjectId.isValid(id)) {
+    return MongoImage.deleteOne({ _id: id }, { session });
+  } else if (getS3AvatarSource().isAvatarKey(path)) {
+    return getS3AvatarSource().deleteAvatar(path, session);
+  }
 };

 export async function readMongoImg({ id }: { id: string }) {
diff --git a/packages/service/common/file/read/utils.ts b/packages/service/common/file/read/utils.ts
index 3cc8e26e8cab..1049ac20f742 100644
--- a/packages/service/common/file/read/utils.ts
+++ b/packages/service/common/file/read/utils.ts
@@ -1,4 +1,3 @@
-import { uploadMongoImg } from '../image/controller';
 import FormData from 'form-data';
 import fs from 'fs';
 import type { ReadFileResponse } from '../../../worker/readFile/type';
@@ -9,6 +8,9 @@ import { matchMdImg } from '@fastgpt/global/common/string/markdown';
 import { createPdfParseUsage } from '../../../support/wallet/usage/controller';
 import { useDoc2xServer } from '../../../thirdProvider/doc2x';
 import { readRawContentFromBuffer } from '../../../worker/function';
+import { uploadImage2S3Bucket } from '../../s3/utils';
+import { Mimes } from '../../s3/constants';
+import { addDays } from 'date-fns';

 export type readRawTextByLocalFileParams = {
   teamId: string;
@@ -17,6 +19,7 @@ export type readRawTextByLocalFileParams = {
   encoding: string;
   customPdfParse?: boolean;
   getFormatText?: boolean;
+  uploadKey: string;
   metadata?: Record<string, any>;
 };
 export const readRawTextByLocalFile = async (params: readRawTextByLocalFileParams) => {
@@ -26,7 +29,7 @@ export const readRawTextByLocalFile = async (params: readRawTextByLocalFileParam

   const buffer = await fs.promises.readFile(path);

-  return readRawContentByFileBuffer({
+  return readS3FileContentByBuffer({
     extension,
     customPdfParse: params.customPdfParse,
     getFormatText: params.getFormatText,
@@ -34,21 +37,24 @@ export const readRawTextByLocalFile = async (params: readRawTextByLocalFileParam
     tmbId: params.tmbId,
     encoding: params.encoding,
     buffer,
-    metadata: params.metadata
+    imageKeyOptions: {
+      prefix: params.uploadKey,
+      expiredTime: addDays(new Date(), 1)
+    }
   });
 };

-export const readRawContentByFileBuffer = async ({
+export const readS3FileContentByBuffer = async ({
   teamId,
   tmbId,
   extension,
   buffer,
   encoding,
-  metadata,
   customPdfParse = false,
   usageId,
-  getFormatText = true
+  getFormatText = true,
+  imageKeyOptions
 }: {
   teamId: string;
   tmbId: string;
@@ -56,11 +62,14 @@ export const readRawContentByFileBuffer = async ({
   extension: string;
   buffer: Buffer;
   encoding: string;
-  metadata?: Record<string, any>;
   customPdfParse?: boolean;
   usageId?: string;
   getFormatText?: boolean;
+  imageKeyOptions: {
+    prefix: string;
+    expiredTime?: Date;
+  };
 }): Promise<{
   rawText: string;
 }> => {
@@ -158,21 +167,24 @@ export const readS3FileContentByBuffer = async ({
   addLog.debug(`Parse file success, time: ${Date.now() - start}ms. `);

   // markdown data format
-  if (imageList) {
+  if (imageList && imageList.length > 0) {
+    addLog.debug(`Processing ${imageList.length} images from parsed document`);
+
     await batchRun(imageList, async (item) => {
       const src = await (async () => {
         try {
-          return await uploadMongoImg({
+          const { prefix, expiredTime } = imageKeyOptions;
+          // 'image/png' -> '.png'; ext already carries the leading dot
+          const ext = `.${item.mime.split('/')[1].replace('x-', '')}`;
+
+          return await uploadImage2S3Bucket('private', {
             base64Img: `data:${item.mime};base64,${item.base64}`,
-            teamId,
-            metadata: {
-              ...metadata,
-              mime: item.mime
-            }
+            uploadKey: `${prefix}/${item.uuid}${ext}`,
+            mimetype: Mimes[ext as keyof typeof Mimes],
+            filename: `${item.uuid}${ext}`,
+            expiredTime
           });
         } catch (error) {
-          addLog.warn('Upload file image error', { error });
-          return 'Upload load image error';
+          addLog.warn('Upload parsed image to S3 error', { error });
+          return `[Image Upload Failed: ${item.uuid}]`;
         }
       })();
       rawText = rawText.replace(item.uuid, src);
@@ -182,7 +194,7 @@ export const readS3FileContentByBuffer = async ({
     });
   }

-  addLog.debug(`Upload file success, time: ${Date.now() - start}ms`);
-
-  return { rawText: getFormatText ? formatText || rawText : rawText };
+  return {
+    rawText: getFormatText ? formatText || rawText : rawText
+  };
 };
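For reference, the image hand-off above works roughly like this (standalone sketch, names and values are illustrative): the worker leaves a uuid placeholder in the markdown for each embedded image, the image is uploaded under `${prefix}/${uuid}${ext}`, and the placeholder is then swapped for the object key.

```ts
type ParsedImage = { uuid: string; mime: string; base64: string };

function replaceImagePlaceholders(rawText: string, images: ParsedImage[], prefix: string) {
  for (const img of images) {
    // 'image/png' -> '.png'; 'image/x-icon' -> '.icon'
    const ext = `.${img.mime.split('/')[1].replace('x-', '')}`;
    rawText = rawText.replace(img.uuid, `${prefix}/${img.uuid}${ext}`);
  }
  return rawText;
}

replaceImagePlaceholders(
  '![figure](a1b2c3d4)',
  [{ uuid: 'a1b2c3d4', mime: 'image/png', base64: 'iVBORw0KGgo=' }],
  'dataset/68fee42e1d416bb5ddc85b19/aZos7D-report-parsed'
);
// -> '![figure](dataset/68fee42e1d416bb5ddc85b19/aZos7D-report-parsed/a1b2c3d4.png)'
```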
diff --git a/packages/service/common/s3/buckets/base.ts b/packages/service/common/s3/buckets/base.ts
index a17c3b3b8dd9..9264cf59448f 100644
--- a/packages/service/common/s3/buckets/base.ts
+++ b/packages/service/common/s3/buckets/base.ts
@@ -1,4 +1,10 @@
-import { Client, type RemoveOptions, type CopyConditions, InvalidObjectNameError } from 'minio';
+import {
+  Client,
+  type RemoveOptions,
+  type CopyConditions,
+  InvalidObjectNameError,
+  S3Error
+} from 'minio';
 import {
   type CreatePostPresignedUrlOptions,
   type CreatePostPresignedUrlParams,
@@ -11,9 +17,10 @@ import { defaultS3Options, getSystemMaxFileSize, Mimes } from '../constants';
 import path from 'node:path';
 import { MongoS3TTL } from '../schema';
 import { getNanoid } from '@fastgpt/global/common/string/tools';
-import { addHours } from 'date-fns';
+import { addHours, addMinutes } from 'date-fns';
 import { addLog } from '../../system/log';
 import { addS3DelJob } from '../mq';
+import { type Readable } from 'node:stream';

 export class S3BaseBucket {
   private _client: Client;
@@ -80,24 +87,26 @@ export class S3BaseBucket {
   }

   async copy({
-    src,
-    dst,
-    ttl,
+    from,
+    to,
     options
   }: {
-    src: string;
-    dst: string;
-    ttl: boolean;
-    options?: CopyConditions;
+    from: string;
+    to: string;
+    options?: {
+      temporary?: boolean;
+      copyConditions?: CopyConditions;
+    };
   }): ReturnType<Client['copyObject']> {
-    if (ttl) {
+    const bucket = this.name;
+
+    if (options?.temporary) {
       await MongoS3TTL.create({
-        minioKey: dst,
+        minioKey: to,
         bucketName: this.name,
         expiredTime: addHours(new Date(), 24)
       });
     }
-    return this.client.copyObject(this.name, src, dst, options);
+
+    return this.client.copyObject(bucket, to, `${bucket}/${from}`, options?.copyConditions);
   }

   exist(): Promise<boolean> {
@@ -109,24 +118,46 @@
       if (!objectKey) return Promise.resolve();
       return await this.client.removeObject(this.name, objectKey, options);
     } catch (error) {
-      if (error instanceof InvalidObjectNameError) {
-        addLog.warn(`${this.name} delete object not found: ${objectKey}`, error);
-        return Promise.resolve();
+      if (error instanceof S3Error) {
+        if (error.code === 'InvalidObjectName') {
+          addLog.warn(`${this.name} delete object not found: ${objectKey}`, error);
+          return Promise.resolve();
+        }
       }
       return Promise.reject(error);
     }
   }

-  addDeleteJob({ prefix, key }: { prefix?: string; key?: string }): Promise<void> {
-    return addS3DelJob({ prefix, key, bucketName: this.name });
-  }
-
   listObjectsV2(
     ...params: Parameters<Client['listObjectsV2']> extends [string, ...infer R] ? R : never
   ) {
     return this.client.listObjectsV2(this.name, ...params);
   }

+  putObject(...params: Parameters<Client['putObject']> extends [string, ...infer R] ? R : never) {
+    return this.client.putObject(this.name, ...params);
+  }
+
+  getObject(...params: Parameters<Client['getObject']> extends [string, ...infer R] ? R : never) {
+    return this.client.getObject(this.name, ...params);
+  }
+
+  statObject(...params: Parameters<Client['statObject']> extends [string, ...infer R] ? R : never) {
+    return this.client.statObject(this.name, ...params);
+  }
+
+  async fileStreamToBuffer(stream: Readable): Promise<Buffer> {
+    const chunks: Buffer[] = [];
+    for await (const chunk of stream) {
+      chunks.push(chunk);
+    }
+    return Buffer.concat(chunks);
+  }
+
+  addDeleteJob(params: Omit<Parameters<typeof addS3DelJob>[0], 'bucketName'>) {
+    return addS3DelJob({ ...params, bucketName: this.name });
+  }
+
   async createPostPresignedUrl(
     params: CreatePostPresignedUrlParams,
     options: CreatePostPresignedUrlOptions = {}
@@ -140,8 +171,7 @@ export class S3BaseBucket {
     const key = (() => {
       if ('rawKey' in params) return params.rawKey;
-
-      return `${params.source}/${params.teamId}/${getNanoid(6)}-${filename}`;
+      return [params.source, params.teamId, `${getNanoid(6)}-${filename}`].join('/');
     })();

     const policy = this.externalClient.newPostPolicy();
@@ -151,11 +181,12 @@ export class S3BaseBucket {
     if (formatMaxFileSize) {
       policy.setContentLengthRange(1, formatMaxFileSize);
     }
-    policy.setExpires(new Date(Date.now() + 10 * 60 * 1000));
+    policy.setExpires(addMinutes(new Date(), 10));
     policy.setUserMetaData({
       'content-disposition': `attachment; filename="${encodeURIComponent(filename)}"`,
       'origin-filename': encodeURIComponent(filename),
-      'upload-time': new Date().toISOString()
+      'upload-time': new Date().toISOString(),
+      ...params.metadata
     });

     const { formData, postURL } = await this.externalClient.presignedPostPolicy(policy);
diff --git a/packages/service/common/s3/mq.ts b/packages/service/common/s3/mq.ts
index 9c6ac5a5ab84..1deaea7b657e 100644
--- a/packages/service/common/s3/mq.ts
+++ b/packages/service/common/s3/mq.ts
@@ -4,6 +4,7 @@ import { retryFn } from '@fastgpt/global/common/system/utils';

 export type S3MQJobData = {
   key?: string;
+  keys?: string[];
   prefix?: string;
   bucketName: string;
 };
@@ -29,9 +30,8 @@ export const startS3DelWorker = async () => {
   return getWorker(
     QueueNames.s3FileDelete,
     async (job) => {
-      const { prefix, bucketName, key } = job.data;
+      const { prefix, bucketName, key, keys } = job.data;
       const limit = pLimit(10);
-      const tasks: Promise<void>[] = [];
       const bucket = s3BucketMap[bucketName];
       if (!bucket) {
         return Promise.reject(`Bucket not found: ${bucketName}`);
@@ -40,7 +40,16 @@
       if (key) {
         await bucket.delete(key);
       }
+      if (keys) {
+        const tasks: Promise<void>[] = [];
+        for (const key of keys) {
+          const p = limit(() => retryFn(() => bucket.delete(key)));
+          tasks.push(p);
+        }
+        await Promise.all(tasks);
+      }
       if (prefix) {
+        const tasks: Promise<void>[] = [];
         return new Promise(async (resolve, reject) => {
           const stream = bucket.listObjectsV2(prefix, true);
           stream.on('data', async (file) => {
diff --git a/packages/service/common/s3/sources/avatar.ts b/packages/service/common/s3/sources/avatar.ts
index 0fbcd738dcce..8f3bbe7a8454 100644
--- a/packages/service/common/s3/sources/avatar.ts
+++ b/packages/service/common/s3/sources/avatar.ts
@@ -68,20 +68,29 @@ class S3AvatarSource {
   }

   async copyAvatar({
-    sourceKey,
-    targetKey,
-    ttl
+    key,
+    teamId,
+    filename,
+    temporary = false
   }: {
-    sourceKey: string;
-    targetKey: string;
-    ttl: boolean;
+    key: string;
+    teamId: string;
+    filename: string;
+    temporary: boolean;
   }) {
-    await this.bucket.copy({
-      src: sourceKey,
-      dst: targetKey,
-      ttl
-    });
-    return targetKey;
+    const from = key.slice(this.prefix.length);
+    const to = `${S3Sources.avatar}/${teamId}/${filename}`;
+    await this.bucket.copy({ from, to, options: { temporary } });
+    return this.prefix.concat(to);
+  }
+
+  isAvatarKey(
+    key?: string,
+    options?: { prefix?: string }
+  ): key is `${typeof S3Sources.avatar}/${string}` {
+    const { prefix = this.prefix } = options ?? {};
+    const objectKey = prefix ? key?.slice(prefix.length) : key;
+    return objectKey?.startsWith(`${S3Sources.avatar}/`) ?? false;
   }
 }
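`isAvatarKey` relies on a template-literal type guard so that callers get a narrowed key type after the runtime prefix check. A minimal self-contained version of the pattern (the constant and delete function are stand-ins, not the real API):

```ts
function isAvatarKey(key?: string): key is `avatar/${string}` {
  return key?.startsWith('avatar/') ?? false;
}

async function deleteAvatar(key: `avatar/${string}`) {
  console.log('delete', key); // stand-in for the real S3 removal
}

async function cleanup(key?: string) {
  if (isAvatarKey(key)) {
    await deleteAvatar(key); // key is now typed as `avatar/${string}`
  }
}
```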
diff --git a/packages/service/common/s3/sources/chat/index.ts b/packages/service/common/s3/sources/chat/index.ts
index 0aaad652609c..a98ca1a8e95f 100644
--- a/packages/service/common/s3/sources/chat/index.ts
+++ b/packages/service/common/s3/sources/chat/index.ts
@@ -1,4 +1,4 @@
-import { getNanoid } from '@fastgpt/global/common/string/tools';
+import { getNanoid, parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { S3PrivateBucket } from '../../buckets/private';
 import { S3Sources } from '../../type';
 import {
@@ -7,8 +7,6 @@ import {
   ChatFileUploadSchema,
   DelChatFileByPrefixSchema
 } from './type';
-import { MongoS3TTL } from '../../schema';
-import { addHours } from 'date-fns';

 class S3ChatSource {
   private bucket: S3PrivateBucket;
@@ -22,10 +20,37 @@ class S3ChatSource {
     return (this.instance ??= new S3ChatSource());
   }

-  static isChatFileKey(key?: string): key is `${typeof S3Sources.chat}/${string}` {
+  isChatFileKey(key?: string): key is `${typeof S3Sources.chat}/${string}` {
     return key?.startsWith(`${S3Sources.chat}/`) ?? false;
   }

+  // Get file stream
+  getChatFileStream(key: string) {
+    return this.bucket.getObject(key);
+  }
+
+  // Get file stat
+  getChatFileStat(key: string) {
+    return this.bucket.statObject(key);
+  }
+
+  // Get file metadata
+  async getFileMetadata(key: string) {
+    const stat = await this.getChatFileStat(key);
+    if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' };
+
+    const contentLength = stat.size;
+    const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
+    const extension = parseFileExtensionFromUrl(filename);
+    const contentType: string = stat.metaData['content-type'];
+    return {
+      filename,
+      extension,
+      contentType,
+      contentLength
+    };
+  }
+
   async createGetChatFileURL(params: { key: string; expiredHours?: number; external: boolean }) {
     const { key, expiredHours = 1, external = false } = params; // Defaults to one hour

@@ -38,12 +63,7 @@ class S3ChatSource {
   async createUploadChatFileURL(params: CheckChatFileKeys) {
     const { appId, chatId, uId, filename } = ChatFileUploadSchema.parse(params);
     const rawKey = [S3Sources.chat, appId, uId, chatId, `${getNanoid(6)}-${filename}`].join('/');
-    await MongoS3TTL.create({
-      minioKey: rawKey,
-      bucketName: this.bucket.name,
-      expiredTime: addHours(new Date(), 24)
-    });
-    return await this.bucket.createPostPresignedUrl({ rawKey, filename });
+    return await this.bucket.createPostPresignedUrl({ rawKey, filename }, { expiredHours: 24 });
   }

   deleteChatFilesByPrefix(params: DelChatFileByPrefixParams) {
diff --git a/packages/service/common/s3/sources/dataset/index.ts b/packages/service/common/s3/sources/dataset/index.ts
new file mode 100644
index 000000000000..a6db3714df29
--- /dev/null
+++ b/packages/service/common/s3/sources/dataset/index.ts
@@ -0,0 +1,203 @@
+import { S3Sources } from '../../type';
+import { S3PrivateBucket } from '../../buckets/private';
+import { getNanoid, parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
+import {
+  type CreateGetDatasetFileURLParams,
+  CreateGetDatasetFileURLParamsSchema,
+  type CreateUploadDatasetFileParams,
+  CreateUploadDatasetFileParamsSchema,
+  type DeleteDatasetFilesByPrefixParams,
+  DeleteDatasetFilesByPrefixParamsSchema,
+  type GetDatasetFileContentParams,
+  GetDatasetFileContentParamsSchema,
+  type UploadDatasetFileByBufferParams,
+  UploadDatasetFileByBufferParamsSchema
+} from './type';
+import { MongoS3TTL } from '../../schema';
+import { addHours, addMinutes } from 'date-fns';
+import { addLog } from '../../../system/log';
+import { detectFileEncoding } from '@fastgpt/global/common/file/tools';
+import { readS3FileContentByBuffer } from '../../../file/read/utils';
+import { addRawTextBuffer, getRawTextBuffer } from '../../../buffer/rawText/controller';
+import path from 'node:path';
+import { Mimes } from '../../constants';
+
+type DatasetObjectKey = `${typeof S3Sources.dataset}/${string}`;
+
+class S3DatasetSource {
+  public bucket: S3PrivateBucket;
+  private static instance: S3DatasetSource;
+
+  constructor() {
+    this.bucket = new S3PrivateBucket();
+  }
+
+  static getInstance() {
+    return (this.instance ??= new S3DatasetSource());
+  }
+
+  // Create a download URL
+  async createGetDatasetFileURL(params: CreateGetDatasetFileURLParams) {
+    const { key, expiredHours, external } = CreateGetDatasetFileURLParamsSchema.parse(params);
+
+    if (external) {
+      return await this.bucket.createExtenalUrl({ key, expiredHours });
+    }
+    return await this.bucket.createPreviewlUrl({ key, expiredHours });
+  }
+
+  // Create an upload URL
+  async createUploadDatasetFileURL(params: CreateUploadDatasetFileParams) {
+    const { filename, datasetId } = CreateUploadDatasetFileParamsSchema.parse(params);
+    const rawKey = [S3Sources.dataset, datasetId, `${getNanoid(6)}-${filename}`].join('/');
+    return await this.bucket.createPostPresignedUrl({ rawKey, filename }, { expiredHours: 3 });
+  }
+
+  /**
+   * Delete files by datasetId or by prefix.
+   * If rawPrefix is present it takes priority; otherwise the prefix is built from datasetId.
+   * Used e.g. to delete the images parsed out of a document, by the document's prefix.
+   **/
+  deleteDatasetFilesByPrefix(params: DeleteDatasetFilesByPrefixParams) {
+    const { datasetId, rawPrefix } = DeleteDatasetFilesByPrefixParamsSchema.parse(params);
+    const prefix = rawPrefix || [S3Sources.dataset, datasetId].filter(Boolean).join('/');
+    return this.bucket.addDeleteJob({ prefix });
+  }
+
+  // Delete a single key
+  deleteDatasetFileByKey(key?: string) {
+    return this.bucket.addDeleteJob({ key });
+  }
+
+  // Delete multiple keys
+  deleteDatasetFilesByKeys(keys: string[]) {
+    return this.bucket.addDeleteJob({ keys });
+  }
+
+  // Get file stream
+  getDatasetFileStream(key: string) {
+    return this.bucket.getObject(key);
+  }
+
+  // Get file stat
+  getDatasetFileStat(key: string) {
+    return this.bucket.statObject(key);
+  }
+
+  // Get file metadata
+  async getFileMetadata(key: string) {
+    const stat = await this.getDatasetFileStat(key);
+    if (!stat) return { filename: '', extension: '', contentLength: 0, contentType: '' };
+
+    const contentLength = stat.size;
+    const filename: string = decodeURIComponent(stat.metaData['origin-filename']);
+    const extension = parseFileExtensionFromUrl(filename);
+    const contentType: string = stat.metaData['content-type'];
+    return {
+      filename,
+      extension,
+      contentType,
+      contentLength
+    };
+  }
+
+  isDatasetObjectKey(key?: string): key is DatasetObjectKey {
+    return typeof key === 'string' && key.startsWith(`${S3Sources.dataset}/`);
+  }
+
+  async getDatasetBase64Image(key: string): Promise<string> {
+    const [stream, metadata] = await Promise.all([
+      this.getDatasetFileStream(key),
+      this.getFileMetadata(key)
+    ]);
+    const buffer = await this.bucket.fileStreamToBuffer(stream);
+    const base64 = buffer.toString('base64');
+    return `data:${metadata.contentType || 'image/jpeg'};base64,${base64}`;
+  }
+
+  async getDatasetFileRawText(params: GetDatasetFileContentParams) {
+    const { fileId, teamId, tmbId, customPdfParse, getFormatText, usageId, datasetId } =
+      GetDatasetFileContentParamsSchema.parse(params);
+
+    const bufferId = `${fileId}-${customPdfParse}`;
+    const fileBuffer = await getRawTextBuffer(bufferId);
+    if (fileBuffer) {
+      return {
+        rawText: fileBuffer.text,
+        filename: fileBuffer.sourceName
+      };
+    }
+
+    const [metadata, stream] = await Promise.all([
+      this.getFileMetadata(fileId),
+      this.getDatasetFileStream(fileId)
+    ]);
+
+    const extension = metadata.extension;
+    const filename: string = metadata.filename; // already decoded by getFileMetadata
+
+    const start = Date.now();
+    const buffer = await this.bucket.fileStreamToBuffer(stream);
+    addLog.debug('get dataset file buffer', { time: Date.now() - start });
+
+    const encoding = detectFileEncoding(buffer);
+    const prefix = `${path.dirname(fileId)}/${path.basename(fileId, path.extname(fileId))}-parsed`;
+    const { rawText } = await readS3FileContentByBuffer({
+      teamId,
+      tmbId,
+      extension,
+      buffer,
+      encoding,
+      customPdfParse,
+      usageId,
+      getFormatText,
+      imageKeyOptions: {
+        prefix: prefix
+      }
+    });
+
+    addRawTextBuffer({
+      sourceId: bufferId,
+      sourceName: filename,
+      text: rawText,
+      expiredTime: addMinutes(new Date(), 20)
+    });
+
+    return {
+      rawText,
+      filename
+    };
+  }
+
+  // Upload a file from a Buffer
+  async uploadDatasetFileByBuffer(params: UploadDatasetFileByBufferParams): Promise<string> {
+    const { datasetId, buffer, filename } = UploadDatasetFileByBufferParamsSchema.parse(params);
+
+    const key = [S3Sources.dataset, datasetId, `${getNanoid(6)}-${filename}`].join('/');
+    await this.bucket.putObject(key, buffer, buffer.length, {
+      'content-type': Mimes[path.extname(filename) as keyof typeof Mimes],
+      'upload-time': new Date().toISOString(),
+      'origin-filename': encodeURIComponent(filename)
+    });
+    await MongoS3TTL.create({
+      minioKey: key,
+      bucketName: this.bucket.name,
+      expiredTime: addHours(new Date(), 3)
+    });
+    return key;
+  }
+}
+
+export function getS3DatasetSource() {
+  return S3DatasetSource.getInstance();
+}
diff --git a/packages/service/common/s3/sources/dataset/type.ts b/packages/service/common/s3/sources/dataset/type.ts
new file mode 100644
index 000000000000..81fbefa0b4b7
--- /dev/null
+++ b/packages/service/common/s3/sources/dataset/type.ts
@@ -0,0 +1,54 @@
+import { ObjectIdSchema } from '@fastgpt/global/common/type/mongo';
+import { z } from 'zod';
+
+export const CreateUploadDatasetFileParamsSchema = z.object({
+  filename: z.string().nonempty(),
+  datasetId: ObjectIdSchema
+});
+export type CreateUploadDatasetFileParams = z.infer<typeof CreateUploadDatasetFileParamsSchema>;
+
+export const CreateGetDatasetFileURLParamsSchema = z.object({
+  key: z.string().nonempty(),
+  expiredHours: z.number().positive().optional(),
+  external: z.boolean().optional()
+});
+export type CreateGetDatasetFileURLParams = z.infer<typeof CreateGetDatasetFileURLParamsSchema>;
+
+export const DeleteDatasetFilesByPrefixParamsSchema = z.object({
+  datasetId: ObjectIdSchema.optional(),
+  rawPrefix: z.string().nonempty().optional()
+});
+export type DeleteDatasetFilesByPrefixParams = z.infer<
+  typeof DeleteDatasetFilesByPrefixParamsSchema
+>;
+
+export const GetDatasetFileContentParamsSchema = z.object({
+  teamId: ObjectIdSchema,
+  tmbId: ObjectIdSchema,
+  fileId: z.string().nonempty(), // this is the S3 object key
+  customPdfParse: z.boolean().optional(),
+  getFormatText: z.boolean().optional(), // convert data to markdown format where possible
+  datasetId: ObjectIdSchema,
+  usageId: ObjectIdSchema.optional()
+});
+export type GetDatasetFileContentParams = z.infer<typeof GetDatasetFileContentParamsSchema>;
+
+export const UploadParsedDatasetImagesParamsSchema = z.object({
+  key: z.string().nonempty()
+});
+export type UploadParsedDatasetImagesParams = z.infer<typeof UploadParsedDatasetImagesParamsSchema>;
+
+export const ParsedFileContentS3KeyParamsSchema = z.object({
+  datasetId: ObjectIdSchema,
+  mimetype: z.string().nonempty(),
+  filename: z.string().optional(),
+  parsedFileKey: z.string().optional() // full key of the parsed source file; used as the images' parent directory
+});
+export type ParsedFileContentS3KeyParams = z.infer<typeof ParsedFileContentS3KeyParamsSchema>;
+
+export const UploadDatasetFileByBufferParamsSchema = z.object({
+  datasetId: ObjectIdSchema,
+  buffer: z.instanceof(Buffer),
+  filename: z.string().nonempty()
+});
+export type UploadDatasetFileByBufferParams = z.infer<typeof UploadDatasetFileByBufferParamsSchema>;
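Putting the new dataset source together, a read looks roughly like this (ids, the object key, and the import path are hypothetical): the method consults the 20-minute raw-text buffer, streams and parses the object on a miss, and uploads any parsed images under a sibling `<name>-parsed` prefix.

```ts
import { getS3DatasetSource } from '../../../common/s3/sources/dataset';

async function demo() {
  const { rawText, filename } = await getS3DatasetSource().getDatasetFileRawText({
    teamId: '665f1b2c8e4d3a0012345678',
    tmbId: '665f1b2c8e4d3a0012345679',
    datasetId: '68fee42e1d416bb5ddc85b19',
    fileId: 'dataset/68fee42e1d416bb5ddc85b19/aZos7D-report.pdf', // S3 object key
    customPdfParse: false
  });
  console.log(filename, rawText.length);
}
```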
diff --git a/packages/service/common/s3/type.ts b/packages/service/common/s3/type.ts
index 885a0081742b..48417d3895fc 100644
--- a/packages/service/common/s3/type.ts
+++ b/packages/service/common/s3/type.ts
@@ -17,7 +17,7 @@ export type ExtensionType = keyof typeof Mimes;

 export type S3OptionsType = typeof defaultS3Options;

-export const S3SourcesSchema = z.enum(['avatar', 'chat']);
+export const S3SourcesSchema = z.enum(['avatar', 'chat', 'dataset', 'tmp']);
 export const S3Sources = S3SourcesSchema.enum;
 export type S3SourceType = z.infer<typeof S3SourcesSchema>;

@@ -25,13 +25,15 @@ export const CreatePostPresignedUrlParamsSchema = z.union([
   // Option 1: Only rawKey
   z.object({
     filename: z.string().min(1),
-    rawKey: z.string().min(1)
+    rawKey: z.string().min(1),
+    metadata: z.record(z.string(), z.string()).optional()
   }),
   // Option 2: filename with optional source and teamId
   z.object({
     filename: z.string().min(1),
     source: S3SourcesSchema.optional(),
-    teamId: z.string().length(16).optional()
+    teamId: z.string().length(16).optional(),
+    metadata: z.record(z.string(), z.string()).optional()
   })
 ]);
 export type CreatePostPresignedUrlParams = z.infer<typeof CreatePostPresignedUrlParamsSchema>;
@@ -55,6 +57,15 @@ export const CreateGetPresignedUrlParamsSchema = z.object({
 });
 export type createPreviewUrlParams = z.infer<typeof CreateGetPresignedUrlParamsSchema>;

+export const UploadImage2S3BucketParamsSchema = z.object({
+  base64Img: z.string().nonempty(),
+  uploadKey: z.string().nonempty(),
+  mimetype: z.string().nonempty(),
+  filename: z.string().nonempty(),
+  expiredTime: z.date().optional()
+});
+export type UploadImage2S3BucketParams = z.infer<typeof UploadImage2S3BucketParamsSchema>;
+
 declare global {
   var s3BucketMap: {
     [key: string]: S3BaseBucket;
diff --git a/packages/service/common/s3/utils.ts b/packages/service/common/s3/utils.ts
new file mode 100644
index 000000000000..ede3d9c8078c
--- /dev/null
+++ b/packages/service/common/s3/utils.ts
@@ -0,0 +1,139 @@
+import jwt from 'jsonwebtoken';
+import { addDays, isAfter, differenceInSeconds } from 'date-fns';
+import { ERROR_ENUM } from '@fastgpt/global/common/error/errorCode';
+import type { ClientSession } from 'mongoose';
+import { MongoS3TTL } from './schema';
+import { S3Buckets } from './constants';
+import { S3PrivateBucket } from './buckets/private';
+import { S3Sources, type UploadImage2S3BucketParams } from './type';
+import { S3PublicBucket } from './buckets/public';
+import { getNanoid } from '@fastgpt/global/common/string/tools';
+import path from 'node:path';
+import { randomUUID } from 'node:crypto';
+import type { ParsedFileContentS3KeyParams } from './sources/dataset/type';
+
+export function jwtSignS3ObjectKey(objectKey: string) {
+  const secret = process.env.FILE_TOKEN_KEY as string;
+  const now = new Date();
+  const expiresIn = differenceInSeconds(addDays(now, 90), now);
+  const token = jwt.sign({ objectKey }, secret, { expiresIn });
+
+  return token;
+}
+
+export function jwtVerifyS3ObjectKey(token: string) {
+  const secret = process.env.FILE_TOKEN_KEY as string;
+  return new Promise<{ objectKey: string }>((resolve, reject) => {
+    jwt.verify(token, secret, (err, payload) => {
+      if (err || !payload || !(payload as jwt.JwtPayload).objectKey) {
+        return reject(ERROR_ENUM.unAuthFile);
+      }
+
+      resolve(payload as { objectKey: string });
+    });
+  });
+}
+
+export function removeS3TTL({
+  key,
+  bucketName,
+  session
+}: {
+  key: string[] | string;
+  bucketName: keyof typeof S3Buckets;
+  session?: ClientSession;
+}) {
+  if (!key) return;
+
+  if (Array.isArray(key)) {
+    return MongoS3TTL.deleteMany(
+      {
+        minioKey: { $in: key },
+        bucketName: S3Buckets[bucketName]
+      },
+      { session }
+    );
+  }
+
+  if (typeof key === 'string') {
+    return MongoS3TTL.deleteOne(
+      {
+        minioKey: key,
+        bucketName: S3Buckets[bucketName]
+      },
+      { session }
+    );
+  }
+}
+
+export async function uploadImage2S3Bucket(
+  bucketName: keyof typeof S3Buckets,
+  params: UploadImage2S3BucketParams
+) {
+  const { base64Img, filename, mimetype, uploadKey, expiredTime } = params;
+
+  const bucket = bucketName === 'private' ? new S3PrivateBucket() : new S3PublicBucket();
+
+  const base64Data = base64Img.split(',')[1] || base64Img;
+  const buffer = Buffer.from(base64Data, 'base64');
+
+  await bucket.putObject(uploadKey, buffer, buffer.length, {
+    'content-type': mimetype,
+    'upload-time': new Date().toISOString(),
+    'origin-filename': encodeURIComponent(filename)
+  });
+
+  const now = new Date();
+  if (expiredTime && isAfter(expiredTime, now)) {
+    await MongoS3TTL.create({
+      minioKey: uploadKey,
+      bucketName: bucket.name,
+      expiredTime: expiredTime
+    });
+  }
+
+  return uploadKey;
+}
+
+export const ParsedFileContentS3Key = {
+  // Temporary file path (e.g. for evaluation)
+  temp: (appId: string) => {
+    return `${S3Sources.chat}/${appId}/temp/${randomUUID()}`;
+  },
+
+  // Key prefix for images parsed from files uploaded in a chat
+  chat: ({ appId, chatId, uId }: { chatId: string; uId: string; appId: string }) => {
+    return `${S3Sources.chat}/${appId}/${uId}/${chatId}/parsed`;
+  },
+
+  // Key for images parsed from an uploaded dataset file
+  dataset: (params: ParsedFileContentS3KeyParams) => {
+    const { datasetId, mimetype: ext, filename, parsedFileKey } = params;
+
+    const imageName = (() => {
+      const id = getNanoid(6);
+      if (!filename) return `${id}.${ext}`;
+      return !!path.extname(filename) ? `${id}-${filename}` : `${id}-${filename}.${ext}`;
+    })();
+
+    if (!parsedFileKey) {
+      return {
+        key: [S3Sources.dataset, datasetId, imageName].join('/'),
+        filename: imageName
+      };
+    }
+
+    const parsedFileName = parsedFileKey.split('/').at(-1)!;
+    const parsedContentPrefix = `parsed-${path.basename(parsedFileName, path.extname(parsedFileName))}`;
+    const parsedContentKey = parsedFileKey
+      .split('/')
+      .slice(0, -1)
+      .concat(parsedContentPrefix)
+      .join('/');
+
+    return {
+      key: parsedContentKey,
+      filename: imageName
+    };
+  }
+};
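A round-trip sketch of the signed links these helpers produce (the key and import path are made up, and `FILE_TOKEN_KEY` must be set): the object key is wrapped in a 90-day JWT, and the file route later recovers it with the verify helper.

```ts
import { jwtSignS3ObjectKey, jwtVerifyS3ObjectKey } from './utils'; // illustrative path

async function demo() {
  const key = 'dataset/68fee42e1d416bb5ddc85b19/aZos7D-report-parsed/9a0f4fed.png';

  const token = jwtSignS3ObjectKey(key); // expires ~90 days out
  const url = `/api/system/file/${token}`; // shape of the served link

  const { objectKey } = await jwtVerifyS3ObjectKey(token);
  console.log(objectKey === key, url); // true, '/api/system/file/eyJ...'
}
```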
diff --git a/packages/service/core/ai/llm/request.ts b/packages/service/core/ai/llm/request.ts
index 0b2bfd369252..74297520ee46 100644
--- a/packages/service/core/ai/llm/request.ts
+++ b/packages/service/core/ai/llm/request.ts
@@ -86,7 +86,7 @@ export const createLLMResponse = async (
     messages: rewriteMessages
   });

-  // console.dir(requestBody, { depth: null });
+  // console.log(JSON.stringify(requestBody, null, 2));

   const { response, isStreamResponse, getEmptyResponseTip } = await createChatCompletion({
     body: requestBody,
     userKey,
diff --git a/packages/service/core/ai/llm/utils.ts b/packages/service/core/ai/llm/utils.ts
index a1988215f064..7e9f645e808d 100644
--- a/packages/service/core/ai/llm/utils.ts
+++ b/packages/service/core/ai/llm/utils.ts
@@ -14,6 +14,12 @@ import { addLog } from '../../../common/system/log';
 import { getImageBase64 } from '../../../common/file/image/utils';
 import { getS3ChatSource } from '../../../common/s3/sources/chat';
 import { isInternalAddress } from '../../../common/system/utils';
+import { S3Sources } from '../../../common/s3/type';
+import { getS3DatasetSource } from '../../../common/s3/sources/dataset';
+import { getGlobalRedisConnection } from '../../../common/redis';
+import { randomUUID } from 'node:crypto';
+import { TempFileURL } from '@fastgpt/global/common/file/image/constants';
+import { EndpointUrl } from '@fastgpt/global/common/file/constants';

 export const filterGPTMessageByMaxContext = async ({
   messages = [],
@@ -127,7 +133,7 @@ export const loadRequestMessages = async ({
       const result: ChatCompletionContentPart[] = [];

       // Extract all HTTPS image URLs (deduplicated) and prepend them to result
-      const httpsImages = [...new Set(Array.from(input.matchAll(imageRegex), (m) => m[0]))];
+      const httpsImages = Array.from(new Set(Array.from(input.matchAll(imageRegex), (m) => m[0])));
       httpsImages.forEach((url) => {
         result.push({
           type: 'image_url',
           image_url: { url }
         });
       });
@@ -379,7 +385,10 @@ export const loadRequestMessages = async ({

         return {
           ...item,
-          content: formatContent
+          content:
+            typeof formatContent === 'string'
+              ? formatContent
+              : (formatContent as ChatCompletionContentPartText[])
         };
       } else if (item.role === ChatCompletionRequestMessageRoleEnum.Assistant) {
         if (item.tool_calls || item.function_call) {
diff --git a/packages/service/core/app/controller.ts b/packages/service/core/app/controller.ts
index bd8ec22386cd..bd085bc62e92 100644
--- a/packages/service/core/app/controller.ts
+++ b/packages/service/core/app/controller.ts
@@ -28,6 +28,7 @@ import { mongoSessionRun } from '../../common/mongo/sessionRun';
 import { MongoAppLogKeys } from './logs/logkeysSchema';
 import { MongoChatItemResponse } from '../chat/chatItemResponseSchema';
 import { getS3ChatSource } from '../../common/s3/sources/chat';
+import { getS3AvatarSource } from '../../common/s3/sources/avatar';

 export const beforeUpdateAppFormat = ({ nodes }: { nodes?: StoreNodeItemType[] }) => {
   if (!nodes) return;
diff --git a/packages/service/core/chat/saveChat.ts b/packages/service/core/chat/saveChat.ts
index ad5f35ca0081..5c73f4ec12b8 100644
--- a/packages/service/core/chat/saveChat.ts
+++ b/packages/service/core/chat/saveChat.ts
@@ -17,8 +17,8 @@ import { MongoAppChatLog } from '../app/logs/chatLogsSchema';
 import { writePrimary } from '../../common/mongo/utils';
 import { MongoChatItemResponse } from './chatItemResponseSchema';
 import { chatValue2RuntimePrompt } from '@fastgpt/global/core/chat/adapt';
-import { MongoS3TTL } from '../../common/s3/schema';
 import type { ClientSession } from '../../common/mongo';
+import { removeS3TTL } from '../../common/s3/utils';

 type Props = {
   chatId: string;
@@ -56,7 +56,6 @@ const afterProcess = async ({
   contents: (UserChatItemType | AIChatItemType)[];
   session: ClientSession;
 }) => {
-  // Remove ttl
   const fileKeys = contents
     .map((item) => {
       if (item.value && Array.isArray(item.value)) {
@@ -70,8 +69,9 @@ const afterProcess = async ({
     })
     .flat()
     .filter(Boolean) as string[];
+
   if (fileKeys.length > 0) {
-    await MongoS3TTL.deleteMany({ minioKey: { $in: fileKeys } }, { session });
+    await removeS3TTL({ key: fileKeys, bucketName: 'private', session });
   }
 };
diff --git a/packages/service/core/dataset/apiDataset/custom/api.ts b/packages/service/core/dataset/apiDataset/custom/api.ts
index 277ca4eb93b9..1af2ffd8134d 100644
--- a/packages/service/core/dataset/apiDataset/custom/api.ts
+++ b/packages/service/core/dataset/apiDataset/custom/api.ts
@@ -126,12 +126,14 @@ export const useApiDatasetRequest = ({ apiServer }: { apiServer: APIFileServer }
     teamId,
     tmbId,
     apiFileId,
-    customPdfParse
+    customPdfParse,
+    datasetId
   }: {
     teamId: string;
     tmbId: string;
     apiFileId: string;
     customPdfParse?: boolean;
+    datasetId: string;
   }): Promise<ApiFileReadContentResponse> => {
     const data = await request<
       {
@@ -161,11 +163,12 @@
       };
     }

-    const rawText = await readFileRawTextByUrl({
+    const { rawText } = await readFileRawTextByUrl({
       teamId,
       tmbId,
       url: previewUrl,
       relatedId: apiFileId,
+      datasetId,
       customPdfParse,
       getFormatText: true
     });
diff --git a/packages/service/core/dataset/collection/controller.ts b/packages/service/core/dataset/collection/controller.ts
index 2ff9673099f1..ea8c24ea39ca 100644
--- a/packages/service/core/dataset/collection/controller.ts
+++ b/packages/service/core/dataset/collection/controller.ts
@@ -1,4 +1,7 @@
-import { DatasetCollectionDataProcessModeEnum } from '@fastgpt/global/core/dataset/constants';
+import {
+  DatasetCollectionDataProcessModeEnum,
+  DatasetCollectionTypeEnum
+} from '@fastgpt/global/core/dataset/constants';
 import type { CreateDatasetCollectionParams } from '@fastgpt/global/core/dataset/api.d';
 import { MongoDatasetCollection } from './schema';
 import type {
@@ -30,7 +33,10 @@ import {
   getLLMMaxChunkSize
 } from '@fastgpt/global/core/dataset/training/utils';
 import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/data/constants';
-import { clearCollectionImages, removeDatasetImageExpiredTime } from '../image/utils';
+import { clearCollectionImages } from '../image/utils';
+import { getS3DatasetSource } from '../../../common/s3/sources/dataset';
+import path from 'node:path';
+import { removeS3TTL } from '../../../common/s3/utils';

 export const createCollectionAndInsertData = async ({
   dataset,
@@ -232,13 +238,6 @@ export const createCollectionAndInsertData = async ({
     }
   })();

-  // 6. Remove images ttl index
-  await removeDatasetImageExpiredTime({
-    ids: imageIds,
-    collectionId,
-    session
-  });
-
   return {
     collectionId: String(collectionId),
     insertResults
@@ -300,6 +299,10 @@ export async function createOneCollection({ session, ...props }: CreateOneCollec
     { session, ordered: true }
   );

+  if (getS3DatasetSource().isDatasetObjectKey(fileId)) {
+    await removeS3TTL({ key: fileId, bucketName: 'private', session });
+  }
+
   return collection;
 }

@@ -363,9 +366,25 @@ export async function delCollection({

   if (!teamId) return Promise.reject('teamId is not exist');

+  const s3DatasetSource = getS3DatasetSource();
   const datasetIds = Array.from(new Set(collections.map((item) => String(item.datasetId))));
   const collectionIds = collections.map((item) => String(item._id));

+  const imageCollectionIds = collections
+    .filter((item) => item.type === DatasetCollectionTypeEnum.images)
+    .map((item) => String(item._id));
+  const imageDatas = await MongoDatasetData.find(
+    {
+      teamId,
+      datasetId: { $in: datasetIds },
+      collectionId: { $in: imageCollectionIds }
+    },
+    { imageId: 1 }
+  ).lean();
+  const imageIds = imageDatas
+    .map((item) => item.imageId)
+    .filter((key) => s3DatasetSource.isDatasetObjectKey(key));
+
   await retryFn(async () => {
     await Promise.all([
       // Delete training data
@@ -419,6 +438,17 @@ export async function delCollection({
         _id: { $in: collectionIds }
       },
       { session }
-    );
+    ).lean();
+
+    // Delete S3 images parsed out of documents, keyed by each source file's "-parsed" prefix
+    collections
+      .map((item) => item.fileId)
+      .filter((fileId): fileId is string => !!fileId && s3DatasetSource.isDatasetObjectKey(fileId))
+      .map((key) => `${path.dirname(key)}/${path.basename(key, path.extname(key))}-parsed`)
+      .forEach((prefix) => s3DatasetSource.deleteDatasetFilesByPrefix({ rawPrefix: prefix }));
+
+    // Delete S3 images uploaded directly by users
+    await s3DatasetSource.deleteDatasetFilesByKeys(imageIds);
   });
 }
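The cleanup above re-derives the prefix that `getDatasetFileRawText` writes parsed images under (`<dir>/<basename-without-extension>-parsed`). The same derivation also appears in `read.ts`; a shared helper along these lines (hypothetical, not part of the diff) would keep the copies from drifting:

```ts
import path from 'node:path';

// 'dataset/<datasetId>/aZos7D-report.pdf' -> 'dataset/<datasetId>/aZos7D-report-parsed'
function parsedImagePrefix(fileKey: string): string {
  return `${path.dirname(fileKey)}/${path.basename(fileKey, path.extname(fileKey))}-parsed`;
}

parsedImagePrefix('dataset/68fee42e1d416bb5ddc85b19/aZos7D-report.pdf');
// -> 'dataset/68fee42e1d416bb5ddc85b19/aZos7D-report-parsed'
```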
diff --git a/packages/service/core/dataset/collection/schema.ts b/packages/service/core/dataset/collection/schema.ts
index 61ddc48a099b..f2281a6f217e 100644
--- a/packages/service/core/dataset/collection/schema.ts
+++ b/packages/service/core/dataset/collection/schema.ts
@@ -58,10 +58,8 @@ const DatasetCollectionSchema = new Schema({

   // Metadata
   // local file collection
-  fileId: {
-    type: Schema.Types.ObjectId,
-    ref: 'dataset.files'
-  },
+  // Supports both a GridFS ObjectId and an S3 object key, stored as a string
+  fileId: String,
   // web link collection
   rawLink: String,
   // Api collection
@@ -72,6 +70,7 @@ const DatasetCollectionSchema = new Schema({

   rawTextLength: Number,
   hashRawText: String,
+
   metadata: {
     type: Object,
     default: {}
diff --git a/packages/service/core/dataset/collection/utils.ts b/packages/service/core/dataset/collection/utils.ts
index e2334ef0b3a9..5573b4957c85 100644
--- a/packages/service/core/dataset/collection/utils.ts
+++ b/packages/service/core/dataset/collection/utils.ts
@@ -164,6 +164,7 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
   const { title, rawText } = await readDatasetSourceRawText({
     teamId: collection.teamId,
     tmbId: collection.tmbId,
+    datasetId: collection.datasetId,
     ...sourceReadType
   });

@@ -208,7 +209,7 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
   return DatasetCollectionSyncResultEnum.sameRaw;
 };

-/* 
+/*
   QA: separate process
   Chunk: Image Index -> Auto index -> chunk index
 */
diff --git a/packages/service/core/dataset/controller.ts b/packages/service/core/dataset/controller.ts
index dade64db9214..1f52fe2daa8f 100644
--- a/packages/service/core/dataset/controller.ts
+++ b/packages/service/core/dataset/controller.ts
@@ -15,6 +15,8 @@ import { removeDatasetSyncJobScheduler } from './datasetSync';
 import { mongoSessionRun } from '../../common/mongo/sessionRun';
 import { removeImageByPath } from '../../common/file/image/controller';
 import { UserError } from '@fastgpt/global/common/error/utils';
+import { getS3DatasetSource } from '../../common/s3/sources/dataset';
+import { getS3AvatarSource } from '../../common/s3/sources/avatar';

 /* ============= dataset ========== */
 /* find all datasetId by top datasetId */
@@ -122,6 +124,10 @@ export async function delDatasetRelevantData({
     teamId,
     datasetId: { $in: datasetIds }
   }).session(session);
+
+  for (const datasetId of datasetIds) {
+    await getS3DatasetSource().deleteDatasetFilesByPrefix({ datasetId });
+  }
 }

 export const deleteDatasets = async ({
diff --git a/packages/service/core/dataset/data/controller.ts b/packages/service/core/dataset/data/controller.ts
index 50c70500797d..38dd004c0d2a 100644
--- a/packages/service/core/dataset/data/controller.ts
+++ b/packages/service/core/dataset/data/controller.ts
@@ -1,6 +1,8 @@
+import { getS3DatasetSource } from '../../../common/s3/sources/dataset';
 import { addEndpointToImageUrl } from '../../../common/file/image/utils';
 import { getDatasetImagePreviewUrl } from '../image/utils';
-import type { DatasetCiteItemType, DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type';
+import type { DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type';
+import { getS3ChatSource } from '../../../common/s3/sources/chat';

 export const formatDatasetDataValue = ({
   teamId,
@@ -56,12 +58,15 @@ export const formatDatasetDataValue = ({
     };
   }

-  const previewUrl = getDatasetImagePreviewUrl({
-    imageId,
-    teamId,
-    datasetId,
-    expiredMinutes: 60 * 24 * 7 // 7 days
-  });
+  const previewUrl =
+    getS3DatasetSource().isDatasetObjectKey(imageId) || getS3ChatSource().isChatFileKey(imageId)
+      ? 
imageId + : getDatasetImagePreviewUrl({ + imageId, + teamId, + datasetId, + expiredMinutes: 60 * 24 * 7 // 7 days + }); return { q: `![${q.replaceAll('\n', '')}](${previewUrl})`, @@ -71,7 +76,7 @@ export const formatDatasetDataValue = ({ }; export const getFormatDatasetCiteList = (list: DatasetDataSchemaType[]) => { - return list.map((item) => ({ + return list.map((item) => ({ _id: item._id, ...formatDatasetDataValue({ teamId: item.teamId, diff --git a/packages/service/core/dataset/image/controller.ts b/packages/service/core/dataset/image/controller.ts index ca597eae44b5..d8cd7882d1e8 100644 --- a/packages/service/core/dataset/image/controller.ts +++ b/packages/service/core/dataset/image/controller.ts @@ -10,6 +10,7 @@ import { checkTimerLock } from '../../../common/system/timerLock/utils'; import { TimerIdEnum } from '../../../common/system/timerLock/constants'; import { addLog } from '../../../common/system/log'; import { UserError } from '@fastgpt/global/common/error/utils'; +import { getS3DatasetSource } from '../../../common/s3/sources/dataset'; const getGridBucket = () => { return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, { @@ -114,9 +115,14 @@ export const getDatasetImageBase64 = async (imageId: string) => { export const deleteDatasetImage = async (imageId: string) => { const gridBucket = getGridBucket(); + const s3DatasetSource = getS3DatasetSource(); try { - await gridBucket.delete(new Types.ObjectId(imageId)); + if (s3DatasetSource.isDatasetObjectKey(imageId)) { + await s3DatasetSource.deleteDatasetFileByKey(imageId); + } else { + await gridBucket.delete(new Types.ObjectId(imageId)); + } } catch (error: any) { const msg = error?.message; if (msg.includes('File not found')) { diff --git a/packages/service/core/dataset/read.ts b/packages/service/core/dataset/read.ts index b17efe205c74..99dbf3c8aeb6 100644 --- a/packages/service/core/dataset/read.ts +++ b/packages/service/core/dataset/read.ts @@ -1,13 +1,11 @@ -import { BucketNameEnum } from '@fastgpt/global/common/file/constants'; import { ChunkTriggerConfigTypeEnum, DatasetSourceReadTypeEnum } from '@fastgpt/global/core/dataset/constants'; -import { readFileContentFromMongo } from '../../common/file/gridfs/controller'; import { urlsFetch } from '../../common/string/cheerio'; import { type TextSplitProps } from '@fastgpt/global/common/string/textSplitter'; import axios from 'axios'; -import { readRawContentByFileBuffer } from '../../common/file/read/utils'; +import { readS3FileContentByBuffer } from '../../common/file/read/utils'; import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools'; import { getApiDatasetRequest } from './apiDataset'; import Papa from 'papaparse'; @@ -17,6 +15,10 @@ import { addLog } from '../../common/system/log'; import { retryFn } from '@fastgpt/global/common/system/utils'; import { getFileMaxSize } from '../../common/file/utils'; import { UserError } from '@fastgpt/global/common/error/utils'; +import { getS3DatasetSource } from '../../common/s3/sources/dataset'; +import { Mimes } from '../../common/s3/constants'; +import path from 'node:path'; +import { ParsedFileContentS3Key } from '../../common/s3/utils'; export const readFileRawTextByUrl = async ({ teamId, @@ -25,6 +27,7 @@ export const readFileRawTextByUrl = async ({ customPdfParse, getFormatText, relatedId, + datasetId, maxFileSize = getFileMaxSize() }: { teamId: string; @@ -33,6 +36,7 @@ export const readFileRawTextByUrl = async ({ customPdfParse?: boolean; getFormatText?: boolean; relatedId: 
string; // externalFileId / apiFileId
+  datasetId: string;
   maxFileSize?: number;
 }) => {
   const extension = parseFileExtensionFromUrl(url);
@@ -64,7 +68,7 @@ export const readFileRawTextByUrl = async ({
   const chunks: Buffer[] = [];
   let totalLength = 0;

-  return new Promise((resolve, reject) => {
+  return new Promise<{ rawText: string }>((resolve, reject) => {
     let isAborted = false;

     const cleanup = () => {
@@ -107,8 +111,14 @@ export const readFileRawTextByUrl = async ({
           // Clear the chunks array immediately to free memory
           chunks.length = 0;

-          const { rawText } = await retryFn(() =>
-            readRawContentByFileBuffer({
+          const { rawText } = await retryFn(() => {
+            const { key } = ParsedFileContentS3Key.dataset({
+              datasetId,
+              filename: 'file',
+              mimetype: Mimes[extension as keyof typeof Mimes]
+            });
+            const prefix = `${path.dirname(key)}/${path.basename(key, path.extname(key))}-parsed`;
+            return readS3FileContentByBuffer({
               customPdfParse,
               getFormatText,
               extension,
@@ -116,13 +126,13 @@ export const readFileRawTextByUrl = async ({
               tmbId,
               buffer,
               encoding: 'utf-8',
-              metadata: {
-                relatedId
+              imageKeyOptions: {
+                prefix: prefix
               }
-            })
-          );
+            });
+          });

-          resolve(rawText);
+          resolve({ rawText });
         } catch (error) {
           cleanup();
           reject(error);
@@ -142,7 +152,7 @@ export const readFileRawTextByUrl = async ({
   });
 };

-/* 
+/*
   fileId - local file, read from mongo
   link - request
   externalFile/apiFile = request read
@@ -157,7 +167,8 @@ export const readDatasetSourceRawText = async ({
   apiDatasetServer,
   customPdfParse,
   getFormatText,
-  usageId
+  usageId,
+  datasetId
 }: {
   teamId: string;
   tmbId: string;
@@ -170,20 +181,26 @@ export const readDatasetSourceRawText = async ({
   externalFileId?: string; // external file dataset
   apiDatasetServer?: ApiDatasetServerType; // api dataset
   usageId?: string;
+  datasetId: string; // For S3 image upload
 }): Promise<{
   title?: string;
   rawText: string;
 }> => {
   if (type === DatasetSourceReadTypeEnum.fileLocal) {
-    const { filename, rawText } = await readFileContentFromMongo({
+    if (!datasetId || !getS3DatasetSource().isDatasetObjectKey(sourceId)) {
+      return Promise.reject('Local file reads require a datasetId and an S3 dataset object key');
+    }
+
+    const { filename, rawText } = await getS3DatasetSource().getDatasetFileRawText({
       teamId,
       tmbId,
-      bucketName: BucketNameEnum.dataset,
       fileId: sourceId,
       getFormatText,
       customPdfParse,
-      usageId
+      usageId,
+      datasetId
     });
+
     return {
       title: filename,
       rawText
@@ -205,11 +222,12 @@
     };
   } else if (type === DatasetSourceReadTypeEnum.externalFile) {
     if (!externalFileId) return Promise.reject(new UserError('FileId not found'));
-    const rawText = await readFileRawTextByUrl({
+    const { rawText } = await readFileRawTextByUrl({
       teamId,
       tmbId,
       url: sourceId,
       relatedId: externalFileId,
+      datasetId,
       customPdfParse
     });
     return {
@@ -221,7 +239,8 @@
       apiFileId: sourceId,
       teamId,
       tmbId,
-      customPdfParse
+      customPdfParse,
+      datasetId
     });
     return {
       title,
@@ -239,13 +258,15 @@ export const readApiServerFileContent = async ({
   apiFileId,
   teamId,
   tmbId,
-  customPdfParse
+  customPdfParse,
+  datasetId
 }: {
   apiDatasetServer?: ApiDatasetServerType;
   apiFileId: string;
   teamId: string;
   tmbId: string;
   customPdfParse?: boolean;
+  datasetId: string;
 }): Promise<{
   title?: string;
   rawText: string;
     teamId,
     tmbId,
     apiFileId,
-    customPdfParse
+    customPdfParse,
+    datasetId
   });
 };
@@ -285,7 +307,6 @@ export const rawText2Chunks = async ({
 > => {
   const parseDatasetBackup2Chunks = (rawText: string) => {
     const csvArr = Papa.parse(rawText).data as string[][];
-
     const chunks = csvArr
       .slice(1)
       .map((item) => ({
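Usage sketch for the local-file branch after this change (ids, key, and import paths are hypothetical): `datasetId` is now mandatory, and `sourceId` must be an S3 dataset object key rather than a GridFS id.

```ts
import { DatasetSourceReadTypeEnum } from '@fastgpt/global/core/dataset/constants';
import { readDatasetSourceRawText } from '../read'; // illustrative path

async function demo() {
  const { title, rawText } = await readDatasetSourceRawText({
    teamId: '665f1b2c8e4d3a0012345678',
    tmbId: '665f1b2c8e4d3a0012345679',
    datasetId: '68fee42e1d416bb5ddc85b19',
    type: DatasetSourceReadTypeEnum.fileLocal,
    sourceId: 'dataset/68fee42e1d416bb5ddc85b19/aZos7D-report.pdf'
  });
  console.log(title, rawText.slice(0, 200));
}
```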
diff --git a/packages/service/core/dataset/utils.ts b/packages/service/core/dataset/utils.ts
index 1bbcd5168b78..f8cdd6ab2f06 100644
--- a/packages/service/core/dataset/utils.ts
+++ b/packages/service/core/dataset/utils.ts
@@ -1,5 +1,11 @@
+import { MongoDatasetData } from './data/schema';
 import { authDatasetByTmbId } from '../../support/permission/dataset/auth';
 import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant';
+import { S3Sources } from '../../common/s3/type';
+import { getS3DatasetSource } from '../../common/s3/sources/dataset';
+import { getS3ChatSource } from '../../common/s3/sources/chat';
+import { EndpointUrl } from '@fastgpt/global/common/file/constants';
+import { jwtSignS3ObjectKey } from '../../common/s3/utils';

 // TODO: optimize to fetch permissions in batch
 export const filterDatasetsByTmbId = async ({
@@ -28,3 +34,46 @@ export const filterDatasetsByTmbId = async ({
   // Then filter datasetIds based on permissions
   return datasetIds.filter((_, index) => permissions[index]);
 };
+
+/**
+ * Replace S3 object keys found in markdown image links inside dataset quote text
+ * with JWT-signed URLs.
+ *
+ * @param datasetQuoteText dataset quote text
+ * @returns the text with the links replaced
+ *
+ * @example
+ *
+ * ```typescript
+ * const datasetQuoteText = '![image.png](dataset/68fee42e1d416bb5ddc85b19/6901c3071ba2bea567e8d8db/aZos7D-214afce5-4d42-4356-9e05-8164d51c59ae.png)';
+ * const replacedText = await replaceDatasetQuoteTextWithJWT(datasetQuoteText)
+ * console.log(replacedText)
+ * // '![image.png](http://localhost:3000/api/system/file/eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvYmplY3RLZXkiOiJjaGF0LzY5MWFlMjlkNDA0ZDA0Njg3MTdkZDc0Ny82OGFkODVhNzQ2MzAwNmM5NjM3OTlhMDcvalhmWHk4eWZHQUZzOVdKcGNXUmJBaFYyL3BhcnNlZC85YTBmNGZlZC00ZWRmLTQ2MTMtYThkNi01MzNhZjVhZTUxZGMucG5nIiwiaWF0IjoxNzYzMzcwOTYwLCJleHAiOjk1MzkzNzA5NjB9.tMDWg0-ZWRnWPNp9Hakd0w1hhaO8jj2oD98SU0wAQYQ)'
+ * ```
+ */
+export async function replaceDatasetQuoteTextWithJWT(datasetQuoteText: string) {
+  if (!datasetQuoteText || typeof datasetQuoteText !== 'string') return datasetQuoteText as string;
+
+  const prefixPattern = Object.values(S3Sources)
+    .map((pattern) => `${pattern}\\/[^\\s)]+`)
+    .join('|');
+  const regex = new RegExp(String.raw`(!?)\[([^\]]+)\]\((?!https?:\/\/)(${prefixPattern})\)`, 'g');
+  const s3DatasetSource = getS3DatasetSource();
+  const s3ChatSource = getS3ChatSource();
+
+  const matches = Array.from(datasetQuoteText.matchAll(regex));
+  let content = datasetQuoteText;
+
+  for (const match of matches.slice().reverse()) {
+    const [full, bang, alt, objectKey] = match;
+
+    if (s3DatasetSource.isDatasetObjectKey(objectKey) || s3ChatSource.isChatFileKey(objectKey)) {
+      const token = jwtSignS3ObjectKey(objectKey);
+      const url = `${EndpointUrl}/api/system/file/${token}`;
+      const replacement = `${bang}[${alt}](${url})`;
+      content =
+        content.slice(0, match.index) + replacement + content.slice(match.index + full.length);
+    }
+  }
+
+  return content;
+}
diff --git a/packages/service/core/workflow/dispatch/ai/chat.ts b/packages/service/core/workflow/dispatch/ai/chat.ts
index c6cf2d37f767..26293bb8d7ae 100644
--- a/packages/service/core/workflow/dispatch/ai/chat.ts
+++ b/packages/service/core/workflow/dispatch/ai/chat.ts
@@ -41,6 +41,8 @@ import { i18nT } from '../../../../../web/i18n/utils';
 import { postTextCensor } from '../../../chat/postTextCensor';
 import { createLLMResponse } from '../../../ai/llm/request';
 import { formatModelChars2Points } from
'../../../../support/wallet/usage/utils'; +import { replaceDatasetQuoteTextWithJWT } from '../../../dataset/utils'; +import { ParsedFileContentS3Key } from '../../../../common/s3/utils'; export type ChatProps = ModuleDispatchProps< AIChatNodeProps & { @@ -98,6 +100,7 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise if (stringQuoteText) { @@ -366,7 +376,8 @@ async function getMultiInput({ teamId: runningUserInfo.teamId, tmbId: runningUserInfo.tmbId, customPdfParse, - usageId + usageId, + fileS3Prefix }); return { diff --git a/packages/service/core/workflow/dispatch/ai/tool/index.ts b/packages/service/core/workflow/dispatch/ai/tool/index.ts index 4a79ba0bde10..a6ddda52839a 100644 --- a/packages/service/core/workflow/dispatch/ai/tool/index.ts +++ b/packages/service/core/workflow/dispatch/ai/tool/index.ts @@ -119,7 +119,10 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise< fileLinks, inputFiles: globalFiles, hasReadFilesTool, - usageId + usageId, + appId: props.runningAppInfo.id, + chatId: props.chatId, + uId: props.uid }); const concatenateSystemPrompt = [ @@ -277,7 +280,10 @@ const getMultiInput = async ({ customPdfParse, inputFiles, hasReadFilesTool, - usageId + usageId, + appId, + chatId, + uId }: { runningUserInfo: ChatDispatchProps['runningUserInfo']; histories: ChatItemType[]; @@ -288,6 +294,9 @@ const getMultiInput = async ({ inputFiles: UserChatItemValueItemType['file'][]; hasReadFilesTool: boolean; usageId?: string; + appId: string; + chatId?: string; + uId: string; }) => { // Not file quote if (!fileLinks || hasReadFilesTool) { @@ -316,7 +325,10 @@ const getMultiInput = async ({ customPdfParse, usageId, teamId: runningUserInfo.teamId, - tmbId: runningUserInfo.tmbId + tmbId: runningUserInfo.tmbId, + appId, + chatId, + uId }); return { diff --git a/packages/service/core/workflow/dispatch/tools/readFiles.ts b/packages/service/core/workflow/dispatch/tools/readFiles.ts index 4726b7b045bf..fe97de4e1773 100644 --- a/packages/service/core/workflow/dispatch/tools/readFiles.ts +++ b/packages/service/core/workflow/dispatch/tools/readFiles.ts @@ -7,7 +7,7 @@ import axios from 'axios'; import { serverRequestBaseUrl } from '../../../../common/api/serverRequest'; import { getErrText } from '@fastgpt/global/common/error/utils'; import { detectFileEncoding, parseUrlToFileType } from '@fastgpt/global/common/file/tools'; -import { readRawContentByFileBuffer } from '../../../../common/file/read/utils'; +import { readS3FileContentByBuffer } from '../../../../common/file/read/utils'; import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants'; import { type ChatItemType, type UserChatItemValueItemType } from '@fastgpt/global/core/chat/type'; import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools'; @@ -16,6 +16,8 @@ import { addRawTextBuffer, getRawTextBuffer } from '../../../../common/buffer/ra import { addMinutes } from 'date-fns'; import { getNodeErrResponse } from '../utils'; import { isInternalAddress } from '../../../../common/system/utils'; +import { replaceDatasetQuoteTextWithJWT } from '../../../dataset/utils'; +import { ParsedFileContentS3Key } from '../../../../common/s3/utils'; type Props = ModuleDispatchProps<{ [NodeInputKeyEnum.fileUrlList]: string[]; @@ -71,7 +73,12 @@ export const dispatchReadFiles = async (props: Props): Promise => { teamId, tmbId, customPdfParse, - usageId + usageId, + fileS3Prefix: ParsedFileContentS3Key.chat({ + appId: props.runningAppInfo.id, + chatId: props.chatId!, + uId: 
props.uid + }) }); return { @@ -124,7 +131,8 @@ export const getFileContentFromLinks = async ({ teamId, tmbId, customPdfParse, - usageId + usageId, + fileS3Prefix }: { urls: string[]; requestOrigin?: string; @@ -133,6 +141,7 @@ export const getFileContentFromLinks = async ({ tmbId: string; customPdfParse?: boolean; usageId?: string; + fileS3Prefix: string; }) => { const parseUrlList = urls // Remove invalid urls @@ -223,8 +232,7 @@ export const getFileContentFromLinks = async ({ return detectFileEncoding(buffer); })(); - // Read file - const { rawText } = await readRawContentByFileBuffer({ + const { rawText } = await readS3FileContentByBuffer({ extension, teamId, tmbId, @@ -232,18 +240,23 @@ export const getFileContentFromLinks = async ({ encoding, customPdfParse, getFormatText: true, + imageKeyOptions: { + prefix: fileS3Prefix + }, usageId }); + const replacedText = await replaceDatasetQuoteTextWithJWT(rawText); + // Add to buffer addRawTextBuffer({ sourceId: url, sourceName: filename, - text: rawText, + text: replacedText, expiredTime: addMinutes(new Date(), 20) }); - return formatResponseObject({ filename, url, content: rawText }); + return formatResponseObject({ filename, url, content: replacedText }); } catch (error) { return formatResponseObject({ filename: '', diff --git a/packages/service/support/permission/auth/file.ts b/packages/service/support/permission/auth/file.ts index bc4d868072a4..3f507dd4f79a 100644 --- a/packages/service/support/permission/auth/file.ts +++ b/packages/service/support/permission/auth/file.ts @@ -10,6 +10,8 @@ import { addMinutes } from 'date-fns'; import { parseHeaderCert } from './common'; import jwt from 'jsonwebtoken'; import { ERROR_ENUM } from '@fastgpt/global/common/error/errorCode'; +import { S3Sources } from '../../../common/s3/type'; +import { getS3DatasetSource } from '../../../common/s3/sources/dataset'; export const authCollectionFile = async ({ fileId, @@ -17,28 +19,24 @@ export const authCollectionFile = async ({ ...props }: AuthModeType & { fileId: string; -}): Promise< - AuthResponseType & { - file: DatasetFileSchema; - } -> => { +}): Promise => { const authRes = await parseHeaderCert(props); const { teamId, tmbId } = authRes; - const file = await getFileById({ bucketName: BucketNameEnum.dataset, fileId }); - - if (!file) { - return Promise.reject(CommonErrEnum.fileNotFound); - } - - if (file.metadata?.teamId !== teamId) { - return Promise.reject(CommonErrEnum.unAuthFile); + if (fileId.startsWith(S3Sources.dataset)) { + const stat = await getS3DatasetSource().getDatasetFileStat(fileId); + if (!stat) return Promise.reject(CommonErrEnum.fileNotFound); + } else { + const file = await getFileById({ bucketName: BucketNameEnum.dataset, fileId }); + if (!file) { + return Promise.reject(CommonErrEnum.fileNotFound); + } + if (file.metadata?.teamId !== teamId) { + return Promise.reject(CommonErrEnum.unAuthFile); + } } - const permission = new Permission({ - role: ReadRoleVal, - isOwner: file.metadata?.uid === tmbId || file.metadata?.tmbId === tmbId - }); + const permission = new Permission({ role: ReadRoleVal, isOwner: true }); if (!permission.checkPer(per)) { return Promise.reject(CommonErrEnum.unAuthFile); @@ -46,8 +44,7 @@ export const authCollectionFile = async ({ return { ...authRes, - permission, - file + permission }; }; diff --git a/packages/service/support/permission/dataset/auth.ts b/packages/service/support/permission/dataset/auth.ts index faa53dd13683..47d61405af49 100644 --- a/packages/service/support/permission/dataset/auth.ts +++ 
b/packages/service/support/permission/dataset/auth.ts @@ -19,11 +19,11 @@ import { MongoDatasetData } from '../../../core/dataset/data/schema'; import { type AuthModeType, type AuthResponseType } from '../type'; import { DatasetTypeEnum } from '@fastgpt/global/core/dataset/constants'; import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type'; -import { DataSetDefaultRoleVal } from '@fastgpt/global/support/permission/dataset/constant'; import { getDatasetImagePreviewUrl } from '../../../core/dataset/image/utils'; import { i18nT } from '../../../../web/i18n/utils'; import { parseHeaderCert } from '../auth/common'; import { sumPer } from '@fastgpt/global/support/permission/utils'; +import { getS3DatasetSource } from '../../../common/s3/sources/dataset'; export const authDatasetByTmbId = async ({ tmbId, @@ -242,6 +242,7 @@ export async function authDatasetData({ collectionId: datasetData.collectionId }); + const s3DatasetSource = getS3DatasetSource(); const data: DatasetDataItemType = { id: String(datasetData._id), teamId: datasetData.teamId, @@ -250,12 +251,17 @@ export async function authDatasetData({ a: datasetData.a, imageId: datasetData.imageId, imagePreivewUrl: datasetData.imageId - ? getDatasetImagePreviewUrl({ - imageId: datasetData.imageId, - teamId: datasetData.teamId, - datasetId: datasetData.datasetId, - expiredMinutes: 30 - }) + ? s3DatasetSource.isDatasetObjectKey(datasetData.imageId) + ? await s3DatasetSource.createGetDatasetFileURL({ + key: datasetData.imageId, + expiredHours: 1 + }) + : getDatasetImagePreviewUrl({ + imageId: datasetData.imageId, + teamId: datasetData.teamId, + datasetId: datasetData.datasetId, + expiredMinutes: 30 + }) : undefined, chunkIndex: datasetData.chunkIndex, indexes: datasetData.indexes, diff --git a/packages/web/i18n/en/app.json b/packages/web/i18n/en/app.json index 81aab5a5388b..aa51a6086523 100644 --- a/packages/web/i18n/en/app.json +++ b/packages/web/i18n/en/app.json @@ -153,7 +153,7 @@ "file_recover": "File will overwrite current content", "file_types": "Optional file types", "file_upload": "File Upload", - "file_upload_tip": "Once enabled, documents/images can be uploaded. Documents are retained for 7 days, images for 15 days. Using this feature may incur additional costs. To ensure a good experience, please choose an AI model with a larger context length when using this feature.", + "file_upload_tip": "Once enabled, you can configure the types of files that users can upload. Files are saved along with the conversation; deleting the conversation or deleting the application will clear the files. 
To ensure a good user experience, please select an AI model with a longer context length when using this feature.", "find_more_tools": "Explore more", "go_to_chat": "Go to Conversation", "go_to_run": "Go to Execution", diff --git a/packages/web/i18n/en/chat.json b/packages/web/i18n/en/chat.json index 357f99659bcb..4cbc5b08a3b0 100644 --- a/packages/web/i18n/en/chat.json +++ b/packages/web/i18n/en/chat.json @@ -75,6 +75,7 @@ "query_extension_result": "Problem optimization results", "question_tip": "From top to bottom, the response order of each module", "read_raw_source": "Open the original text", + "images_collection_not_supported": "Image collections do not support opening the original file", "reasoning_text": "Thinking process", "release_cancel": "Release Cancel", "release_send": "Release send, slide up to cancel", diff --git a/packages/web/i18n/zh-CN/app.json b/packages/web/i18n/zh-CN/app.json index bc6a354fcfcf..abaecff82ee6 100644 --- a/packages/web/i18n/zh-CN/app.json +++ b/packages/web/i18n/zh-CN/app.json @@ -157,7 +157,7 @@ "file_recover": "文件将覆盖当前内容", "file_types": "可选文件类型", "file_upload": "文件上传", - "file_upload_tip": "开启后,可以上传文档/图片。文档保留7天,图片保留15天。使用该功能可能产生较多额外费用。为保证使用体验,使用该功能时,请选择上下文长度较大的AI模型。", + "file_upload_tip": "开启后,可以配置用户可上传的文件类型。文件跟随对话保存,删除对话或删除应用均会清理文件。为保证使用体验,使用时请选择上下文长度较大的AI模型。", "find_more_tools": "探索更多", "go_to_chat": "去对话", "go_to_run": "去运行", diff --git a/packages/web/i18n/zh-CN/chat.json b/packages/web/i18n/zh-CN/chat.json index 3117883f7da9..23ed352f7c60 100644 --- a/packages/web/i18n/zh-CN/chat.json +++ b/packages/web/i18n/zh-CN/chat.json @@ -75,6 +75,7 @@ "query_extension_result": "问题优化结果", "question_tip": "从上到下,为各个模块的响应顺序", "read_raw_source": "打开原文", + "images_collection_not_supported": "图片数据集不支持打开原文", "reasoning_text": "思考过程", "release_cancel": "松开取消", "release_send": "松开发送,上滑取消", diff --git a/packages/web/i18n/zh-Hant/app.json b/packages/web/i18n/zh-Hant/app.json index b2fc705c5909..8b0fa393a98f 100644 --- a/packages/web/i18n/zh-Hant/app.json +++ b/packages/web/i18n/zh-Hant/app.json @@ -152,7 +152,7 @@ "file_recover": "檔案將會覆蓋目前內容", "file_types": "可選文件類型", "file_upload": "檔案上傳", - "file_upload_tip": "開啟後,可以上傳文件/圖片。文件保留 7 天,圖片保留 15 天。使用這個功能可能產生較多額外費用。為了確保使用體驗,使用這個功能時,請選擇上下文長度較大的 AI 模型。", + "file_upload_tip": "開啟後,可以設定使用者可上傳的檔案類型。檔案跟隨對話儲存,刪除對話或刪除應用程式均會清理檔案。為保證使用體驗,使用時請選擇上下文長度較大的AI模型。", "find_more_tools": "探索更多", "go_to_chat": "前往對話", "go_to_run": "前往執行", diff --git a/packages/web/i18n/zh-Hant/chat.json b/packages/web/i18n/zh-Hant/chat.json index 950ce7bee80b..5abb70e67565 100644 --- a/packages/web/i18n/zh-Hant/chat.json +++ b/packages/web/i18n/zh-Hant/chat.json @@ -75,6 +75,7 @@ "query_extension_result": "問題優化結果", "question_tip": "由上至下,各個模組的回應順序", "read_raw_source": "開啟原文", + "images_collection_not_supported": "圖片資料集不支援開啟原文", "reasoning_text": "思考過程", "release_cancel": "鬆開取消", "release_send": "鬆開傳送,上滑取消", diff --git a/projects/app/src/components/Markdown/img/Image.tsx b/projects/app/src/components/Markdown/img/Image.tsx index e4939cf732fc..30c41e4a5443 100644 --- a/projects/app/src/components/Markdown/img/Image.tsx +++ b/projects/app/src/components/Markdown/img/Image.tsx @@ -1,14 +1,56 @@ -import React, { useState } from 'react'; +import React, { useState, useEffect } from 'react'; import { Box, type ImageProps, Skeleton } from '@chakra-ui/react'; import MyPhotoView from '@fastgpt/web/components/common/Image/PhotoView'; import { useBoolean } from 'ahooks'; import { useTranslation } from 'next-i18next'; +import { getPresignedDatasetFileGetUrl } from '@/web/core/dataset/api'; 
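+// S3 object keys are resolved to short-lived presigned URLs: "dataset/..." keys via the dataset API, "chat/..." keys via the chat file API.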
+import { getPresignedChatFileGetUrl } from '@/web/common/file/api'; +import type { AProps } from '../A'; -const MdImage = ({ src, ...props }: { src?: string } & ImageProps) => { +const MdImage = ({ + src, + ...props +}: { src?: string } & ImageProps & { chatAuthData?: AProps['chatAuthData'] }) => { const { t } = useTranslation(); const [isLoaded, { setTrue }] = useBoolean(false); - const [renderSrc, setRenderSrc] = useState(src); + const [renderSrc, setRenderSrc] = useState<string | undefined>(src); + const [isLoading, setIsLoading] = useState(false); + + useEffect(() => { + if (!src || (!src.startsWith('dataset/') && !src.startsWith('chat/'))) { + setRenderSrc(src); + return; + } + + const loadS3Image = async () => { + try { + setIsLoading(true); + if (src.startsWith('dataset/')) { + const url = await getPresignedDatasetFileGetUrl({ key: src }); + setRenderSrc(url); + } else if (src.startsWith('chat/')) { + const url = await getPresignedChatFileGetUrl({ + key: src, + appId: props.chatAuthData?.appId || '', + outLinkAuthData: { + shareId: props.chatAuthData?.shareId, + outLinkUid: props.chatAuthData?.outLinkUid, + teamId: props.chatAuthData?.teamId, + teamToken: props.chatAuthData?.teamToken + } + }); + setRenderSrc(url); + } + } catch (error) { + console.error('Failed to sign S3 image:', error); + setRenderSrc('/imgs/errImg.png'); + } finally { + setIsLoading(false); + } + }; + + loadS3Image(); + }, [src, props.chatAuthData]); if (src?.includes('base64') && !src.startsWith('data:image')) { return <Box>Invalid base64 image</Box>; } @@ -19,7 +61,7 @@ const MdImage = ({ src, ...props }: { src?: string } & ImageProps) => { } return ( - + { const components = useCreation(() => { return { - img: Image, + img: (props: any) => <Image {...props} chatAuthData={chatAuthData} />, pre: RewritePre, code: Code, a: (props: any) => ( @@ -145,8 +145,8 @@ function Code(e: any) { return Component; } -function Image({ src }: { src?: string }) { - return <MdImage src={src} />; +function Image({ src, chatAuthData }: { src?: string; chatAuthData?: AProps['chatAuthData'] }) { + return <MdImage src={src} chatAuthData={chatAuthData} />; } function RewritePre({ children }: any) { diff --git a/projects/app/src/pageComponents/dataset/detail/Import/diffSource/FileLocal.tsx b/projects/app/src/pageComponents/dataset/detail/Import/diffSource/FileLocal.tsx index 18c60f0c0e8e..3358845a351e 100644 --- a/projects/app/src/pageComponents/dataset/detail/Import/diffSource/FileLocal.tsx +++ b/projects/app/src/pageComponents/dataset/detail/Import/diffSource/FileLocal.tsx @@ -15,6 +15,9 @@ import { getErrText } from '@fastgpt/global/common/error/utils'; import { formatFileSize } from '@fastgpt/global/common/file/tools'; import { getFileIcon } from '@fastgpt/global/common/file/icon'; import { DatasetPageContext } from '@/web/core/dataset/context/datasetPageContext'; +import { getUploadDatasetFilePresignedUrl } from '@/web/common/file/api'; +import { POST } from '@/web/common/api/request'; +import { parseS3UploadError } from '@fastgpt/global/common/error/s3'; const DataProcess = dynamic(() => import('../commonProgress/DataProcess')); const PreviewData = dynamic(() => import('../commonProgress/PreviewData')); @@ -67,39 +70,51 @@ const SelectFile = React.memo(function SelectFile() { await Promise.all( files.map(async ({ fileId, file }) => { try { - const { fileId: uploadFileId } = await uploadFile2DB({ - file, - bucketName: BucketNameEnum.dataset, - data: { - datasetId + const { url, fields, maxSize } = await getUploadDatasetFilePresignedUrl({ + filename: file.name, + datasetId + }); + + // Upload File to S3 + const formData = new FormData(); + Object.entries(fields).forEach(([k, v]) => formData.set(k, v)); 
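+          // S3 presigned POST requires the signed policy fields to precede the file field, so 'file' is appended last.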
formData.set('file', file); + await POST(url, formData, { + headers: { + 'Content-Type': 'multipart/form-data; charset=utf-8' }, - percentListen: (e) => { + onUploadProgress: (e) => { + if (!e.total) return; + const percent = Math.round((e.loaded / e.total) * 100); setSelectFiles((state) => state.map((item) => item.id === fileId ? { ...item, uploadedFileRate: item.uploadedFileRate - ? Math.max(e, item.uploadedFileRate) - : e + ? Math.max(percent, item.uploadedFileRate) + : percent } : item ) ); } - }); - setSelectFiles((state) => - state.map((item) => - item.id === fileId - ? { - ...item, - dbFileId: uploadFileId, - isUploading: false, - uploadedFileRate: 100 - } - : item - ) - ); + }) + .then(() => { + setSelectFiles((state) => + state.map((item) => + item.id === fileId + ? { + ...item, + dbFileId: fields.key, + isUploading: false, + uploadedFileRate: 100 + } + : item + ) + ); + }) + .catch((error) => Promise.reject(parseS3UploadError({ t, error, maxSize }))); } catch (error) { setSelectFiles((state) => state.map((item) => diff --git a/projects/app/src/pageComponents/dataset/detail/MetaDataCard.tsx b/projects/app/src/pageComponents/dataset/detail/MetaDataCard.tsx index 9aa951eb2c7b..c94afa8410e0 100644 --- a/projects/app/src/pageComponents/dataset/detail/MetaDataCard.tsx +++ b/projects/app/src/pageComponents/dataset/detail/MetaDataCard.tsx @@ -56,13 +56,15 @@ const MetaDataCard = ({ datasetId }: { datasetId: string }) => { }, { label: t('dataset:collection_name'), - value: collection.file?.filename || collection?.rawLink || collection?.name + value: decodeURIComponent( + collection.file?.filename || collection?.rawLink || collection?.name + ) }, ...(collection.file ? [ { label: t('common:core.dataset.collection.metadata.source size'), - value: formatFileSize(collection.file.length) + value: formatFileSize(collection.file.contentLength || 0) } ] : []), diff --git a/projects/app/src/pages/api/core/app/copy.ts b/projects/app/src/pages/api/core/app/copy.ts index 01cc744e28cc..978925b49229 100644 --- a/projects/app/src/pages/api/core/app/copy.ts +++ b/projects/app/src/pages/api/core/app/copy.ts @@ -40,7 +40,7 @@ async function handler( const avatar = await copyAvatarImage({ teamId, imageUrl: app.avatar, - ttl: true, + temporary: true, session }); diff --git a/projects/app/src/pages/api/core/app/create.ts b/projects/app/src/pages/api/core/app/create.ts index 66aa5bd9b106..63660b6ce341 100644 --- a/projects/app/src/pages/api/core/app/create.ts +++ b/projects/app/src/pages/api/core/app/create.ts @@ -29,6 +29,9 @@ import { MongoResourcePermission } from '@fastgpt/service/support/permission/sch import { getMyModels } from '@fastgpt/service/support/permission/model/controller'; import { removeUnauthModels } from '@fastgpt/global/core/workflow/utils'; import { getS3AvatarSource } from '@fastgpt/service/common/s3/sources/avatar'; +import { MongoAppTemplate } from '@fastgpt/service/core/app/templates/templateSchema'; +import { getNanoid } from '@fastgpt/global/common/string/tools'; +import path from 'node:path'; export type CreateAppBody = { parentId?: ParentIdType; @@ -157,11 +160,34 @@ export const onCreateApp = async ({ } const create = async (session: ClientSession) => { + const _avatar = await (async () => { + if (!templateId) return avatar; + + const template = await MongoAppTemplate.findOne({ templateId }, 'avatar').lean(); + if (!template?.avatar) return avatar; + + const s3AvatarSource = getS3AvatarSource(); + if (!s3AvatarSource.isAvatarKey(template.avatar)) return template.avatar; + + const 
filename = (() => { + // Reuse the original filename tail with a fresh nanoid prefix + const last = template.avatar.split('/').pop()?.split('-').slice(1).join('-'); + if (!last) return getNanoid(6).concat(path.extname(template.avatar)); + return `${getNanoid(6)}-${last}`; + })(); + + return await s3AvatarSource.copyAvatar({ + key: template.avatar, + teamId, + filename, + temporary: true + }); + })(); + const [app] = await MongoApp.create( [ { ...parseParentIdInMongo(parentId), - avatar, + avatar: _avatar, name, intro, teamId, @@ -207,7 +233,7 @@ export const onCreateApp = async ({ resourceType: PerResourceTypeEnum.app }); - await getS3AvatarSource().refreshAvatar(avatar, undefined, session); + await getS3AvatarSource().refreshAvatar(_avatar, undefined, session); (async () => { addAuditLog({ diff --git a/projects/app/src/pages/api/core/dataset/collection/create/backup.ts b/projects/app/src/pages/api/core/dataset/collection/create/backup.ts index 3e67104f6d13..eec9cd21e227 100644 --- a/projects/app/src/pages/api/core/dataset/collection/create/backup.ts +++ b/projects/app/src/pages/api/core/dataset/collection/create/backup.ts @@ -13,6 +13,7 @@ import { } from '@fastgpt/global/core/dataset/constants'; import { i18nT } from '@fastgpt/web/i18n/utils'; import { uploadFile } from '@fastgpt/service/common/file/gridfs/controller'; +import { ParsedFileContentS3Key } from '@fastgpt/service/common/s3/utils'; export type backupQuery = {}; @@ -48,7 +49,12 @@ async function handler(req: ApiRequestProps, res: ApiRe tmbId, path: file.path, encoding: file.encoding, - getFormatText: false + getFormatText: false, + uploadKey: ParsedFileContentS3Key.dataset({ + datasetId: dataset._id, + mimetype: file.mimetype, + filename: file.originalname + }).key }); if (!rawText.trim().startsWith('q,a,indexes')) { return Promise.reject(i18nT('dataset:backup_template_invalid')); } diff --git a/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts b/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts index ce9d20924789..707e9a8602a3 100644 --- a/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts +++ b/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts @@ -8,8 +8,9 @@ import { NextAPI } from '@/service/middleware/entry'; import { type ApiRequestProps } from '@fastgpt/service/type/next'; import { WritePermissionVal } from '@fastgpt/global/support/permission/constant'; import { type CreateCollectionResponse } from '@/global/core/dataset/api'; -import { deleteRawTextBuffer } from '@fastgpt/service/common/buffer/rawText/controller'; import { CommonErrEnum } from '@fastgpt/global/common/error/code/common'; +import { S3Sources } from '@fastgpt/service/common/s3/type'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; async function handler( req: ApiRequestProps @@ -24,17 +25,23 @@ async function handler( datasetId: body.datasetId }); - // 1. 
read file - const file = await getFileById({ - bucketName: BucketNameEnum.dataset, - fileId - }); + const filename = await (async () => { + if (fileId.startsWith(S3Sources.dataset)) { + const metadata = await getS3DatasetSource().getFileMetadata(fileId); + if (!metadata) return Promise.reject(CommonErrEnum.fileNotFound); + return metadata.filename; + } - if (!file) { - return Promise.reject(CommonErrEnum.fileNotFound); - } + const file = await getFileById({ + bucketName: BucketNameEnum.dataset, + fileId + }); + if (!file) { + return Promise.reject(CommonErrEnum.fileNotFound); + } - const filename = file.filename; + return file.filename; + })(); const { collectionId, insertResults } = await createCollectionAndInsertData({ dataset, @@ -44,17 +51,11 @@ async function handler( tmbId, type: DatasetCollectionTypeEnum.file, name: filename, - fileId, - metadata: { - relatedImgId: fileId - }, + fileId, // ObjectId -> ObjectKey customPdfParse } }); - // remove buffer - await deleteRawTextBuffer(fileId); - return { collectionId, results: insertResults diff --git a/projects/app/src/pages/api/core/dataset/collection/create/images.ts b/projects/app/src/pages/api/core/dataset/collection/create/images.ts index 32fd193dfa87..cb4151ea5e51 100644 --- a/projects/app/src/pages/api/core/dataset/collection/create/images.ts +++ b/projects/app/src/pages/api/core/dataset/collection/create/images.ts @@ -14,8 +14,14 @@ import { removeFilesByPaths } from '@fastgpt/service/common/file/utils'; import type { NextApiResponse } from 'next'; import { i18nT } from '@fastgpt/web/i18n/utils'; import { authFrequencyLimit } from '@/service/common/frequencyLimit/api'; -import { addSeconds } from 'date-fns'; +import { addDays, addSeconds } from 'date-fns'; import { createDatasetImage } from '@fastgpt/service/core/dataset/image/controller'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { S3Sources } from '@fastgpt/service/common/s3/type'; +import { getNanoid } from '@fastgpt/global/common/string/tools'; +import fsp from 'node:fs/promises'; +import path from 'node:path'; +import { uploadImage2S3Bucket } from '@fastgpt/service/common/s3/utils'; const authUploadLimit = (tmbId: string, num: number) => { if (!global.feConfigs.uploadFileMaxAmount) return; @@ -56,16 +62,18 @@ async function handler( return Promise.reject(i18nT('file:Image_dataset_requires_VLM_model_to_be_configured')); } - // 1. Save image to db + // 1. 
Save image to S3 const imageIds = await Promise.all( files.map(async (file) => { - return ( - await createDatasetImage({ - teamId, - datasetId, - file - }) - ).imageId; + const filename = path.basename(file.filename); + const uploadKey = [S3Sources.dataset, datasetId, `${getNanoid(6)}-${filename}`].join('/'); + return uploadImage2S3Bucket('private', { + base64Img: (await fsp.readFile(file.path)).toString('base64'), + uploadKey, + mimetype: file.mimetype, + filename, + expiredTime: addDays(new Date(), 7) + }); }) ); diff --git a/projects/app/src/pages/api/core/dataset/collection/create/template.ts b/projects/app/src/pages/api/core/dataset/collection/create/template.ts index 52dedb5eb940..480a80b6a479 100644 --- a/projects/app/src/pages/api/core/dataset/collection/create/template.ts +++ b/projects/app/src/pages/api/core/dataset/collection/create/template.ts @@ -13,6 +13,7 @@ import { } from '@fastgpt/global/core/dataset/constants'; import { i18nT } from '@fastgpt/web/i18n/utils'; import { uploadFile } from '@fastgpt/service/common/file/gridfs/controller'; +import { ParsedFileContentS3Key } from '@fastgpt/service/common/s3/utils'; export type templateImportQuery = {}; @@ -51,7 +52,12 @@ async function handler( tmbId, path: file.path, encoding: file.encoding, - getFormatText: false + getFormatText: false, + uploadKey: ParsedFileContentS3Key.dataset({ + datasetId: dataset._id, + mimetype: file.mimetype, + filename: file.originalname + }).key }); if (!rawText.trim().startsWith('q,a,indexes')) { return Promise.reject(i18nT('dataset:template_file_invalid')); diff --git a/projects/app/src/pages/api/core/dataset/collection/create/text.ts b/projects/app/src/pages/api/core/dataset/collection/create/text.ts index 66cb32513937..7d778eba0392 100644 --- a/projects/app/src/pages/api/core/dataset/collection/create/text.ts +++ b/projects/app/src/pages/api/core/dataset/collection/create/text.ts @@ -6,7 +6,8 @@ import { DatasetCollectionTypeEnum } from '@fastgpt/global/core/dataset/constant import { NextAPI } from '@/service/middleware/entry'; import { WritePermissionVal } from '@fastgpt/global/support/permission/constant'; import { type CreateCollectionResponse } from '@/global/core/dataset/api'; -import { createFileFromText } from '@fastgpt/service/common/file/gridfs/utils'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { removeS3TTL } from '@fastgpt/service/common/s3/utils'; async function handler(req: NextApiRequest): CreateCollectionResponse { const { name, text, ...body } = req.body as TextCreateDatasetCollectionParams; @@ -21,14 +22,11 @@ async function handler(req: NextApiRequest): CreateCollectionResponse { // 1. 
Create file from text const filename = `${name}.txt`; - const { fileId } = await createFileFromText({ - bucket: 'dataset', - filename, - text, - metadata: { - teamId, - uid: tmbId - } + const s3DatasetSource = getS3DatasetSource(); + const key = await s3DatasetSource.uploadDatasetFileByBuffer({ + datasetId: String(dataset._id), + buffer: Buffer.from(text), + filename }); const { collectionId, insertResults } = await createCollectionAndInsertData({ @@ -38,11 +36,13 @@ async function handler(req: NextApiRequest): CreateCollectionResponse { teamId, tmbId, type: DatasetCollectionTypeEnum.file, - fileId, + fileId: key, name: filename } }); + await removeS3TTL({ key, bucketName: 'private' }); + return { collectionId, results: insertResults diff --git a/projects/app/src/pages/api/core/dataset/collection/delete.ts b/projects/app/src/pages/api/core/dataset/collection/delete.ts index 36488944f40a..3840659940e8 100644 --- a/projects/app/src/pages/api/core/dataset/collection/delete.ts +++ b/projects/app/src/pages/api/core/dataset/collection/delete.ts @@ -43,7 +43,7 @@ async function handler(req: ApiRequestProps) teamId, datasetId: collection.datasetId, collectionId, - fields: '_id teamId datasetId fileId metadata' + fields: '_id teamId type datasetId fileId metadata' }); }) ).then((res) => { diff --git a/projects/app/src/pages/api/core/dataset/collection/detail.ts b/projects/app/src/pages/api/core/dataset/collection/detail.ts index 4982b796d246..ba3492b460d5 100644 --- a/projects/app/src/pages/api/core/dataset/collection/detail.ts +++ b/projects/app/src/pages/api/core/dataset/collection/detail.ts @@ -1,4 +1,4 @@ -/* +/* Get one dataset collection detail */ import type { NextApiRequest } from 'next'; @@ -14,6 +14,8 @@ import { collectionTagsToTagLabel } from '@fastgpt/service/core/dataset/collecti import { getVectorCountByCollectionId } from '@fastgpt/service/common/vectorDB/controller'; import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema'; import { readFromSecondary } from '@fastgpt/service/common/mongo/utils'; +import { S3Sources } from '@fastgpt/service/common/s3/type'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; async function handler(req: NextApiRequest): Promise { const { id } = req.query as { id: string }; @@ -31,10 +33,15 @@ async function handler(req: NextApiRequest): Promise per: ReadPermissionVal }); - // get file + const fileId = collection?.fileId; const [file, indexAmount, errorCount] = await Promise.all([ - collection?.fileId - ? await getFileById({ bucketName: BucketNameEnum.dataset, fileId: collection.fileId }) + fileId + ? fileId.startsWith(S3Sources.dataset) + ? 
getS3DatasetSource().getFileMetadata(fileId) + : (async () => { + const file = await getFileById({ bucketName: BucketNameEnum.dataset, fileId }); + return { filename: file?.filename, contentLength: file?.length }; + })() : undefined, getVectorCountByCollectionId(collection.teamId, collection.datasetId, collection._id), MongoDatasetTraining.countDocuments( diff --git a/projects/app/src/pages/api/core/dataset/data/delete.ts b/projects/app/src/pages/api/core/dataset/data/delete.ts index cd9b5ee47617..bf664a74a421 100644 --- a/projects/app/src/pages/api/core/dataset/data/delete.ts +++ b/projects/app/src/pages/api/core/dataset/data/delete.ts @@ -7,6 +7,7 @@ import { CommonErrEnum } from '@fastgpt/global/common/error/code/common'; import { addAuditLog } from '@fastgpt/service/support/user/audit/util'; import { AuditEventEnum } from '@fastgpt/global/support/user/audit/constants'; import { getI18nDatasetType } from '@fastgpt/service/support/user/audit/util'; + async function handler(req: NextApiRequest) { const { id: dataId } = req.query as { id: string; @@ -26,6 +27,7 @@ async function handler(req: NextApiRequest) { }); await deleteDatasetData(datasetData); + (async () => { addAuditLog({ tmbId, diff --git a/projects/app/src/pages/api/core/dataset/data/insertData.ts b/projects/app/src/pages/api/core/dataset/data/insertData.ts index 5a2d7015780f..945d822ea44d 100644 --- a/projects/app/src/pages/api/core/dataset/data/insertData.ts +++ b/projects/app/src/pages/api/core/dataset/data/insertData.ts @@ -1,4 +1,4 @@ -/* +/* insert one data to dataset (immediately insert) manual input or mark data */ @@ -60,9 +60,7 @@ async function handler(req: NextApiRequest) { text: simpleText(item.text) })); - const token = await countPromptTokens(formatQ + formatA, ''); const vectorModelData = getEmbeddingModel(vectorModel); - const llmModelData = getLLMModel(agentModel); await hasSameValue({ teamId, diff --git a/projects/app/src/pages/api/core/dataset/data/insertImages.ts b/projects/app/src/pages/api/core/dataset/data/insertImages.ts index 164863d57054..2297c69388d6 100644 --- a/projects/app/src/pages/api/core/dataset/data/insertImages.ts +++ b/projects/app/src/pages/api/core/dataset/data/insertImages.ts @@ -1,19 +1,21 @@ import type { ApiRequestProps, ApiResponseType } from '@fastgpt/service/type/next'; import { NextAPI } from '@/service/middleware/entry'; import { authFrequencyLimit } from '@/service/common/frequencyLimit/api'; -import { addSeconds } from 'date-fns'; +import { addDays, addSeconds } from 'date-fns'; import { removeFilesByPaths } from '@fastgpt/service/common/file/utils'; import { getUploadModel } from '@fastgpt/service/common/file/multer'; import { authDatasetCollection } from '@fastgpt/service/support/permission/dataset/auth'; import { WritePermissionVal } from '@fastgpt/global/support/permission/constant'; -import { createDatasetImage } from '@fastgpt/service/core/dataset/image/controller'; import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun'; import { createTrainingUsage } from '@fastgpt/service/support/wallet/usage/controller'; import { UsageSourceEnum } from '@fastgpt/global/support/wallet/usage/constants'; import { getEmbeddingModel, getLLMModel, getVlmModel } from '@fastgpt/service/core/ai/model'; import { pushDataListToTrainingQueue } from '@fastgpt/service/core/dataset/training/controller'; import { TrainingModeEnum } from '@fastgpt/global/core/dataset/constants'; -import { removeDatasetImageExpiredTime } from '@fastgpt/service/core/dataset/image/utils'; +import { 
getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import path from 'node:path'; +import fsp from 'node:fs/promises'; +import { ParsedFileContentS3Key, uploadImage2S3Bucket } from '@fastgpt/service/common/s3/utils'; export type insertImagesQuery = {}; @@ -60,17 +62,21 @@ async function handler( await authUploadLimit(tmbId, files.length); - // 1. Upload images to db + // 1. Upload images to S3 const imageIds = await Promise.all( - files.map(async (file) => { - return ( - await createDatasetImage({ - teamId, + files.map(async (file) => + uploadImage2S3Bucket('private', { + base64Img: (await fsp.readFile(file.path)).toString('base64'), + uploadKey: ParsedFileContentS3Key.dataset({ datasetId: dataset._id, - file - }) - ).imageId; - }) + mimetype: file.mimetype, + filename: path.basename(file.filename) + }).key, + mimetype: file.mimetype, + filename: path.basename(file.filename), + expiredTime: addDays(new Date(), 7) + }) + ) ); // 2. Insert images to training queue @@ -104,13 +110,6 @@ async function handler( })), session }); - - // 3. Clear ttl - await removeDatasetImageExpiredTime({ - ids: imageIds, - collectionId, - session - }); }); return {}; diff --git a/projects/app/src/pages/api/core/dataset/data/v2/list.ts b/projects/app/src/pages/api/core/dataset/data/v2/list.ts index bc2578b44cf8..dc7262b1d723 100644 --- a/projects/app/src/pages/api/core/dataset/data/v2/list.ts +++ b/projects/app/src/pages/api/core/dataset/data/v2/list.ts @@ -10,6 +10,7 @@ import { parsePaginationRequest } from '@fastgpt/service/common/api/pagination'; import { MongoDatasetImageSchema } from '@fastgpt/service/core/dataset/image/schema'; import { readFromSecondary } from '@fastgpt/service/common/mongo/utils'; import { getDatasetImagePreviewUrl } from '@fastgpt/service/core/dataset/image/utils'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; export type GetDatasetDataListProps = PaginationProps & { searchText?: string; @@ -56,10 +57,11 @@ async function handler( const imageIds = list.map((item) => item.imageId!).filter(Boolean); const imageSizeMap = new Map(); + const s3DatasetSource = getS3DatasetSource(); if (imageIds.length > 0) { const imageInfos = await MongoDatasetImageSchema.find( - { _id: { $in: imageIds } }, + { _id: { $in: imageIds.filter((id) => !s3DatasetSource.isDatasetObjectKey(id)) } }, '_id length', { ...readFromSecondary @@ -69,26 +71,38 @@ async function handler( imageInfos.forEach((item) => { imageSizeMap.set(String(item._id), item.length); }); + + const s3ImageIds = imageIds.filter((id) => s3DatasetSource.isDatasetObjectKey(id)); + for (const id of s3ImageIds) { + imageSizeMap.set(id, (await s3DatasetSource.getFileMetadata(id)).contentLength); + } } return { - list: list.map((item) => { - const imageSize = item.imageId ? imageSizeMap.get(String(item.imageId)) : undefined; - const imagePreviewUrl = item.imageId - ? getDatasetImagePreviewUrl({ - imageId: item.imageId, - teamId, - datasetId: collection.datasetId, - expiredMinutes: 30 - }) - : undefined; + list: await Promise.all( + list.map(async (item) => { + const imageSize = item.imageId ? imageSizeMap.get(String(item.imageId)) : undefined; + const imagePreviewUrl = item.imageId + ? s3DatasetSource.isDatasetObjectKey(item.imageId) + ? 
await getS3DatasetSource().createGetDatasetFileURL({ + key: item.imageId, + expiredHours: 24 + }) + : getDatasetImagePreviewUrl({ + imageId: item.imageId, + teamId, + datasetId: collection.datasetId, + expiredMinutes: 30 + }) + : undefined; - return { - ...item, - imageSize, - imagePreviewUrl - }; - }), + return { + ...item, + imageSize, + imagePreviewUrl + }; + }) + ), total }; } diff --git a/projects/app/src/pages/api/core/dataset/file/getPreviewChunks.ts b/projects/app/src/pages/api/core/dataset/file/getPreviewChunks.ts index b6a8c1dfbc31..c50d9cc53190 100644 --- a/projects/app/src/pages/api/core/dataset/file/getPreviewChunks.ts +++ b/projects/app/src/pages/api/core/dataset/file/getPreviewChunks.ts @@ -95,7 +95,8 @@ async function handler( selector, externalFileId, customPdfParse, - apiDatasetServer: dataset.apiDatasetServer + apiDatasetServer: dataset.apiDatasetServer, + datasetId }); const chunks = await rawText2Chunks({ diff --git a/projects/app/src/pages/api/core/dataset/presignDatasetFileGetUrl.ts b/projects/app/src/pages/api/core/dataset/presignDatasetFileGetUrl.ts new file mode 100644 index 000000000000..6b3e7ea0e1c0 --- /dev/null +++ b/projects/app/src/pages/api/core/dataset/presignDatasetFileGetUrl.ts @@ -0,0 +1,78 @@ +import { NextAPI } from '@/service/middleware/entry'; +import { type ApiRequestProps } from '@fastgpt/service/type/next'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { + PresignDatasetFileGetUrlSchema, + type PresignDatasetFileGetUrlParams +} from '@fastgpt/global/core/dataset/v2/api'; +import { + authDataset, + authDatasetCollection +} from '@fastgpt/service/support/permission/dataset/auth'; +import { CommonErrEnum } from '@fastgpt/global/common/error/code/common'; +import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant'; +import { createFileToken } from '@fastgpt/service/support/permission/auth/file'; +import { BucketNameEnum, ReadFileBaseUrl } from '@fastgpt/global/common/file/constants'; +import { DatasetCollectionTypeEnum } from '@fastgpt/global/core/dataset/constants'; +import { UserError } from '@fastgpt/global/common/error/utils'; + +async function handler(req: ApiRequestProps) { + const parsed = PresignDatasetFileGetUrlSchema.parse(req.body); + const s3DatasetSource = getS3DatasetSource(); + + // Get images parsed out of document files (addressed by S3 object key) + if ('key' in parsed) { + const { key } = parsed; + + const datasetId = key.split('/')[1] || ''; + await authDataset({ + datasetId, + per: ReadPermissionVal, + req, + authToken: true, + authApiKey: true + }); + + return await s3DatasetSource.createGetDatasetFileURL({ key, expiredHours: 24 }); + } + + // Other files (addressed by collection) + const { collectionId } = parsed; + const { + collection, + teamId: userTeamId, + tmbId: uid, + authType + } = await authDatasetCollection({ + req, + collectionId, + authToken: true, + authApiKey: true, + per: ReadPermissionVal + }); + + if (collection.type === DatasetCollectionTypeEnum.images) { + return Promise.reject(new UserError('chat:images_collection_not_supported')); + } + + const key = collection.fileId; + if (!key) { + return Promise.reject(CommonErrEnum.unAuthFile); + } + + if (s3DatasetSource.isDatasetObjectKey(key)) { + return await s3DatasetSource.createGetDatasetFileURL({ key, expiredHours: 24 }); + } else { + const token = await createFileToken({ + uid, + fileId: key, + teamId: userTeamId, + bucketName: BucketNameEnum.dataset, + customExpireMinutes: authType === 'outLink' ? 
5 : undefined + }); + + return `${ReadFileBaseUrl}/${collection.name}?token=${token}`; + } +} + +export default NextAPI(handler); diff --git a/projects/app/src/pages/api/core/dataset/presignDatasetFilePostUrl.ts b/projects/app/src/pages/api/core/dataset/presignDatasetFilePostUrl.ts new file mode 100644 index 000000000000..e53ecfe4feae --- /dev/null +++ b/projects/app/src/pages/api/core/dataset/presignDatasetFilePostUrl.ts @@ -0,0 +1,41 @@ +import type { ApiRequestProps } from '@fastgpt/service/type/next'; +import { NextAPI } from '@/service/middleware/entry'; +import { type CreatePostPresignedUrlResult } from '@fastgpt/service/common/s3/type'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { authFrequencyLimit } from '@/service/common/frequencyLimit/api'; +import { addSeconds } from 'date-fns'; +import type { PresignDatasetFilePostUrlParams } from '@fastgpt/global/core/dataset/v2/api'; +import { authDataset } from '@fastgpt/service/support/permission/dataset/auth'; +import { WritePermissionVal } from '@fastgpt/global/support/permission/constant'; + +const authUploadLimit = (tmbId: string) => { + if (!global.feConfigs.uploadFileMaxAmount) return; + return authFrequencyLimit({ + eventId: `${tmbId}-uploadfile`, + maxAmount: global.feConfigs.uploadFileMaxAmount * 2, + expiredTime: addSeconds(new Date(), 30) // 30s + }); +}; + +async function handler( + req: ApiRequestProps +): Promise { + const { filename, datasetId } = req.body; + + const { userId } = await authDataset({ + datasetId, + per: WritePermissionVal, + req, + authToken: true, + authApiKey: true + }); + + await authUploadLimit(userId); + + return await getS3DatasetSource().createUploadDatasetFileURL({ + datasetId, + filename + }); +} + +export default NextAPI(handler); diff --git a/projects/app/src/pages/api/system/file/[jwt].ts b/projects/app/src/pages/api/system/file/[jwt].ts new file mode 100644 index 000000000000..b4104758638b --- /dev/null +++ b/projects/app/src/pages/api/system/file/[jwt].ts @@ -0,0 +1,70 @@ +import type { NextApiRequest, NextApiResponse } from 'next'; +import { jsonRes } from '@fastgpt/service/common/response'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { addLog } from '@fastgpt/service/common/system/log'; +import { jwtVerifyS3ObjectKey } from '@fastgpt/service/common/s3/utils'; +import { getS3ChatSource } from '@fastgpt/service/common/s3/sources/chat'; + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + try { + const { jwt } = req.query as { jwt: string }; + + const s3DatasetSource = getS3DatasetSource(); + const s3ChatSource = getS3ChatSource(); + + const { objectKey } = await jwtVerifyS3ObjectKey(jwt); + + if (s3DatasetSource.isDatasetObjectKey(objectKey) || s3ChatSource.isChatFileKey(objectKey)) { + try { + const [stream, metadata] = await Promise.all( + (() => { + if (s3DatasetSource.isDatasetObjectKey(objectKey)) { + return [ + s3DatasetSource.getDatasetFileStream(objectKey), + s3DatasetSource.getFileMetadata(objectKey) + ]; + } else { + return [ + s3ChatSource.getChatFileStream(objectKey), + s3ChatSource.getFileMetadata(objectKey) + ]; + } + })() + ); + + res.setHeader('Content-Type', metadata.contentType); + res.setHeader('Cache-Control', 'public, max-age=31536000'); + res.setHeader('Content-Length', metadata.contentLength); + + stream.pipe(res); + + stream.on('error', (error) => { + addLog.error('Error reading dataset file', { error }); + if (!res.headersSent) { + 
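+          // Only send a 500 if streaming has not started; once piping begins the headers are already flushed.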
res.status(500).end(); + } + }); + + stream.on('end', () => { + res.end(); + }); + return; + } catch (error) { + return jsonRes(res, { + code: 500, + error + }); + } + } + + jsonRes(res, { + code: 404, + error: 'File not found' + }); + } catch (error) { + jsonRes(res, { + code: 500, + error + }); + } +} diff --git a/projects/app/src/service/core/dataset/data/controller.ts b/projects/app/src/service/core/dataset/data/controller.ts index 07b11f7905c0..82077b635d35 100644 --- a/projects/app/src/service/core/dataset/data/controller.ts +++ b/projects/app/src/service/core/dataset/data/controller.ts @@ -19,6 +19,8 @@ import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/data/cons import { countPromptTokens } from '@fastgpt/service/common/string/tiktoken'; import { deleteDatasetImage } from '@fastgpt/service/core/dataset/image/controller'; import { text2Chunks } from '@fastgpt/service/worker/function'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { removeS3TTL } from '@fastgpt/service/common/s3/utils'; const formatIndexes = async ({ indexes = [], @@ -215,7 +217,6 @@ export async function insertData2Dataset({ dataId: insertIds[index] })); - // 2. Create mongo data const [{ _id }] = await MongoDatasetData.create( [ { @@ -248,6 +249,11 @@ export async function insertData2Dataset({ { session, ordered: true } ); + // Only remove the TTL for images that belong to image datasets + if (getS3DatasetSource().isDatasetObjectKey(imageId)) { + await removeS3TTL({ key: imageId, bucketName: 'private', session }); + } + return { insertId: _id, tokens @@ -424,6 +430,8 @@ export const deleteDatasetData = async (data: DatasetDataItemType) => { await deleteDatasetImage(data.imageId); } + // Note: We don't delete parsed images from S3 here - they will be cleaned up when the collection is deleted + // 3. Delete vector data await deleteDatasetDataVector({ teamId: data.teamId, diff --git a/projects/app/src/service/core/dataset/queues/datasetParse.ts b/projects/app/src/service/core/dataset/queues/datasetParse.ts index 5017377d9745..e6a1878a8d8a 100644 --- a/projects/app/src/service/core/dataset/queues/datasetParse.ts +++ b/projects/app/src/service/core/dataset/queues/datasetParse.ts @@ -32,6 +32,7 @@ import { POST } from '@fastgpt/service/common/api/plusRequest'; import { pushLLMTrainingUsage } from '@fastgpt/service/support/wallet/usage/controller'; import { MongoImage } from '@fastgpt/service/common/file/image/schema'; import { UsageItemTypeEnum } from '@fastgpt/global/support/wallet/usage/constants'; +import { getS3DatasetSource } from '@fastgpt/service/common/s3/sources/dataset'; +import { removeS3TTL } from '@fastgpt/service/common/s3/utils'; const requestLLMPargraph = async ({ rawText, @@ -176,7 +177,12 @@ export const datasetParseQueue = async (): Promise => { continue; } - addLog.info(`[Parse Queue] Start`); + addLog.info(`[Parse Queue] Start processing`, { + collectionId: collection._id, + datasetId: dataset._id, + fileId: collection.fileId, + type: collection.type + }); try { const trainingMode = getTrainingModeByCollection({ @@ -230,12 +236,12 @@ export const datasetParseQueue = async (): Promise => { continue; } - // 2. Read source - const { title, rawText } = await readDatasetSourceRawText({ + let { title, rawText } = await readDatasetSourceRawText({ teamId: data.teamId, tmbId: data.tmbId, customPdfParse: collection.customPdfParse, usageId: data.billId, + datasetId: data.datasetId, ...sourceReadType }); @@ -303,6 +309,15 @@ export const datasetParseQueue = async (): Promise => { ); // 6. 
Push to chunk queue + const trainingData = chunks.map((item, index) => ({ + ...item, + indexes: item.indexes?.map((text) => ({ + type: DatasetDataIndexTypeEnum.custom, + text + })), + chunkIndex: index + })); + await pushDataListToTrainingQueue({ teamId: data.teamId, tmbId: data.tmbId, @@ -314,14 +329,7 @@ indexSize: collection.indexSize, mode: trainingMode, billId: data.billId, - data: chunks.map((item, index) => ({ - ...item, - indexes: item.indexes?.map((text) => ({ - type: DatasetDataIndexTypeEnum.custom, - text - })), - chunkIndex: index - })), + data: trainingData, session }); @@ -335,24 +343,31 @@ } ); - // 8. Remove image ttl - const relatedImgId = collection.metadata?.relatedImgId; - if (relatedImgId) { - await MongoImage.updateMany( - { - teamId: collection.teamId, - 'metadata.relatedId': relatedImgId - }, - { - // Remove expiredTime to avoid ttl expiration - $unset: { - expiredTime: 1 + // 8. Remove file TTL (images TTL will be removed after successful insertion to dataset_datas) + // 8.1 For S3 files, remove the file TTL only + if (collection.fileId && getS3DatasetSource().isDatasetObjectKey(collection.fileId)) { + await removeS3TTL({ key: collection.fileId, bucketName: 'private', session }); + } + // 8.2 For GridFS files (legacy), remove MongoDB image TTL + else { + const relatedImgId = collection.metadata?.relatedImgId; + if (relatedImgId) { + await MongoImage.updateMany( + { + teamId: collection.teamId, + 'metadata.relatedId': relatedImgId + }, + { + // Remove expiredTime to avoid ttl expiration + $unset: { + expiredTime: 1 + } + }, + { + session } - }, - { - session - } - ); + ); + } } }); diff --git a/projects/app/src/service/core/dataset/queues/generateVector.ts b/projects/app/src/service/core/dataset/queues/generateVector.ts index 60fbfc4552bf..62f475f62b7b 100644 --- a/projects/app/src/service/core/dataset/queues/generateVector.ts +++ b/projects/app/src/service/core/dataset/queues/generateVector.ts @@ -78,6 +78,9 @@ export async function generateVector(): Promise { select: '_id indexes' } ]) + .select( + 'teamId tmbId datasetId collectionId q a imageId imageDescMap chunkIndex indexSize billId mode retryCount lockTime indexes' + ) .lean(); // task preemption @@ -278,6 +281,7 @@ const insertData = async ({ trainingData }: { trainingData: TrainingDataType }) embeddingModel: trainingData.dataset.vectorModel, session }); + // delete data from training await MongoDatasetTraining.deleteOne({ _id: trainingData._id }, { session }); diff --git a/projects/app/src/web/common/file/api.ts b/projects/app/src/web/common/file/api.ts index 7322463bb69d..43f52c3e0ed9 100644 --- a/projects/app/src/web/common/file/api.ts +++ b/projects/app/src/web/common/file/api.ts @@ -54,3 +54,10 @@ export const getPresignedChatFileGetUrl = (params: { }) => { return POST('/core/chat/presignChatFileGetUrl', params); }; + +export const getUploadDatasetFilePresignedUrl = (params: { + filename: string; + datasetId: string; +}) => { + return POST('/core/dataset/presignDatasetFilePostUrl', params); +}; diff --git a/projects/app/src/web/core/dataset/api.ts b/projects/app/src/web/core/dataset/api.ts index 333a1b4015ca..4e909b6e4bcb 100644 --- a/projects/app/src/web/core/dataset/api.ts +++ b/projects/app/src/web/core/dataset/api.ts @@ -83,6 +83,7 @@ import type { DatasetCreateWithFilesBody, DatasetCreateWithFilesResponse } from '@/pages/api/core/dataset/createWithFiles'; +import type { 
PresignDatasetFileGetUrlParams } from '@fastgpt/global/core/dataset/v2/api'; /* ======================== dataset ======================= */ export const getDatasets = (data: GetDatasetListBody) => @@ -322,3 +323,6 @@ export const getApiDatasetCatalog = (data: GetApiDatasetCataLogProps) => export const getApiDatasetPaths = (data: GetApiDatasetPathBody) => POST('/core/dataset/apiDataset/getPathNames', data); + +export const getPresignedDatasetFileGetUrl = (data: PresignDatasetFileGetUrlParams) => + POST('/core/dataset/presignDatasetFileGetUrl', data); diff --git a/projects/app/src/web/core/dataset/hooks/readCollectionSource.ts b/projects/app/src/web/core/dataset/hooks/readCollectionSource.ts index 65b636e56cef..c2b11000442b 100644 --- a/projects/app/src/web/core/dataset/hooks/readCollectionSource.ts +++ b/projects/app/src/web/core/dataset/hooks/readCollectionSource.ts @@ -1,5 +1,5 @@ import { useSystemStore } from '@/web/common/system/useSystemStore'; -import { getCollectionSource } from '@/web/core/dataset/api'; +import { getPresignedDatasetFileGetUrl } from '@/web/core/dataset/api'; import { getErrText } from '@fastgpt/global/common/error/utils'; import { useToast } from '@fastgpt/web/hooks/useToast'; import { useTranslation } from 'next-i18next'; @@ -16,7 +16,7 @@ export function getCollectionSourceAndOpen( try { setLoading(true); - const { value: url } = await getCollectionSource(props); + const url = await getPresignedDatasetFileGetUrl({ collectionId: props.collectionId }); if (!url) { throw new Error('No file found');