Skip to content

Commit 06c0a64

Browse files
xqvvuc121914yu
andcommitted
feat: integrate S3 for dataset with compatibility (#5941)
* fix: text split * remove test * feat: integrate S3 for dataset with compatibility * fix: delay s3 files delete timing * fix: remove imageKeys * fix: remove parsed images' TTL * fix: improve codes by pr comments --------- Co-authored-by: archer <545436317@qq.com>
1 parent 05b9d1b commit 06c0a64

File tree

69 files changed

+1538
-427
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1538
-427
lines changed

packages/global/common/file/image/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export const FolderIcon = 'file/fill/folder';
44
export const FolderImgUrl = '/imgs/files/folder.svg';
55
export const HttpPluginImgUrl = '/imgs/app/httpPluginFill.svg';
66
export const HttpImgUrl = '/imgs/workflow/http.png';
7+
export const TempFileURL = '/api/file/temp';

packages/global/core/dataset/training/type.d.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ export type PushDataToTrainingQueueProps = {
99

1010
data: PushDatasetDataChunkProps[];
1111
mode?: TrainingModeEnum;
12-
data: PushDatasetDataChunkProps[];
1312

1413
agentModel: string;
1514
vectorModel: string;

packages/global/core/dataset/type.d.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ export type DatasetCollectionSchemaType = ChunkSettingsType & {
118118

119119
rawTextLength?: number;
120120
hashRawText?: string;
121+
121122
metadata?: {
122123
webPageSelector?: string;
123124
relatedImgId?: string; // The id of the associated image collections
@@ -250,7 +251,10 @@ export type TagUsageType = {
250251
export type DatasetCollectionItemType = CollectionWithDatasetType & {
251252
sourceName: string;
252253
sourceId?: string;
253-
file?: DatasetFileSchema;
254+
file?: {
255+
filename?: string;
256+
contentLength?: number;
257+
};
254258
permission: DatasetPermission;
255259
indexAmount: number;
256260
errorCount?: number;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { ObjectIdSchema } from '../../../common/type/mongo';
2+
import z from 'zod';
3+
4+
export const PresignDatasetFileGetUrlSchema = z.union([
5+
z.object({
6+
key: z
7+
.string()
8+
.nonempty()
9+
.refine((key) => key.startsWith('dataset/'), {
10+
message: 'Invalid key format: must start with "dataset/"'
11+
})
12+
.transform((k) => decodeURIComponent(k)),
13+
preview: z.boolean().optional()
14+
}),
15+
z.object({
16+
collectionId: ObjectIdSchema
17+
// datasetId: ObjectIdSchema
18+
})
19+
]);
20+
export type PresignDatasetFileGetUrlParams = z.infer<typeof PresignDatasetFileGetUrlSchema>;
21+
22+
export const PresignDatasetFilePostUrlSchema = z.object({
23+
filename: z.string().min(1),
24+
datasetId: ObjectIdSchema
25+
});
26+
export type PresignDatasetFilePostUrlParams = z.infer<typeof PresignDatasetFilePostUrlSchema>;
27+
28+
export const ShortPreviewLinkSchema = z.object({
29+
k: z
30+
.string()
31+
.nonempty()
32+
.transform((k) => `chat:temp_file:${decodeURIComponent(k)}`)
33+
});
34+
export type ShortPreviewLinkParams = z.infer<typeof ShortPreviewLinkSchema>;

packages/service/common/file/gridfs/controller.ts

Lines changed: 10 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,12 @@ import fsp from 'fs/promises';
44
import fs from 'fs';
55
import { type DatasetFileSchema } from '@fastgpt/global/core/dataset/type';
66
import { MongoChatFileSchema, MongoDatasetFileSchema } from './schema';
7-
import { detectFileEncoding, detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
8-
import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
9-
import { readRawContentByFileBuffer } from '../read/utils';
10-
import { computeGridFsChunSize, gridFsStream2Buffer, stream2Encoding } from './utils';
7+
import { detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
8+
import { computeGridFsChunSize, stream2Encoding } from './utils';
119
import { addLog } from '../../system/log';
12-
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
1310
import { Readable } from 'stream';
14-
import { addRawTextBuffer, getRawTextBuffer } from '../../buffer/rawText/controller';
15-
import { addMinutes } from 'date-fns';
1611
import { retryFn } from '@fastgpt/global/common/system/utils';
12+
import { getS3DatasetSource } from '../../s3/sources/dataset';
1713

1814
export function getGFSCollection(bucket: `${BucketNameEnum}`) {
1915
MongoDatasetFileSchema;
@@ -162,11 +158,17 @@ export async function delFileByFileIdList({
162158
fileIdList: string[];
163159
}): Promise<any> {
164160
return retryFn(async () => {
161+
const s3DatasetSource = getS3DatasetSource();
162+
165163
const bucket = getGridBucket(bucketName);
166164

167165
for await (const fileId of fileIdList) {
168166
try {
169-
await bucket.delete(new Types.ObjectId(String(fileId)));
167+
if (s3DatasetSource.isDatasetObjectKey(fileId)) {
168+
await s3DatasetSource.deleteDatasetFileByKey(fileId);
169+
} else {
170+
await bucket.delete(new Types.ObjectId(String(fileId)));
171+
}
170172
} catch (error: any) {
171173
if (typeof error?.message === 'string' && error.message.includes('File not found')) {
172174
addLog.warn('File not found', { fileId });
@@ -189,78 +191,3 @@ export async function getDownloadStream({
189191

190192
return bucket.openDownloadStream(new Types.ObjectId(fileId));
191193
}
192-
193-
export const readFileContentFromMongo = async ({
194-
teamId,
195-
tmbId,
196-
bucketName,
197-
fileId,
198-
customPdfParse = false,
199-
getFormatText,
200-
usageId
201-
}: {
202-
teamId: string;
203-
tmbId: string;
204-
bucketName: `${BucketNameEnum}`;
205-
fileId: string;
206-
customPdfParse?: boolean;
207-
getFormatText?: boolean; // 数据类型都尽可能转化成 markdown 格式
208-
usageId?: string;
209-
}): Promise<{
210-
rawText: string;
211-
filename: string;
212-
}> => {
213-
const bufferId = `${String(fileId)}-${customPdfParse}`;
214-
// read buffer
215-
const fileBuffer = await getRawTextBuffer(bufferId);
216-
if (fileBuffer) {
217-
return {
218-
rawText: fileBuffer.text,
219-
filename: fileBuffer?.sourceName
220-
};
221-
}
222-
223-
const [file, fileStream] = await Promise.all([
224-
getFileById({ bucketName, fileId }),
225-
getDownloadStream({ bucketName, fileId })
226-
]);
227-
if (!file) {
228-
return Promise.reject(CommonErrEnum.fileNotFound);
229-
}
230-
231-
const extension = parseFileExtensionFromUrl(file?.filename);
232-
233-
const start = Date.now();
234-
const fileBuffers = await gridFsStream2Buffer(fileStream);
235-
addLog.debug('get file buffer', { time: Date.now() - start });
236-
237-
const encoding = file?.metadata?.encoding || detectFileEncoding(fileBuffers);
238-
239-
// Get raw text
240-
const { rawText } = await readRawContentByFileBuffer({
241-
customPdfParse,
242-
usageId,
243-
getFormatText,
244-
extension,
245-
teamId,
246-
tmbId,
247-
buffer: fileBuffers,
248-
encoding,
249-
metadata: {
250-
relatedId: fileId
251-
}
252-
});
253-
254-
// Add buffer
255-
addRawTextBuffer({
256-
sourceId: bufferId,
257-
sourceName: file.filename,
258-
text: rawText,
259-
expiredTime: addMinutes(new Date(), 20)
260-
});
261-
262-
return {
263-
rawText,
264-
filename: file.filename
265-
};
266-
};

packages/service/common/file/image/controller.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,28 @@ export async function uploadMongoImg({
6464
export const copyAvatarImage = async ({
6565
teamId,
6666
imageUrl,
67-
ttl,
67+
temporary,
6868
session
6969
}: {
7070
teamId: string;
7171
imageUrl: string;
72-
ttl: boolean;
72+
temporary: boolean;
7373
session?: ClientSession;
7474
}) => {
7575
if (!imageUrl) return;
7676

77-
// S3
78-
if (imageUrl.startsWith(`${imageBaseUrl}/${S3Sources.avatar}`)) {
79-
const extendName = path.extname(imageUrl);
77+
const avatarSource = getS3AvatarSource();
78+
if (avatarSource.isAvatarKey(imageUrl)) {
79+
const filename = (() => {
80+
const last = imageUrl.split('/').pop()?.split('-')[1];
81+
if (!last) return getNanoid(6).concat(path.extname(imageUrl));
82+
return `${getNanoid(6)}-${last}`;
83+
})();
8084
const key = await getS3AvatarSource().copyAvatar({
81-
sourceKey: imageUrl.slice(imageBaseUrl.length),
82-
targetKey: `${S3Sources.avatar}/${teamId}/${getNanoid(6)}${extendName}`,
83-
ttl
85+
key: imageUrl,
86+
teamId,
87+
filename,
88+
temporary
8489
});
8590
return key;
8691
}
@@ -130,9 +135,13 @@ export const removeImageByPath = (path?: string, session?: ClientSession) => {
130135
if (!name) return;
131136

132137
const id = name.split('.')[0];
133-
if (!id || !Types.ObjectId.isValid(id)) return;
138+
if (!id) return;
134139

135-
return MongoImage.deleteOne({ _id: id }, { session });
140+
if (Types.ObjectId.isValid(id)) {
141+
return MongoImage.deleteOne({ _id: id }, { session });
142+
} else if (getS3AvatarSource().isAvatarKey(path)) {
143+
return getS3AvatarSource().deleteAvatar(path, session);
144+
}
136145
};
137146

138147
export async function readMongoImg({ id }: { id: string }) {

packages/service/common/file/read/utils.ts

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { uploadMongoImg } from '../image/controller';
21
import FormData from 'form-data';
32
import fs from 'fs';
43
import type { ReadFileResponse } from '../../../worker/readFile/type';
@@ -9,6 +8,9 @@ import { matchMdImg } from '@fastgpt/global/common/string/markdown';
98
import { createPdfParseUsage } from '../../../support/wallet/usage/controller';
109
import { useDoc2xServer } from '../../../thirdProvider/doc2x';
1110
import { readRawContentFromBuffer } from '../../../worker/function';
11+
import { uploadImage2S3Bucket } from '../../s3/utils';
12+
import { Mimes } from '../../s3/constants';
13+
import { addDays } from 'date-fns';
1214

1315
export type readRawTextByLocalFileParams = {
1416
teamId: string;
@@ -17,6 +19,7 @@ export type readRawTextByLocalFileParams = {
1719
encoding: string;
1820
customPdfParse?: boolean;
1921
getFormatText?: boolean;
22+
uploadKey: string;
2023
metadata?: Record<string, any>;
2124
};
2225
export const readRawTextByLocalFile = async (params: readRawTextByLocalFileParams) => {
@@ -26,41 +29,47 @@ export const readRawTextByLocalFile = async (params: readRawTextByLocalFileParam
2629

2730
const buffer = await fs.promises.readFile(path);
2831

29-
return readRawContentByFileBuffer({
32+
return readS3FileContentByBuffer({
3033
extension,
3134
customPdfParse: params.customPdfParse,
3235
getFormatText: params.getFormatText,
3336
teamId: params.teamId,
3437
tmbId: params.tmbId,
3538
encoding: params.encoding,
3639
buffer,
37-
metadata: params.metadata
40+
imageKeyOptions: {
41+
prefix: params.uploadKey,
42+
expiredTime: addDays(new Date(), 1)
43+
}
3844
});
3945
};
4046

41-
export const readRawContentByFileBuffer = async ({
47+
export const readS3FileContentByBuffer = async ({
4248
teamId,
4349
tmbId,
4450

4551
extension,
4652
buffer,
4753
encoding,
48-
metadata,
4954
customPdfParse = false,
5055
usageId,
51-
getFormatText = true
56+
getFormatText = true,
57+
imageKeyOptions
5258
}: {
5359
teamId: string;
5460
tmbId: string;
5561

5662
extension: string;
5763
buffer: Buffer;
5864
encoding: string;
59-
metadata?: Record<string, any>;
6065

6166
customPdfParse?: boolean;
6267
usageId?: string;
6368
getFormatText?: boolean;
69+
imageKeyOptions: {
70+
prefix: string;
71+
expiredTime?: Date;
72+
};
6473
}): Promise<{
6574
rawText: string;
6675
}> => {
@@ -158,31 +167,35 @@ export const readRawContentByFileBuffer = async ({
158167
addLog.debug(`Parse file success, time: ${Date.now() - start}ms. `);
159168

160169
// markdown data format
161-
if (imageList) {
170+
if (imageList && imageList.length > 0) {
171+
addLog.debug(`Processing ${imageList.length} images from parsed document`);
172+
162173
await batchRun(imageList, async (item) => {
163174
const src = await (async () => {
164175
try {
165-
return await uploadMongoImg({
176+
const { prefix, expiredTime } = imageKeyOptions;
177+
const ext = `.${item.mime.split('/')[1].replace('x-', '')}`;
178+
179+
return await uploadImage2S3Bucket('private', {
166180
base64Img: `data:${item.mime};base64,${item.base64}`,
167-
teamId,
168-
metadata: {
169-
...metadata,
170-
mime: item.mime
171-
}
181+
uploadKey: `${prefix}/${item.uuid}.${ext}`,
182+
mimetype: Mimes[ext as keyof typeof Mimes],
183+
filename: `${item.uuid}${ext}`,
184+
expiredTime
172185
});
173186
} catch (error) {
174-
addLog.warn('Upload file image error', { error });
175-
return 'Upload load image error';
187+
return `[Image Upload Failed: ${item.uuid}]`;
176188
}
177189
})();
178190
rawText = rawText.replace(item.uuid, src);
191+
// rawText = rawText.replace(item.uuid, jwtSignS3ObjectKey(src, addDays(new Date(), 90)));
179192
if (formatText) {
180193
formatText = formatText.replace(item.uuid, src);
181194
}
182195
});
183196
}
184197

185-
addLog.debug(`Upload file success, time: ${Date.now() - start}ms`);
186-
187-
return { rawText: getFormatText ? formatText || rawText : rawText };
198+
return {
199+
rawText: getFormatText ? formatText || rawText : rawText
200+
};
188201
};

0 commit comments

Comments
 (0)