From 4904a18e01f8476151c123c398fb910162796758 Mon Sep 17 00:00:00 2001 From: Ping-Lin Chang Date: Wed, 6 Aug 2025 02:54:22 +0100 Subject: [PATCH 1/2] refactor(minio): adopt x/minio package --- cmd/main/main.go | 52 ++-- cmd/worker/main.go | 53 ++-- config/config.go | 34 +-- config/config.yaml | 2 +- pkg/component/base/component.go | 5 +- .../audio/v0/task_detect_activity_test.go | 4 +- .../operator/audio/v0/task_segment_test.go | 4 +- pkg/component/store/store.go | 20 +- pkg/data/audio.go | 4 +- pkg/data/audio_test.go | 4 +- pkg/data/binary/fetcher.go | 79 ++++++ pkg/data/document.go | 4 +- pkg/data/document_test.go | 4 +- pkg/data/file.go | 4 +- pkg/data/image.go | 4 +- pkg/data/image_test.go | 5 +- pkg/data/number_test.go | 1 + pkg/data/struct.go | 6 +- pkg/data/struct_test.go | 4 +- pkg/data/utils.go | 5 +- pkg/data/video.go | 5 +- pkg/data/video_test.go | 4 +- pkg/external/external.go | 195 --------------- pkg/repository/repository_test.go | 4 +- pkg/service/blobstorage.go | 119 --------- pkg/service/main.go | 12 +- pkg/service/pipeline.go | 81 ++++++- pkg/service/pipeline_test.go | 18 +- pkg/utils/blobstorage.go | 227 ------------------ pkg/worker/blobstorage.go | 95 -------- pkg/worker/io.go | 7 +- pkg/worker/main.go | 13 +- pkg/worker/minioactivity.go | 162 ++++++++++--- pkg/worker/utils.go | 80 ++++-- pkg/worker/workflow.go | 77 ++++++ 35 files changed, 555 insertions(+), 842 deletions(-) create mode 100644 pkg/data/binary/fetcher.go delete mode 100644 pkg/external/external.go delete mode 100644 pkg/service/blobstorage.go delete mode 100644 pkg/utils/blobstorage.go delete mode 100644 pkg/worker/blobstorage.go diff --git a/cmd/main/main.go b/cmd/main/main.go index d372f0c87..3de9be3e2 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -30,7 +30,6 @@ import ( "github.com/instill-ai/pipeline-backend/config" "github.com/instill-ai/pipeline-backend/pkg/acl" - "github.com/instill-ai/pipeline-backend/pkg/external" "github.com/instill-ai/pipeline-backend/pkg/handler" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/middleware" @@ -38,7 +37,6 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/service" "github.com/instill-ai/pipeline-backend/pkg/usage" - "github.com/instill-ai/x/temporal" componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store" database "github.com/instill-ai/pipeline-backend/pkg/db" @@ -53,6 +51,7 @@ import ( otelx "github.com/instill-ai/x/otel" servergrpcx "github.com/instill-ai/x/server/grpc" gatewayx "github.com/instill-ai/x/server/grpc/gateway" + temporalx "github.com/instill-ai/x/temporal" ) const gracefulShutdownWaitPeriod = 15 * time.Second @@ -120,16 +119,28 @@ func main() { // Initialize all clients pipelinePublicServiceClient, mgmtPublicServiceClient, mgmtPrivateServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient, redisClient, db, - minIOClient, minIOFileGetter, aclClient, temporalClient, closeClients := newClients(ctx, logger) + aclClient, temporalClient, closeClients := newClients(ctx, logger) defer closeClients() - // Keep NewArtifactBinaryFetcher as requested - binaryFetcher := external.NewArtifactBinaryFetcher(artifactPrivateServiceClient, minIOFileGetter) + // Initialize MinIO client + minIOParams := miniox.ClientParams{ + Config: config.Config.Minio, + Logger: logger, + ExpiryRules: service.NewRetentionHandler().ListExpiryRules(), + AppInfo: miniox.AppInfo{ + Name: serviceName, + Version: serviceVersion,
}, + } + + minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams) + if err != nil { + logger.Fatal("failed to create MinIO client", zap.Error(err)) + } compStore := componentstore.Init(componentstore.InitParams{ Logger: logger, Secrets: config.Config.Component.Secrets, - BinaryFetcher: binaryFetcher, TemporalClient: temporalClient, }) @@ -156,7 +167,7 @@ func main() { compStore, ms, service.NewRetentionHandler(), - binaryFetcher, + compStore.GetBinaryFetcher(), artifactPublicServiceClient, artifactPrivateServiceClient, ) @@ -368,8 +379,6 @@ func newClients(ctx context.Context, logger *zap.Logger) ( artifactpb.ArtifactPrivateServiceClient, *redis.Client, *gorm.DB, - miniox.Client, - *miniox.FileGetter, acl.ACLClient, temporalclient.Client, func(), @@ -437,7 +446,7 @@ func newClients(ctx context.Context, logger *zap.Logger) ( } // Initialize Temporal client - temporalClientOptions, err := temporal.ClientOptions(config.Config.Temporal, logger) + temporalClientOptions, err := temporalx.ClientOptions(config.Config.Temporal, logger) if err != nil { logger.Fatal("Unable to build Temporal client options", zap.Error(err)) } @@ -484,27 +493,6 @@ func newClients(ctx context.Context, logger *zap.Logger) ( aclClient := acl.NewACLClient(fgaClient, fgaReplicaClient, redisClient) - // Initialize MinIO client - minIOParams := miniox.ClientParams{ - Config: config.Config.Minio, - Logger: logger, - AppInfo: miniox.AppInfo{ - Name: serviceName, - Version: serviceVersion, - }, - } - minIOFileGetter, err := miniox.NewFileGetter(minIOParams) - if err != nil { - logger.Fatal("Failed to create MinIO file getter", zap.Error(err)) - } - - retentionHandler := service.NewRetentionHandler() - minIOParams.ExpiryRules = retentionHandler.ListExpiryRules() - minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams) - if err != nil { - logger.Fatal("failed to create MinIO client", zap.Error(err)) - } - closer := func() { for conn, fn := range closeFuncs { if err := fn(); err != nil { @@ -515,5 +503,5 @@ func newClients(ctx context.Context, logger *zap.Logger) ( return pipelinePublicServiceClient, mgmtPublicServiceClient, mgmtPrivateServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient, redisClient, db, - minIOClient, minIOFileGetter, aclClient, temporalClient, closer + aclClient, temporalClient, closer } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index a7e69b606..ddf7d447f 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -21,14 +21,11 @@ import ( temporalclient "go.temporal.io/sdk/client" "github.com/instill-ai/pipeline-backend/config" - "github.com/instill-ai/pipeline-backend/pkg/external" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/pubsub" "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/service" "github.com/instill-ai/x/client" - "github.com/instill-ai/x/minio" - "github.com/instill-ai/x/temporal" componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store" database "github.com/instill-ai/pipeline-backend/pkg/db" @@ -37,7 +34,9 @@ import ( pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" clientgrpcx "github.com/instill-ai/x/client/grpc" logx "github.com/instill-ai/x/log" + miniox "github.com/instill-ai/x/minio" otelx "github.com/instill-ai/x/otel" + temporalx "github.com/instill-ai/x/temporal" ) const gracefulShutdownWaitPeriod = 15 * time.Second @@ -83,16 +82,27 @@ func main() { // Initialize all clients 
pipelinePublicServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient, - redisClient, db, minIOClient, minIOFileGetter, temporalClient, timeseries, closeClients := newClients(ctx, logger) + redisClient, db, temporalClient, timeseries, closeClients := newClients(ctx, logger) defer closeClients() - // Keep NewArtifactBinaryFetcher as requested - binaryFetcher := external.NewArtifactBinaryFetcher(artifactPrivateServiceClient, minIOFileGetter) + minIOParams := miniox.ClientParams{ + Config: config.Config.Minio, + Logger: logger, + ExpiryRules: service.NewRetentionHandler().ListExpiryRules(), + AppInfo: miniox.AppInfo{ + Name: serviceName, + Version: serviceVersion, + }, + } + + minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams) + if err != nil { + logger.Fatal("failed to create MinIO client", zap.Error(err)) + } compStore := componentstore.Init(componentstore.InitParams{ Logger: logger, Secrets: config.Config.Component.Secrets, - BinaryFetcher: binaryFetcher, TemporalClient: temporalClient, }) @@ -111,7 +121,7 @@ func main() { MemoryStore: ms, ArtifactPublicServiceClient: artifactPublicServiceClient, ArtifactPrivateServiceClient: artifactPrivateServiceClient, - BinaryFetcher: binaryFetcher, + BinaryFetcher: compStore.GetBinaryFetcher(), PipelinePublicServiceClient: pipelinePublicServiceClient, }, ) @@ -195,8 +205,6 @@ func newClients(ctx context.Context, logger *zap.Logger) ( artifactpb.ArtifactPrivateServiceClient, *redis.Client, *gorm.DB, - minio.Client, - *minio.FileGetter, temporalclient.Client, *repository.InfluxDB, func(), @@ -257,7 +265,7 @@ func newClients(ctx context.Context, logger *zap.Logger) ( } // Initialize Temporal client - temporalClientOptions, err := temporal.ClientOptions(config.Config.Temporal, logger) + temporalClientOptions, err := temporalx.ClientOptions(config.Config.Temporal, logger) if err != nil { logger.Fatal("Unable to build Temporal client options", zap.Error(err)) } @@ -283,27 +291,6 @@ func newClients(ctx context.Context, logger *zap.Logger) ( return nil } - // Initialize MinIO client - minIOParams := minio.ClientParams{ - Config: config.Config.Minio, - Logger: logger, - AppInfo: minio.AppInfo{ - Name: serviceName, - Version: serviceVersion, - }, - } - minIOFileGetter, err := minio.NewFileGetter(minIOParams) - if err != nil { - logger.Fatal("Failed to create MinIO file getter", zap.Error(err)) - } - - retentionHandler := service.NewRetentionHandler() - minIOParams.ExpiryRules = retentionHandler.ListExpiryRules() - minIOClient, err := minio.NewMinIOClientAndInitBucket(ctx, minIOParams) - if err != nil { - logger.Fatal("failed to create MinIO client", zap.Error(err)) - } - closer := func() { for conn, fn := range closeFuncs { if err := fn(); err != nil { @@ -313,5 +300,5 @@ func newClients(ctx context.Context, logger *zap.Logger) ( } return pipelinePublicServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient, - redisClient, db, minIOClient, minIOFileGetter, temporalClient, timeseries, closer + redisClient, db, temporalClient, timeseries, closer } diff --git a/config/config.go b/config/config.go index 9f821eec0..168d47a0a 100644 --- a/config/config.go +++ b/config/config.go @@ -14,9 +14,9 @@ import ( "github.com/knadh/koanf/providers/file" "github.com/redis/go-redis/v9" - "github.com/instill-ai/x/client" - "github.com/instill-ai/x/minio" - "github.com/instill-ai/x/temporal" + clientx "github.com/instill-ai/x/client" + miniox "github.com/instill-ai/x/minio" + temporalx "github.com/instill-ai/x/temporal" ) 
const ( @@ -31,20 +31,20 @@ var Config AppConfig // AppConfig defines type AppConfig struct { - Server ServerConfig `koanf:"server"` - Component ComponentConfig `koanf:"component"` - Database DatabaseConfig `koanf:"database"` - InfluxDB InfluxDBConfig `koanf:"influxdb"` - Temporal temporal.ClientConfig `koanf:"temporal"` - Cache CacheConfig `koanf:"cache"` - OTELCollector OTELCollectorConfig `koanf:"otelcollector"` - MgmtBackend client.ServiceConfig `koanf:"mgmtbackend"` - ModelBackend client.ServiceConfig `koanf:"modelbackend"` - OpenFGA OpenFGAConfig `koanf:"openfga"` - ArtifactBackend client.ServiceConfig `koanf:"artifactbackend"` - Minio minio.Config `koanf:"minio"` - AgentBackend client.ServiceConfig `koanf:"agentbackend"` - APIGateway APIGatewayConfig `koanf:"apigateway"` + Server ServerConfig `koanf:"server"` + Component ComponentConfig `koanf:"component"` + Database DatabaseConfig `koanf:"database"` + InfluxDB InfluxDBConfig `koanf:"influxdb"` + Temporal temporalx.ClientConfig `koanf:"temporal"` + Cache CacheConfig `koanf:"cache"` + OTELCollector OTELCollectorConfig `koanf:"otelcollector"` + MgmtBackend clientx.ServiceConfig `koanf:"mgmtbackend"` + ModelBackend clientx.ServiceConfig `koanf:"modelbackend"` + OpenFGA OpenFGAConfig `koanf:"openfga"` + ArtifactBackend clientx.ServiceConfig `koanf:"artifactbackend"` + Minio miniox.Config `koanf:"minio"` + AgentBackend clientx.ServiceConfig `koanf:"agentbackend"` + APIGateway APIGatewayConfig `koanf:"apigateway"` } // APIGatewayConfig related to API gateway diff --git a/config/config.yaml b/config/config.yaml index 4099edc7f..011292efd 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -81,7 +81,7 @@ minio: port: 9000 user: minioadmin password: minioadmin - bucketname: instill-ai-vdp + bucketname: core-pipeline secure: false agentbackend: host: agent-backend diff --git a/pkg/component/base/component.go b/pkg/component/base/component.go index 50b9cee95..1238bcebe 100644 --- a/pkg/component/base/component.go +++ b/pkg/component/base/component.go @@ -20,8 +20,8 @@ import ( temporalclient "go.temporal.io/sdk/client" "github.com/instill-ai/pipeline-backend/pkg/component/internal/jsonref" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/external" pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" ) @@ -161,6 +161,7 @@ type IdentifierResult struct { } // Component implements the common component methods. 
+ type Component struct { Logger *zap.Logger NewUsageHandler UsageHandlerCreator @@ -168,7 +169,7 @@ type Component struct { definition *pipelinepb.ComponentDefinition secretFields []string - BinaryFetcher external.BinaryFetcher + BinaryFetcher binary.Fetcher TemporalClient temporalclient.Client } diff --git a/pkg/component/operator/audio/v0/task_detect_activity_test.go b/pkg/component/operator/audio/v0/task_detect_activity_test.go index ec64f29c1..00df6efbc 100644 --- a/pkg/component/operator/audio/v0/task_detect_activity_test.go +++ b/pkg/component/operator/audio/v0/task_detect_activity_test.go @@ -19,7 +19,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" "github.com/instill-ai/pipeline-backend/pkg/data" - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" ) func TestDetectActivity(t *testing.T) { @@ -125,7 +125,7 @@ func TestDetectActivity(t *testing.T) { jsonValue, err := data.NewJSONValue(segmentsMap) c.Assert(err, qt.IsNil) - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() unmarshaler := data.NewUnmarshaler(binaryFetcher) c.Assert(unmarshaler.Unmarshal(context.Background(), jsonValue, &expectedSegmentsStruct), qt.IsNil) expectedSegments := expectedSegmentsStruct.Segments diff --git a/pkg/component/operator/audio/v0/task_segment_test.go b/pkg/component/operator/audio/v0/task_segment_test.go index 0e3d293b2..136343a7b 100644 --- a/pkg/component/operator/audio/v0/task_segment_test.go +++ b/pkg/component/operator/audio/v0/task_segment_test.go @@ -12,7 +12,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/base" "github.com/instill-ai/pipeline-backend/pkg/component/internal/mock" "github.com/instill-ai/pipeline-backend/pkg/data" - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" ) func TestSegment(t *testing.T) { @@ -68,7 +68,7 @@ func TestSegment(t *testing.T) { jsonValue, err := data.NewJSONValue(segmentsMap) c.Assert(err, qt.IsNil) - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() unmarshaler := data.NewUnmarshaler(binaryFetcher) c.Assert(unmarshaler.Unmarshal(context.Background(), jsonValue, &segmentsStruct), qt.IsNil) segments := segmentsStruct.Segments diff --git a/pkg/component/store/store.go b/pkg/component/store/store.go index 708b17af9..61ab152a5 100644 --- a/pkg/component/store/store.go +++ b/pkg/component/store/store.go @@ -63,7 +63,7 @@ import ( "github.com/instill-ai/pipeline-backend/pkg/component/operator/text/v0" "github.com/instill-ai/pipeline-backend/pkg/component/operator/video/v0" "github.com/instill-ai/pipeline-backend/pkg/component/operator/web/v0" - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" ) @@ -78,27 +78,31 @@ type Store struct { componentUIDs []uuid.UUID componentUIDMap map[uuid.UUID]*component componentIDMap map[string]*component + binaryFetcher binary.Fetcher } type component struct { comp base.IComponent } +// InitParams contains the parameters for initializing the component store. 
type InitParams struct { Logger *zap.Logger Secrets config.ComponentSecrets UsageHandlerCreator base.UsageHandlerCreator - BinaryFetcher external.BinaryFetcher TemporalClient temporalclient.Client } // Init initializes the components implemented in this repository and loads // their information to memory. func Init(param InitParams) *Store { + // Create the binary fetcher with the provided dependencies + binaryFetcher := binary.NewFetcher() + baseComp := base.Component{ Logger: param.Logger, NewUsageHandler: param.UsageHandlerCreator, - BinaryFetcher: param.BinaryFetcher, + BinaryFetcher: binaryFetcher, TemporalClient: param.TemporalClient, } secrets := param.Secrets @@ -107,6 +111,7 @@ func Init(param InitParams) *Store { compStore = &Store{ componentUIDMap: map[uuid.UUID]*component{}, componentIDMap: map[string]*component{}, + binaryFetcher: binaryFetcher, } compStore.Import(base64.Init(baseComp)) compStore.Import(json.Init(baseComp)) @@ -244,6 +249,11 @@ func Init(param InitParams) *Store { return compStore } +// GetBinaryFetcher returns the binary fetcher instance used by the store +func (s *Store) GetBinaryFetcher() binary.Fetcher { + return s.binaryFetcher +} + // Import loads the component definitions into memory. func (s *Store) Import(comp base.IComponent) { c := &component{comp: comp} @@ -294,6 +304,7 @@ func (s *Store) CreateExecution(p ExecutionParams) (*base.ExecutionWrapper, erro return &base.ExecutionWrapper{IExecution: x}, nil } +// IdentifyEvent identifies the event and returns the identifiers. func (s *Store) IdentifyEvent(ctx context.Context, defID string, rawEvent *base.RawEvent) (identifierResult *base.IdentifierResult, err error) { if c, ok := s.componentIDMap[defID]; ok { return c.comp.IdentifyEvent(ctx, rawEvent) @@ -301,6 +312,7 @@ func (s *Store) IdentifyEvent(ctx context.Context, defID string, rawEvent *base. return nil, fmt.Errorf("component definition not found") } +// ParseEvent parses the raw event and returns a parsed event. func (s *Store) ParseEvent(ctx context.Context, defID string, rawEvent *base.RawEvent) (parsedEvent *base.ParsedEvent, err error) { if c, ok := s.componentIDMap[defID]; ok { return c.comp.ParseEvent(ctx, rawEvent) @@ -308,6 +320,7 @@ func (s *Store) ParseEvent(ctx context.Context, defID string, rawEvent *base.Raw return nil, fmt.Errorf("component definition not found") } +// RegisterEvent registers an event handler for the component. func (s *Store) RegisterEvent(ctx context.Context, defID string, settings *base.RegisterEventSettings) (identifiers []base.Identifier, err error) { if c, ok := s.componentIDMap[defID]; ok { return c.comp.RegisterEvent(ctx, settings) @@ -315,6 +328,7 @@ func (s *Store) RegisterEvent(ctx context.Context, defID string, settings *base. return nil, fmt.Errorf("component definition not found") } +// UnregisterEvent unregisters an event handler for the component. 
func (s *Store) UnregisterEvent(ctx context.Context, defID string, settings *base.UnregisterEventSettings, identifiers []base.Identifier) error { if c, ok := s.componentIDMap[defID]; ok { return c.comp.UnregisterEvent(ctx, settings, identifiers) diff --git a/pkg/data/audio.go b/pkg/data/audio.go index 1d58929c4..6e2a74f76 100644 --- a/pkg/data/audio.go +++ b/pkg/data/audio.go @@ -10,9 +10,9 @@ import ( "strconv" "time" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/pipeline-backend/pkg/data/path" - "github.com/instill-ai/pipeline-backend/pkg/external" ) type audioData struct { @@ -52,7 +52,7 @@ func NewAudioFromBytes(b []byte, contentType, filename string) (*audioData, erro return createAudioData(b, contentType, filename) } -func NewAudioFromURL(ctx context.Context, binaryFetcher external.BinaryFetcher, url string) (video *audioData, err error) { +func NewAudioFromURL(ctx context.Context, binaryFetcher binary.Fetcher, url string) (video *audioData, err error) { b, contentType, filename, err := binaryFetcher.FetchFromURL(ctx, url) if err != nil { return nil, err diff --git a/pkg/data/audio_test.go b/pkg/data/audio_test.go index a079e37a7..4d180f608 100644 --- a/pkg/data/audio_test.go +++ b/pkg/data/audio_test.go @@ -9,7 +9,7 @@ import ( qt "github.com/frankban/quicktest" - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" ) func TestNewAudioFromBytes(t *testing.T) { @@ -59,7 +59,7 @@ func TestNewAudioFromURL(t *testing.T) { c := qt.New(t) ctx := context.Background() - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() testCases := []struct { name string url string diff --git a/pkg/data/binary/fetcher.go b/pkg/data/binary/fetcher.go new file mode 100644 index 000000000..e229c48fe --- /dev/null +++ b/pkg/data/binary/fetcher.go @@ -0,0 +1,79 @@ +package binary + +import ( + "context" + "encoding/base64" + "mime" + "strings" + + "github.com/gabriel-vasile/mimetype" + "github.com/go-resty/resty/v2" +) + +// Fetcher is an interface that fetches binary data from a URL. +type Fetcher interface { + FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) +} + +type fetcher struct { + httpClient *resty.Client +} + +// NewFetcher creates a new BinaryFetcher instance. +func NewFetcher() Fetcher { + return &fetcher{ + httpClient: resty.New().SetRetryCount(3), + } +} + +// FetchFromURL fetches binary data from a URL. 
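// A minimal usage sketch of the fetcher below, assuming a context.Context named ctx is in scope; the data URI is an arbitrary example value:
//
//	f := binary.NewFetcher()
//	body, contentType, filename, err := f.FetchFromURL(ctx, "data:text/plain;filename=hello.txt;base64,aGVsbG8=")
//	// body == []byte("hello"), contentType == "text/plain", filename == "hello.txt", err == nil
//
// Non-data URLs are instead fetched over HTTP with up to three retries.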
+func (f *fetcher) FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) { + if strings.HasPrefix(url, "data:") { + return f.convertDataURIToBytes(url) + } + + var resp *resty.Response + resp, err = f.httpClient.R().SetContext(ctx).Get(url) + if err != nil { + return + } + + body = resp.Body() + contentType = strings.Split(mimetype.Detect(body).String(), ";")[0] + + if disposition := resp.Header().Get("Content-Disposition"); disposition != "" { + if strings.HasPrefix(disposition, "attachment") { + if _, params, err := mime.ParseMediaType(disposition); err == nil { + filename = params["filename"] + } + } + } + + return +} + +func (f *fetcher) convertDataURIToBytes(url string) (b []byte, contentType string, filename string, err error) { + slices := strings.Split(url, ",") + if len(slices) == 1 { + b, err = base64.StdEncoding.DecodeString(url) + if err != nil { + return + } + contentType = strings.Split(mimetype.Detect(b).String(), ";")[0] + } else { + mime := strings.Split(slices[0], ":") + tags := "" + contentType, tags, _ = strings.Cut(mime[1], ";") + b, err = base64.StdEncoding.DecodeString(slices[1]) + if err != nil { + return + } + for _, tag := range strings.Split(tags, ";") { + key, value, _ := strings.Cut(tag, "=") + if key == "filename" || key == "fileName" || key == "file-name" { + filename = value + } + } + } + return b, contentType, filename, nil +} diff --git a/pkg/data/document.go b/pkg/data/document.go index bcc5dfc52..d6fa597aa 100644 --- a/pkg/data/document.go +++ b/pkg/data/document.go @@ -7,9 +7,9 @@ import ( "strings" "github.com/instill-ai/pipeline-backend/pkg/component/operator/document/v0/transformer" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/pipeline-backend/pkg/data/path" - "github.com/instill-ai/pipeline-backend/pkg/external" ) type documentData struct { @@ -41,7 +41,7 @@ func NewDocumentFromBytes(b []byte, contentType, filename string) (*documentData return createDocumentData(b, contentType, filename) } -func NewDocumentFromURL(ctx context.Context, binaryFetcher external.BinaryFetcher, url string) (*documentData, error) { +func NewDocumentFromURL(ctx context.Context, binaryFetcher binary.Fetcher, url string) (*documentData, error) { b, contentType, filename, err := binaryFetcher.FetchFromURL(ctx, url) if err != nil { return nil, err diff --git a/pkg/data/document_test.go b/pkg/data/document_test.go index d3fff6260..194cbd8ec 100644 --- a/pkg/data/document_test.go +++ b/pkg/data/document_test.go @@ -7,7 +7,7 @@ import ( qt "github.com/frankban/quicktest" - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" ) func TestNewDocumentFromBytes(t *testing.T) { @@ -53,7 +53,7 @@ func TestNewDocumentFromURL(t *testing.T) { c.Parallel() ctx := context.Background() - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() test := func(name, url string, hasErr bool) { c.Run(name, func(c *qt.C) { diff --git a/pkg/data/file.go b/pkg/data/file.go index 4c4c402b4..105fa78eb 100644 --- a/pkg/data/file.go +++ b/pkg/data/file.go @@ -12,9 +12,9 @@ import ( "github.com/gofrs/uuid" "google.golang.org/protobuf/types/known/structpb" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/pipeline-backend/pkg/data/path" -
"github.com/instill-ai/pipeline-backend/pkg/external" ) const ( @@ -59,7 +59,7 @@ func NewFileFromBytes(b []byte, contentType, filename string) (bin *fileData, er return f, nil } -func NewFileFromURL(ctx context.Context, binaryFetcher external.BinaryFetcher, url string) (bin *fileData, err error) { +func NewFileFromURL(ctx context.Context, binaryFetcher binary.Fetcher, url string) (bin *fileData, err error) { b, contentType, filename, err := binaryFetcher.FetchFromURL(ctx, url) if err != nil { return nil, err diff --git a/pkg/data/image.go b/pkg/data/image.go index a2beb30b0..6df094d25 100644 --- a/pkg/data/image.go +++ b/pkg/data/image.go @@ -15,9 +15,9 @@ import ( goimage "image" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/pipeline-backend/pkg/data/path" - "github.com/instill-ai/pipeline-backend/pkg/external" ) type imageData struct { @@ -54,7 +54,7 @@ func NewImageFromBytes(b []byte, contentType, filename string) (*imageData, erro } // NewImageFromURL creates a new imageData from a URL -func NewImageFromURL(ctx context.Context, binaryFetcher external.BinaryFetcher, url string) (*imageData, error) { +func NewImageFromURL(ctx context.Context, binaryFetcher binary.Fetcher, url string) (*imageData, error) { b, contentType, filename, err := binaryFetcher.FetchFromURL(ctx, url) if err != nil { return nil, err diff --git a/pkg/data/image_test.go b/pkg/data/image_test.go index b7fe0ca6c..5f5425124 100644 --- a/pkg/data/image_test.go +++ b/pkg/data/image_test.go @@ -6,8 +6,7 @@ import ( "testing" qt "github.com/frankban/quicktest" - - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" ) func TestNewImageFromBytes(t *testing.T) { @@ -58,7 +57,7 @@ func TestNewImageFromURL(t *testing.T) { c := qt.New(t) ctx := context.Background() - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() testCases := []struct { name string url string diff --git a/pkg/data/number_test.go b/pkg/data/number_test.go index c4e1fed1f..b4d17ff33 100644 --- a/pkg/data/number_test.go +++ b/pkg/data/number_test.go @@ -4,6 +4,7 @@ import ( "testing" qt "github.com/frankban/quicktest" + "github.com/instill-ai/pipeline-backend/pkg/data/format" ) diff --git a/pkg/data/struct.go b/pkg/data/struct.go index e7a410160..953328f82 100644 --- a/pkg/data/struct.go +++ b/pkg/data/struct.go @@ -8,8 +8,8 @@ import ( "strconv" "strings" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/external" ) // Package data provides functionality for marshaling and unmarshaling between @@ -49,7 +49,7 @@ type Marshaler struct { // Unmarshaler is used to unmarshal data into a struct. type Unmarshaler struct { - binaryFetcher external.BinaryFetcher + binaryFetcher binary.Fetcher } // NewMarshaler creates a new Marshaler. @@ -58,7 +58,7 @@ func NewMarshaler() *Marshaler { } // NewUnmarshaler creates a new Unmarshaler with a binary fetcher. 
-func NewUnmarshaler(binaryFetcher external.BinaryFetcher) *Unmarshaler { +func NewUnmarshaler(binaryFetcher binary.Fetcher) *Unmarshaler { return &Unmarshaler{binaryFetcher} } diff --git a/pkg/data/struct_test.go b/pkg/data/struct_test.go index 5f9f92035..2e2c25817 100644 --- a/pkg/data/struct_test.go +++ b/pkg/data/struct_test.go @@ -7,15 +7,15 @@ import ( qt "github.com/frankban/quicktest" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/external" ) func TestUnmarshal(t *testing.T) { t.Parallel() c := qt.New(t) - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() unmarshaler := NewUnmarshaler(binaryFetcher) c.Run("Basic types", func(c *qt.C) { diff --git a/pkg/data/utils.go b/pkg/data/utils.go index 4a6c838d0..658d893ad 100644 --- a/pkg/data/utils.go +++ b/pkg/data/utils.go @@ -6,10 +6,11 @@ import ( "fmt" "strings" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" + "github.com/gabriel-vasile/mimetype" "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/external" ) func encodeDataURI(b []byte, contentType string) (s string, err error) { @@ -51,7 +52,7 @@ func NewBinaryFromBytes(b []byte, contentType, filename string) (format.Value, e } } -func NewBinaryFromURL(ctx context.Context, binaryFetcher external.BinaryFetcher, url string) (format.Value, error) { +func NewBinaryFromURL(ctx context.Context, binaryFetcher binary.Fetcher, url string) (format.Value, error) { b, contentType, filename, err := binaryFetcher.FetchFromURL(ctx, url) if err != nil { return nil, err diff --git a/pkg/data/video.go b/pkg/data/video.go index 583966c00..029bd9dd1 100644 --- a/pkg/data/video.go +++ b/pkg/data/video.go @@ -11,9 +11,10 @@ import ( "strings" "time" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" + "github.com/instill-ai/pipeline-backend/pkg/data/format" "github.com/instill-ai/pipeline-backend/pkg/data/path" - "github.com/instill-ai/pipeline-backend/pkg/external" ) type videoData struct { @@ -54,7 +55,7 @@ func NewVideoFromBytes(b []byte, contentType, filename string) (video *videoData return createVideoData(b, contentType, filename) } -func NewVideoFromURL(ctx context.Context, binaryFetcher external.BinaryFetcher, url string) (video *videoData, err error) { +func NewVideoFromURL(ctx context.Context, binaryFetcher binary.Fetcher, url string) (video *videoData, err error) { b, contentType, filename, err := binaryFetcher.FetchFromURL(ctx, url) if err != nil { return nil, err diff --git a/pkg/data/video_test.go b/pkg/data/video_test.go index 604802e79..3be7e04d6 100644 --- a/pkg/data/video_test.go +++ b/pkg/data/video_test.go @@ -10,7 +10,7 @@ import ( qt "github.com/frankban/quicktest" - "github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" ) func TestNewVideoFromBytes(t *testing.T) { @@ -71,7 +71,7 @@ func TestNewVideoFromURL(t *testing.T) { c.Parallel() ctx := context.Background() - binaryFetcher := external.NewBinaryFetcher() + binaryFetcher := binary.NewFetcher() c.Run("Valid video URL", func(c *qt.C) { c.Parallel() diff --git a/pkg/external/external.go b/pkg/external/external.go deleted file mode 100644 index b82c27f72..000000000 --- a/pkg/external/external.go +++ /dev/null @@ -1,195 +0,0 @@ -package external - -import ( - "context" - "encoding/base64" - "fmt" - "mime" - "net/url" - "regexp" - "strings" - - 
"github.com/gabriel-vasile/mimetype" - "github.com/go-resty/resty/v2" - "github.com/gofrs/uuid" - - "github.com/instill-ai/x/minio" - "github.com/instill-ai/x/resource" - - artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" -) - -// BinaryFetcher is an interface that fetches binary data from a URL. -type BinaryFetcher interface { - FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) -} - -type binaryFetcher struct { - httpClient *resty.Client -} - -// NewBinaryFetcher creates a new BinaryFetcher instance. -func NewBinaryFetcher() BinaryFetcher { - return &binaryFetcher{ - httpClient: resty.New().SetRetryCount(3), - } -} - -// FetchFromURL fetches binary data from a URL. -func (f *binaryFetcher) FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) { - - if strings.HasPrefix(url, "data:") { - return f.convertDataURIToBytes(url) - } - - var resp *resty.Response - resp, err = f.httpClient.R().SetContext(ctx).Get(url) - if err != nil { - return - } - - body = resp.Body() - contentType = strings.Split(mimetype.Detect(body).String(), ";")[0] - - if disposition := resp.Header().Get("Content-Disposition"); disposition == "" { - if strings.HasPrefix(disposition, "attachment") { - if _, params, err := mime.ParseMediaType(disposition); err == nil { - filename = params["filename"] - } - } - } - - return -} - -// Pattern matches: https://{domain}/v1alpha/namespaces/{namespace}/blob-urls/{uid} -// This is a deprecated pattern, we should use the presigned pattern instead. -var minioURLPattern = regexp.MustCompile(`https?://[^/]+/v1alpha/namespaces/[^/]+/blob-urls/([^/]+)$`) - -// Pattern matches: https://{domain}/v1alpha/blob-urls/{encoded_presigned_url} -// This is the new pattern, we should use this instead of the deprecated pattern. -// The new design totally rely on the presigned URL provided by MinIO, without the need to get object URL from table. -var minioURLPresignedPattern = regexp.MustCompile(`https?://[^/]+/v1alpha/blob-urls/([^/]+)$`) - -// ArtifactBinaryFetcher fetches binary data from a URL. -// If that URL comes from an object uploaded on Instill Artifact, -// it uses the blob storage client directly to avoid egress costs. -type artifactBinaryFetcher struct { - binaryFetcher *binaryFetcher - artifactClient artifactpb.ArtifactPrivateServiceClient // By having this injected, main.go is responsible of closing the connection. - fileGetter *minio.FileGetter -} - -// NewArtifactBinaryFetcher creates a new ArtifactBinaryFetcher instance. 
-func NewArtifactBinaryFetcher(ac artifactpb.ArtifactPrivateServiceClient, fg *minio.FileGetter) BinaryFetcher { - return &artifactBinaryFetcher{ - binaryFetcher: &binaryFetcher{ - httpClient: resty.New().SetRetryCount(3), - }, - artifactClient: ac, - fileGetter: fg, - } -} - -func (f *artifactBinaryFetcher) FetchFromURL(ctx context.Context, fileURL string) (b []byte, contentType string, filename string, err error) { - if strings.HasPrefix(fileURL, "data:") { - return f.binaryFetcher.convertDataURIToBytes(fileURL) - } - if matches := minioURLPattern.FindStringSubmatch(fileURL); matches != nil { - if len(matches) < 2 { - err = fmt.Errorf("invalid blob storage url: %s", fileURL) - return - } - - return f.fetchFromBlobStorage(ctx, uuid.FromStringOrNil(matches[1])) - } - if matches := minioURLPresignedPattern.FindStringSubmatch(fileURL); matches != nil { - if len(matches) < 1 { - err = fmt.Errorf("invalid blob storage url: %s", fileURL) - return - } - parsedURL, err := url.Parse(fileURL) - if err != nil { - return nil, "", "", err - } - // The presigned URL is encoded in the format: - // scheme://host/v1alpha/blob-urls/base64_encoded_presigned_url - // Here we decode the base64 string to the presigned URL. - base64Decoded, err := base64.URLEncoding.DecodeString(strings.Split(parsedURL.Path, "/")[3]) - if err != nil { - return nil, "", "", err - } - - // the decoded presigned URL is a self-contained URL that can be used - // to upload or download the object directly. - return f.binaryFetcher.FetchFromURL(ctx, string(base64Decoded)) - } - return f.binaryFetcher.FetchFromURL(ctx, fileURL) -} - -func (f *artifactBinaryFetcher) fetchFromBlobStorage(ctx context.Context, urlUID uuid.UUID) (b []byte, contentType string, filename string, err error) { - objectURLRes, err := f.artifactClient.GetObjectURL(ctx, &artifactpb.GetObjectURLRequest{ - Uid: urlUID.String(), - }) - if err != nil { - return nil, "", "", err - } - - objectUID := objectURLRes.ObjectUrl.ObjectUid - - objectRes, err := f.artifactClient.GetObject(ctx, &artifactpb.GetObjectRequest{ - Uid: objectUID, - }) - - if err != nil { - return nil, "", "", err - } - - // TODO: we have agreed on to add the bucket name in pipelinepb.Object - // After the contract is updated, we have to replace it - bucketName := "instill-ai-blob" - objectPath := *objectRes.Object.Path - - // TODO this won't always produce a valid user UID (e.g. the jobs in the - // worker don't have this in the context). - // If we want a full audit of the MinIO actions (or if we want to check - // object permissions), we need to update the signature to pass the user - // UID explicitly. 
- _, userUID := resource.GetRequesterUIDAndUserUID(ctx) - b, _, err = f.fileGetter.GetFile(ctx, minio.GetFileParams{ - BucketName: bucketName, - Path: objectPath, - UserUID: userUID, - }) - if err != nil { - return nil, "", "", err - } - contentType = strings.Split(mimetype.Detect(b).String(), ";")[0] - return b, contentType, objectRes.Object.Name, nil -} - -func (f *binaryFetcher) convertDataURIToBytes(url string) (b []byte, contentType string, filename string, err error) { - slices := strings.Split(url, ",") - if len(slices) == 1 { - b, err = base64.StdEncoding.DecodeString(url) - if err != nil { - return - } - contentType = strings.Split(mimetype.Detect(b).String(), ";")[0] - } else { - mime := strings.Split(slices[0], ":") - tags := "" - contentType, tags, _ = strings.Cut(mime[1], ";") - b, err = base64.StdEncoding.DecodeString(slices[1]) - if err != nil { - return - } - for _, tag := range strings.Split(tags, ";") { - key, value, _ := strings.Cut(tag, "=") - if key == "filename" || key == "fileName" || key == "file-name" { - filename = value - } - } - } - return b, contentType, filename, nil -} diff --git a/pkg/repository/repository_test.go b/pkg/repository/repository_test.go index f571409ec..0333e2238 100644 --- a/pkg/repository/repository_test.go +++ b/pkg/repository/repository_test.go @@ -458,7 +458,7 @@ func TestRepository_UpsertPipelineRun(t *testing.T) { c.Check(got.NumberOfRuns, qt.Equals, 0) c.Check(got.LastRunTime.IsZero(), qt.IsTrue) - minioURL := `http://localhost:19000/instill-ai-vdp/e9ee5c7e-23a4-4910-b3be-afe1d3ca5254.recipe.json?X-Amz-Algorithm=AWS4-HMAC-SHA256\u0026X-Amz-Credential=minioadmin%2F20240816%2Fus-east-1%2Fs3%2Faws4_request\u0026X-Amz-Date=20240816T030849Z\u0026X-Amz-Expires=604800\u0026X-Amz-SignedHeaders=host\u0026X-Amz-Signature=f25a30c82e067b8da32c01a17452977082309c873d4a3bd72767ffe1118d695c` + minioURL := `http://localhost:19000/core-pipeline/e9ee5c7e-23a4-4910-b3be-afe1d3ca5254.recipe.json?X-Amz-Algorithm=AWS4-HMAC-SHA256\u0026X-Amz-Credential=minioadmin%2F20240816%2Fus-east-1%2Fs3%2Faws4_request\u0026X-Amz-Date=20240816T030849Z\u0026X-Amz-Expires=604800\u0026X-Amz-SignedHeaders=host\u0026X-Amz-Signature=f25a30c82e067b8da32c01a17452977082309c873d4a3bd72767ffe1118d695c` minioURL = url.QueryEscape(minioURL) c.Assert(err, qt.IsNil) @@ -513,7 +513,7 @@ func TestRepository_GetPaginatedPipelineRunsWithPermissions(t *testing.T) { c := qt.New(t) ctx := context.Background() - minioURL := `http://localhost:19000/instill-ai-vdp/e9ee5c7e-23a4-4910-b3be-afe1d3ca5254.recipe.json?X-Amz-Algorithm=AWS4-HMAC-SHA256\u0026X-Amz-Credential=minioadmin%2F20240816%2Fus-east-1%2Fs3%2Faws4_request\u0026X-Amz-Date=20240816T030849Z\u0026X-Amz-Expires=604800\u0026X-Amz-SignedHeaders=host\u0026X-Amz-Signature=f25a30c82e067b8da32c01a17452977082309c873d4a3bd72767ffe1118d695c` + minioURL := `http://localhost:19000/core-pipeline/e9ee5c7e-23a4-4910-b3be-afe1d3ca5254.recipe.json?X-Amz-Algorithm=AWS4-HMAC-SHA256\u0026X-Amz-Credential=minioadmin%2F20240816%2Fus-east-1%2Fs3%2Faws4_request\u0026X-Amz-Date=20240816T030849Z\u0026X-Amz-Expires=604800\u0026X-Amz-SignedHeaders=host\u0026X-Amz-Signature=f25a30c82e067b8da32c01a17452977082309c873d4a3bd72767ffe1118d695c` minioURL = url.QueryEscape(minioURL) mockUIDs := make([]uuid.UUID, 4) diff --git a/pkg/service/blobstorage.go b/pkg/service/blobstorage.go deleted file mode 100644 index 6102361c9..000000000 --- a/pkg/service/blobstorage.go +++ /dev/null @@ -1,119 +0,0 @@ -// TODO: -// We should arrange the logic for blob storage in the 
pipeline-backend. -// Now, we use blob storage in worker and service. The logic are close but not the same. -// We should refactor the logic to make it more compact and easier to maintain for worker and service. -// This will be addressed in ins-7091 - -package service - -import ( - "context" - "encoding/base64" - "fmt" - "mime" - "strings" - "time" - - "github.com/gabriel-vasile/mimetype" - "google.golang.org/grpc/metadata" - - "github.com/instill-ai/pipeline-backend/pkg/recipe" - "github.com/instill-ai/pipeline-backend/pkg/resource" - "github.com/instill-ai/pipeline-backend/pkg/utils" - - artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" - miniox "github.com/instill-ai/x/minio" -) - -func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, data string, ns resource.Namespace, expiryRule miniox.ExpiryRule) (string, error) { - mimeType, err := getMimeType(data) - if err != nil { - return "", fmt.Errorf("get mime type: %w", err) - } - artifactClient := s.artifactPublicServiceClient - - vars, err := recipe.GenerateSystemVariables(ctx, recipe.SystemVariables{}) - - if err != nil { - return "", fmt.Errorf("generate system variables: %w", err) - } - - ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(vars)) - - timestamp := time.Now().Format(time.RFC3339) - objectName := fmt.Sprintf("%s/%s%s", ns.NsUID.String(), timestamp, getFileExtension(mimeType)) - - resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{ - NamespaceId: ns.NsID, - ObjectName: objectName, - ObjectExpireDays: int32(expiryRule.ExpirationDays), - }) - - if err != nil { - return "", fmt.Errorf("get upload url: %w", err) - } - - uploadURL := resp.GetUploadUrl() - data = removePrefix(data) - b, err := base64.StdEncoding.DecodeString(data) - if err != nil { - return "", fmt.Errorf("decode base64 string: %w", err) - } - - err = utils.UploadBlobData(ctx, uploadURL, mimeType, b, s.log) - if err != nil { - return "", fmt.Errorf("upload blob data: %w", err) - } - - respDownloadURL, err := artifactClient.GetObjectDownloadURL(ctx, &artifactpb.GetObjectDownloadURLRequest{ - NamespaceId: ns.NsID, - ObjectUid: resp.GetObject().GetUid(), - }) - if err != nil { - return "", fmt.Errorf("get object download url: %w", err) - } - - return respDownloadURL.GetDownloadUrl(), nil -} - -func getMimeType(data string) (string, error) { - var mimeType string - if strings.HasPrefix(data, "data:") { - contentType := strings.TrimPrefix(data, "data:") - parts := strings.SplitN(contentType, ";", 2) - if len(parts) == 0 { - return "", fmt.Errorf("invalid data url") - } - mimeType = parts[0] - } else { - b, err := base64.StdEncoding.DecodeString(data) - if err != nil { - return "", fmt.Errorf("decode base64 string: %w", err) - } - mimeType = strings.Split(mimetype.Detect(b).String(), ";")[0] - - } - return mimeType, nil -} - -func getFileExtension(mimeType string) string { - ext, err := mime.ExtensionsByType(mimeType) - if err != nil { - return "" - } - if len(ext) == 0 { - return "" - } - return ext[0] -} - -func removePrefix(data string) string { - if strings.HasPrefix(data, "data:") { - parts := strings.SplitN(data, ",", 2) - if len(parts) == 0 { - return "" - } - return parts[1] - } - return data -} diff --git a/pkg/service/main.go b/pkg/service/main.go index 87a22c743..e2e0999ee 100644 --- a/pkg/service/main.go +++ b/pkg/service/main.go @@ -13,17 +13,17 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/instill-ai/pipeline-backend/pkg/acl" - 
"github.com/instill-ai/pipeline-backend/pkg/external" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/resource" - "github.com/instill-ai/x/minio" componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store" artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" logx "github.com/instill-ai/x/log" + miniox "github.com/instill-ai/x/minio" ) // Service interface @@ -107,11 +107,11 @@ type service struct { mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient aclClient acl.ACLClientInterface converter Converter - minioClient minio.Client + minioClient miniox.Client memory *memory.Store log *zap.Logger retentionHandler MetadataRetentionHandler - binaryFetcher external.BinaryFetcher + binaryFetcher binary.Fetcher artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient } @@ -125,11 +125,11 @@ func NewService( converter Converter, mgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient, mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient, - minioClient minio.Client, + minioClient miniox.Client, componentStore *componentstore.Store, memory *memory.Store, retentionHandler MetadataRetentionHandler, - binaryFetcher external.BinaryFetcher, + binaryFetcher binary.Fetcher, artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient, artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient, ) Service { diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index ab18d3dcf..9add2fe1c 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -2,16 +2,19 @@ package service import ( "context" + "encoding/base64" "encoding/json" "errors" "fmt" "maps" + "mime" "slices" "strings" "sync" "time" "cloud.google.com/go/longrunning/autogen/longrunningpb" + "github.com/gabriel-vasile/mimetype" "github.com/gofrs/uuid" "go.einride.tech/aip/filtering" "go.einride.tech/aip/ordering" @@ -885,7 +888,7 @@ func (s *service) preTriggerPipeline( if strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://") { continue } - downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, str, requester, expiryRule) + downloadURL, err := s.uploadDataToMinIO(ctx, str, requester, expiryRule) if err != nil { return fmt.Errorf("upload blob and get download url: %w", err) } @@ -898,7 +901,7 @@ func (s *service) preTriggerPipeline( if strings.HasPrefix(str[idx], "http://") || strings.HasPrefix(str[idx], "https://") { continue } - downloadURL, err := s.uploadBlobAndGetDownloadURL(ctx, str[idx], requester, expiryRule) + downloadURL, err := s.uploadDataToMinIO(ctx, str[idx], requester, expiryRule) if err != nil { return fmt.Errorf("upload blob and get download url: %w", err) } @@ -2340,3 +2343,77 @@ func (s *service) fetchRecipeSnapshot(ctx context.Context, pipelineTriggerID str return recipe, nil } + +// uploadDataToMinIO uploads data directly to MinIO and returns a presigned download URL +func (s *service) uploadDataToMinIO(ctx context.Context, data string, ns resource.Namespace, expiryRule minio.ExpiryRule) (string, error) { + // Generate object name with timestamp + timestamp := time.Now().Format(time.RFC3339) + mimeType, err := getMimeTypeForUpload(data) + if err != nil { + return "", 
fmt.Errorf("get mime type: %w", err) + } + objectName := fmt.Sprintf("%s/%s%s", ns.NsUID.String(), timestamp, getFileExtensionForUpload(mimeType)) + + // Decode base64 data + decodedData, err := decodeDataURIForUpload(data) + if err != nil { + return "", fmt.Errorf("decoding data: %w", err) + } + + // Upload data directly to MinIO using the client + param := minio.UploadFileBytesParam{ + UserUID: ns.NsUID, + FilePath: objectName, + FileBytes: decodedData, + FileMimeType: mimeType, + ExpiryRuleTag: expiryRule.Tag, + } + + downloadURL, _, err := s.minioClient.UploadFileBytes(ctx, ¶m) + if err != nil { + return "", fmt.Errorf("uploading data to MinIO: %w", err) + } + + return downloadURL, nil +} + +func getMimeTypeForUpload(data string) (string, error) { + var mimeType string + if strings.HasPrefix(data, "data:") { + contentType := strings.TrimPrefix(data, "data:") + parts := strings.SplitN(contentType, ";", 2) + if len(parts) == 0 { + return "", fmt.Errorf("invalid data url") + } + mimeType = parts[0] + } else { + b, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return "", fmt.Errorf("decode base64 string: %w", err) + } + mimeType = strings.Split(mimetype.Detect(b).String(), ";")[0] + } + return mimeType, nil +} + +func getFileExtensionForUpload(mimeType string) string { + ext, err := mime.ExtensionsByType(mimeType) + if err != nil { + return "" + } + if len(ext) == 0 { + return "" + } + return ext[0] +} + +func decodeDataURIForUpload(data string) ([]byte, error) { + if strings.HasPrefix(data, "data:") { + parts := strings.SplitN(data, ",", 2) + if len(parts) < 2 { + return nil, fmt.Errorf("invalid data URI format") + } + return base64.StdEncoding.DecodeString(parts[1]) + } + return base64.StdEncoding.DecodeString(data) +} diff --git a/pkg/service/pipeline_test.go b/pkg/service/pipeline_test.go index 18827436c..ab4e8d152 100644 --- a/pkg/service/pipeline_test.go +++ b/pkg/service/pipeline_test.go @@ -11,20 +11,19 @@ import ( "github.com/redis/go-redis/v9" "go.temporal.io/sdk/client" - "github.com/instill-ai/pipeline-backend/config" "github.com/instill-ai/pipeline-backend/pkg/acl" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/datamodel" - "github.com/instill-ai/pipeline-backend/pkg/external" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/mock" "github.com/instill-ai/pipeline-backend/pkg/repository" "github.com/instill-ai/pipeline-backend/pkg/resource" - "github.com/instill-ai/x/minio" componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store" artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" + miniox "github.com/instill-ai/x/minio" ) func fakeNamespace() resource.Namespace { @@ -70,10 +69,8 @@ func TestService_UpdateNamespacePipelineByID(t *testing.T) { converter := mock.NewConverterMock(mc) mgmtPrivateClient := mock.NewMgmtPrivateServiceClientMock(mc) - compStore := componentstore.Init(componentstore.InitParams{ - Secrets: config.Config.Component.Secrets, - BinaryFetcher: external.NewBinaryFetcher(), - }) + // Create a simple binary fetcher for testing + binaryFetcher := binary.NewFetcher() service := newService( serviceConfig{ @@ -83,8 +80,9 @@ func TestService_UpdateNamespacePipelineByID(t *testing.T) { aCLClient: aclClient, converter: converter, mgmtPrivateServiceClient: mgmtPrivateClient, - 
componentStore: compStore, + componentStore: nil, memory: memory.NewStore(nil, nil), + binaryFetcher: binaryFetcher, }, ) @@ -129,11 +127,11 @@ type serviceConfig struct { converter Converter mgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient - minioClient minio.Client + minioClient miniox.Client componentStore *componentstore.Store memory *memory.Store retentionHandler MetadataRetentionHandler - binaryFetcher external.BinaryFetcher + binaryFetcher binary.Fetcher artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient } diff --git a/pkg/utils/blobstorage.go b/pkg/utils/blobstorage.go deleted file mode 100644 index 14b364d11..000000000 --- a/pkg/utils/blobstorage.go +++ /dev/null @@ -1,227 +0,0 @@ -// TODO: -// We should arrange the logic for blob storage in the pipeline-backend. -// Now, we use blob storage in worker and service. The logic are close but not the same. -// We should refactor the logic to make it more compact and easier to maintain for worker and service. -// This will be addressed in ins-7091 - -package utils - -import ( - "context" - "encoding/base64" - "fmt" - "mime" - "net/url" - "strings" - - "github.com/gabriel-vasile/mimetype" - "github.com/gofrs/uuid" - "go.uber.org/zap" - "google.golang.org/protobuf/types/known/structpb" - - "github.com/instill-ai/pipeline-backend/config" - "github.com/instill-ai/x/blobstorage" - - artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" - miniox "github.com/instill-ai/x/minio" -) - -// UploadBlobParams contains the information and dependencies to upload blob -// data owned by a namespace and obtain a download URL. -type UploadBlobParams struct { - NamespaceID string - NamespaceUID uuid.UUID - ExpiryRule miniox.ExpiryRule - Logger *zap.Logger - ArtifactClient *artifactpb.ArtifactPublicServiceClient -} - -// UploadBlobDataAndReplaceWithURLs uploads the unstructured data in the -// structs to minio and replaces the data with the URL. -// Before calling this function, ctx should have been set with the request -// metadata. -func UploadBlobDataAndReplaceWithURLs(ctx context.Context, dataStructs []*structpb.Struct, params UploadBlobParams) ([]*structpb.Struct, error) { - updatedDataStructs := make([]*structpb.Struct, len(dataStructs)) - for i, dataStruct := range dataStructs { - updatedDataStruct, err := uploadBlobDataAndReplaceWithURL(ctx, dataStruct, params) - if err != nil { - // Note: we don't want to fail the whole process if one of the data structs fails to upload. 
- updatedDataStructs[i] = dataStruct - } else { - updatedDataStructs[i] = updatedDataStruct - } - } - return updatedDataStructs, nil -} - -func uploadBlobDataAndReplaceWithURL(ctx context.Context, dataStruct *structpb.Struct, params UploadBlobParams) (*structpb.Struct, error) { - for key, value := range dataStruct.GetFields() { - updatedValue, err := processValue(ctx, params, value) - if err == nil { - dataStruct.GetFields()[key] = updatedValue - } - } - - return dataStruct, nil -} - -func processValue(ctx context.Context, params UploadBlobParams, value *structpb.Value) (*structpb.Value, error) { - switch v := value.GetKind().(type) { - case *structpb.Value_StringValue: - if isUnstructuredData(v.StringValue) { - downloadURL, err := uploadBlobAndGetDownloadURL(ctx, v.StringValue, params) - if err != nil { - return nil, err - } - return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: downloadURL}}, nil - } - case *structpb.Value_ListValue: - listValue := v.ListValue - updatedListValue, err := processList(ctx, params, listValue) - if err == nil { - return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: updatedListValue}}, nil - } - case *structpb.Value_StructValue: - for _, item := range v.StructValue.GetFields() { - structData := item.GetStructValue() - updatedStructData, err := uploadBlobDataAndReplaceWithURL(ctx, structData, params) - // Note: we don't want to fail the whole process if one of the data structs fails to upload. - if err == nil { - return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: updatedStructData}}, nil - } - } - } - - return value, nil -} - -func processList(ctx context.Context, params UploadBlobParams, list *structpb.ListValue) (*structpb.ListValue, error) { - for i, item := range list.Values { - updatedItem, err := processValue(ctx, params, item) - if err == nil { - list.Values[i] = updatedItem - } - } - - return list, nil -} - -func isUnstructuredData(data string) bool { - return strings.HasPrefix(data, "data:") && strings.Contains(data, ";base64,") -} - -func uploadBlobAndGetDownloadURL(ctx context.Context, data string, params UploadBlobParams) (string, error) { - mimeType, err := getMimeType(data) - if err != nil { - return "", fmt.Errorf("get mime type: %w", err) - } - - uid, err := uuid.NewV4() - if err != nil { - return "", fmt.Errorf("generate uuid: %w", err) - } - - objectName := fmt.Sprintf("%s/%s%s", params.NamespaceUID.String(), uid.String(), getFileExtension(mimeType)) - - artifactClient := *params.ArtifactClient - resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{ - NamespaceId: params.NamespaceID, - ObjectName: objectName, - ObjectExpireDays: int32(params.ExpiryRule.ExpirationDays), - }) - - if err != nil { - return "", fmt.Errorf("get upload url: %w", err) - } - - uploadURL := resp.GetUploadUrl() - data = removePrefix(data) - b, err := base64.StdEncoding.DecodeString(data) - if err != nil { - return "", fmt.Errorf("decode base64 string: %w", err) - } - - err = UploadBlobData(ctx, uploadURL, mimeType, b, params.Logger) - if err != nil { - return "", fmt.Errorf("upload blob data: %w", err) - } - - respDownloadURL, err := artifactClient.GetObjectDownloadURL(ctx, &artifactpb.GetObjectDownloadURLRequest{ - NamespaceId: params.NamespaceID, - ObjectUid: resp.GetObject().GetUid(), - }) - if err != nil { - return "", fmt.Errorf("get object download url: %w", err) - } - - return respDownloadURL.GetDownloadUrl(), nil -} - -// UploadBlobData uploads the blob data to the given upload 
URL. -func UploadBlobData(ctx context.Context, uploadURL string, fileContentType string, fileBytes []byte, logger *zap.Logger) error { - if uploadURL == "" { - return fmt.Errorf("empty upload URL provided") - } - - parsedURL, err := url.Parse(uploadURL) - if err != nil { - return fmt.Errorf("parsing upload URL: %w", err) - } - if config.Config.APIGateway.TLSEnabled { - parsedURL.Scheme = "https" - } else { - parsedURL.Scheme = "http" - } - parsedURL.Host = fmt.Sprintf("%s:%d", config.Config.APIGateway.Host, config.Config.APIGateway.PublicPort) - fullURL := parsedURL.String() - - err = blobstorage.UploadFile(ctx, logger, fullURL, fileBytes, fileContentType) - - if err != nil { - return fmt.Errorf("uploading blob: %w", err) - } - - return nil -} - -func getMimeType(data string) (string, error) { - var mimeType string - if strings.HasPrefix(data, "data:") { - contentType := strings.TrimPrefix(data, "data:") - parts := strings.SplitN(contentType, ";", 2) - if len(parts) == 0 { - return "", fmt.Errorf("invalid data url") - } - mimeType = parts[0] - } else { - b, err := base64.StdEncoding.DecodeString(data) - if err != nil { - return "", fmt.Errorf("decode base64 string: %w", err) - } - mimeType = strings.Split(mimetype.Detect(b).String(), ";")[0] - - } - return mimeType, nil -} - -func getFileExtension(mimeType string) string { - ext, err := mime.ExtensionsByType(mimeType) - if err != nil { - return "" - } - if len(ext) == 0 { - return "" - } - return ext[0] -} - -func removePrefix(data string) string { - if strings.HasPrefix(data, "data:") { - parts := strings.SplitN(data, ",", 2) - if len(parts) == 0 { - return "" - } - return parts[1] - } - return data -} diff --git a/pkg/worker/blobstorage.go b/pkg/worker/blobstorage.go deleted file mode 100644 index fc674f1d0..000000000 --- a/pkg/worker/blobstorage.go +++ /dev/null @@ -1,95 +0,0 @@ -// TODO: -// We should arrange the logic for blob storage in the pipeline-backend. -// Now, we use blob storage in worker and service. The logic are close but not the same. -// We should refactor the logic to make it more compact and easier to maintain for worker and service. 
-// This will be addressed in ins-7091 - -package worker - -import ( - "context" - "fmt" - - "go.uber.org/zap" - "google.golang.org/grpc/metadata" - - "github.com/instill-ai/pipeline-backend/pkg/data" - "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/utils" - - artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" - logx "github.com/instill-ai/x/log" -) - -func (w *worker) uploadFileAndReplaceWithURL(ctx context.Context, param *ComponentActivityParam, value *format.Value) format.Value { - logger, _ := logx.GetZapLogger(ctx) - if value == nil { - return nil - } - switch v := (*value).(type) { - case format.File: - downloadURL, err := w.uploadBlobDataAndGetDownloadURL(ctx, param, v) - if err != nil || downloadURL == "" { - logger.Warn("uploading blob data", zap.Error(err)) - return v - } - return data.NewString(downloadURL) - case data.Array: - newArray := make(data.Array, len(v)) - for i, item := range v { - newArray[i] = w.uploadFileAndReplaceWithURL(ctx, param, &item) - } - return newArray - case data.Map: - newMap := make(data.Map) - for k, v := range v { - newMap[k] = w.uploadFileAndReplaceWithURL(ctx, param, &v) - } - return newMap - default: - return v - } -} - -func (w *worker) uploadBlobDataAndGetDownloadURL(ctx context.Context, param *ComponentActivityParam, value format.File) (string, error) { - artifactClient := w.artifactPublicServiceClient - requesterID := param.SystemVariables.PipelineRequesterID - - sysVarJSON := utils.StructToMap(param.SystemVariables, "json") - - ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON)) - - objectName := fmt.Sprintf("%s/%s", param.SystemVariables.PipelineRequesterUID.String(), value.Filename()) - - resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{ - NamespaceId: requesterID, - ObjectName: objectName, - ObjectExpireDays: int32(param.SystemVariables.ExpiryRule.ExpirationDays), - }) - - if err != nil { - return "", fmt.Errorf("get upload url: %w", err) - } - - uploadURL := resp.GetUploadUrl() - - fileBytes, err := value.Binary() - if err != nil { - return "", fmt.Errorf("getting file bytes: %w", err) - } - - err = utils.UploadBlobData(ctx, uploadURL, value.ContentType().String(), fileBytes.ByteArray(), w.log) - if err != nil { - return "", fmt.Errorf("upload blob data: %w", err) - } - - respDownloadURL, err := artifactClient.GetObjectDownloadURL(ctx, &artifactpb.GetObjectDownloadURLRequest{ - NamespaceId: requesterID, - ObjectUid: resp.GetObject().GetUid(), - }) - if err != nil { - return "", fmt.Errorf("get object download url: %w", err) - } - - return respDownloadURL.GetDownloadUrl(), nil -} diff --git a/pkg/worker/io.go b/pkg/worker/io.go index 9e8c8d074..889c9b72f 100644 --- a/pkg/worker/io.go +++ b/pkg/worker/io.go @@ -4,11 +4,12 @@ import ( "context" "fmt" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" + "google.golang.org/protobuf/types/known/structpb" "github.com/instill-ai/pipeline-backend/pkg/data" "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/external" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/recipe" @@ -52,10 +53,10 @@ type inputReader struct { workflowID string compID string originalIdx int - binaryFetcher external.BinaryFetcher + binaryFetcher binary.Fetcher } -func newInputReader(memoryStore *memory.Store, workflowID string, compID string, originalIdx int, binaryFetcher 
external.BinaryFetcher) *inputReader { +func newInputReader(memoryStore *memory.Store, workflowID string, compID string, originalIdx int, binaryFetcher binary.Fetcher) *inputReader { return &inputReader{ memoryStore: memoryStore, workflowID: workflowID, diff --git a/pkg/worker/main.go b/pkg/worker/main.go index 772198fc5..90c939742 100644 --- a/pkg/worker/main.go +++ b/pkg/worker/main.go @@ -3,6 +3,8 @@ package worker import ( "context" + "github.com/instill-ai/pipeline-backend/pkg/data/binary" + "github.com/gofrs/uuid" "github.com/influxdata/influxdb-client-go/v2/api" "github.com/redis/go-redis/v9" @@ -10,16 +12,15 @@ import ( "go.uber.org/zap" "github.com/instill-ai/pipeline-backend/pkg/component/generic/scheduler/v0" - "github.com/instill-ai/pipeline-backend/pkg/external" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/recipe" "github.com/instill-ai/pipeline-backend/pkg/repository" - "github.com/instill-ai/x/minio" componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store" artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" logx "github.com/instill-ai/x/log" + miniox "github.com/instill-ai/x/minio" ) // TaskQueue is the Temporal task queue name for pipeline-backend @@ -61,11 +62,11 @@ type WorkerConfig struct { RedisClient *redis.Client InfluxDBWriteClient api.WriteAPI Component *componentstore.Store - MinioClient minio.Client + MinioClient miniox.Client MemoryStore *memory.Store ArtifactPublicServiceClient artifactpb.ArtifactPublicServiceClient ArtifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient - BinaryFetcher external.BinaryFetcher + BinaryFetcher binary.Fetcher PipelinePublicServiceClient pipelinepb.PipelinePublicServiceClient } @@ -75,13 +76,13 @@ type worker struct { redisClient *redis.Client influxDBWriteClient api.WriteAPI component *componentstore.Store - minioClient minio.Client + minioClient miniox.Client log *zap.Logger memoryStore *memory.Store artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient pipelinePublicServiceClient pipelinepb.PipelinePublicServiceClient - binaryFetcher external.BinaryFetcher + binaryFetcher binary.Fetcher } // NewWorker initiates a temporal worker for workflow and activity definition diff --git a/pkg/worker/minioactivity.go b/pkg/worker/minioactivity.go index f7feec55e..9569805ff 100644 --- a/pkg/worker/minioactivity.go +++ b/pkg/worker/minioactivity.go @@ -2,21 +2,24 @@ package worker import ( "context" + "encoding/base64" "encoding/json" "fmt" + "mime" + "strings" "time" + "github.com/gabriel-vasile/mimetype" + "github.com/gofrs/uuid" "go.uber.org/zap" - "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/structpb" "gopkg.in/guregu/null.v4" - "github.com/gofrs/uuid" "github.com/instill-ai/pipeline-backend/pkg/datamodel" "github.com/instill-ai/pipeline-backend/pkg/memory" - "github.com/instill-ai/pipeline-backend/pkg/utils" - "github.com/instill-ai/x/constant" - "github.com/instill-ai/x/minio" + + constantx "github.com/instill-ai/x/constant" + miniox "github.com/instill-ai/x/minio" ) // UploadRecipeToMinIOParam contains the information to upload a pipeline @@ -44,11 +47,11 @@ func (w *worker) UploadRecipeToMinIOActivity(ctx context.Context, param UploadRe url, minioObjectInfo, err := w.minioClient.WithLogger(log).UploadFileBytes( ctx, - &minio.UploadFileBytesParam{ + 
&miniox.UploadFileBytesParam{ UserUID: param.Metadata.UserUID, FilePath: fmt.Sprintf("pipeline-runs/recipe/%s.json", param.Metadata.PipelineTriggerID), FileBytes: b, - FileMimeType: constant.ContentTypeJSON, + FileMimeType: constantx.ContentTypeJSON, ExpiryRuleTag: param.Metadata.ExpiryRuleTag, }, ) @@ -64,11 +67,11 @@ func (w *worker) UploadRecipeToMinIOActivity(ctx context.Context, param UploadRe URL: url, }}}) if err != nil { - log.Error("failed to log pipeline run with recipe snapshot", zap.Error(err)) + log.Error("failed to save pipeline run recipe snapshot", zap.Error(err)) return err } - log.Info("UploadRecipeToMinIOActivity finished") + log.Info("UploadRecipeToMinIOActivity completed") return nil } @@ -110,11 +113,11 @@ func (w *worker) UploadOutputsToMinIOActivity(ctx context.Context, param *MinIOU url, objectInfo, err := w.minioClient.WithLogger(log).UploadFile( ctx, - &minio.UploadFileParam{ + &miniox.UploadFileParam{ UserUID: param.UserUID, FilePath: objectName, FileContent: outputStructs, - FileMimeType: constant.ContentTypeJSON, + FileMimeType: constantx.ContentTypeJSON, ExpiryRuleTag: param.ExpiryRuleTag, }, ) @@ -165,19 +168,8 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo compInputs[i] = varStr.GetStructValue() } - sysVarJSON := utils.StructToMap(param.SystemVariables, "json") - - ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON)) - - paramsForUpload := utils.UploadBlobParams{ - NamespaceID: param.SystemVariables.PipelineRequesterID, - NamespaceUID: param.SystemVariables.PipelineRequesterUID, - ExpiryRule: param.SystemVariables.ExpiryRule, - Logger: log, - ArtifactClient: &w.artifactPublicServiceClient, - } - - compInputs, err = utils.UploadBlobDataAndReplaceWithURLs(ctx, compInputs, paramsForUpload) + // Process unstructured data in the inputs and upload to MinIO + compInputs, err = w.processAndUploadUnstructuredData(ctx, compInputs, param) if err != nil { return err } @@ -186,11 +178,11 @@ func (w *worker) UploadComponentInputsActivity(ctx context.Context, param *Compo url, objectInfo, err := w.minioClient.WithLogger(log).UploadFile( ctx, - &minio.UploadFileParam{ + &miniox.UploadFileParam{ UserUID: param.SystemVariables.PipelineUserUID, FilePath: objectName, FileContent: compInputs, - FileMimeType: constant.ContentTypeJSON, + FileMimeType: constantx.ContentTypeJSON, ExpiryRuleTag: param.SystemVariables.ExpiryRule.Tag, }, ) @@ -249,29 +241,19 @@ func (w *worker) UploadComponentOutputsActivity(ctx context.Context, param *Comp compOutputs[i] = varStr.GetStructValue() } - sysVarJSON := utils.StructToMap(param.SystemVariables, "json") - ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(sysVarJSON)) - - paramsForUpload := utils.UploadBlobParams{ - NamespaceID: param.SystemVariables.PipelineRequesterID, - NamespaceUID: param.SystemVariables.PipelineRequesterUID, - ExpiryRule: param.SystemVariables.ExpiryRule, - Logger: log, - ArtifactClient: &w.artifactPublicServiceClient, - } - - compOutputs, err = utils.UploadBlobDataAndReplaceWithURLs(ctx, compOutputs, paramsForUpload) + // Process unstructured data in the outputs and upload to MinIO + compOutputs, err = w.processAndUploadUnstructuredData(ctx, compOutputs, param) if err != nil { return err } url, objectInfo, err := w.minioClient.WithLogger(log).UploadFile( ctx, - &minio.UploadFileParam{ + &miniox.UploadFileParam{ UserUID: param.SystemVariables.PipelineUserUID, FilePath: objectName, FileContent: compOutputs, - FileMimeType: constant.ContentTypeJSON, 
+ FileMimeType: constantx.ContentTypeJSON, ExpiryRuleTag: param.SystemVariables.ExpiryRule.Tag, }, ) @@ -295,3 +277,103 @@ func (w *worker) UploadComponentOutputsActivity(ctx context.Context, param *Comp return nil } + +// processAndUploadUnstructuredData processes unstructured data in structs and uploads to MinIO +func (w *worker) processAndUploadUnstructuredData(ctx context.Context, dataStructs []*structpb.Struct, param *ComponentActivityParam) ([]*structpb.Struct, error) { + updatedDataStructs := make([]*structpb.Struct, len(dataStructs)) + for i, dataStruct := range dataStructs { + updatedDataStruct, err := processStructUnstructuredData(ctx, dataStruct, w.uploadUnstructuredDataToMinIO, param) + if err != nil { + // Note: we don't want to fail the whole process if one of the data structs fails to upload. + updatedDataStructs[i] = dataStruct + } else { + updatedDataStructs[i] = updatedDataStruct + } + } + return updatedDataStructs, nil +} + +// uploadUnstructuredDataToMinIO uploads unstructured data to MinIO and returns a download URL +func (w *worker) uploadUnstructuredDataToMinIO(ctx context.Context, data string, param *ComponentActivityParam) (string, error) { + // Generate unique object name + uid, err := uuid.NewV4() + if err != nil { + return "", fmt.Errorf("generate uuid: %w", err) + } + + // Get MIME type + mimeType, err := getMimeType(data) + if err != nil { + return "", fmt.Errorf("get mime type: %w", err) + } + + // Remove prefix and decode base64 data + base64Data := removePrefix(data) + decodedData, err := base64.StdEncoding.DecodeString(base64Data) + if err != nil { + return "", fmt.Errorf("decode base64 data: %w", err) + } + + // Generate object name with extension + objectName := fmt.Sprintf("%s/%s%s", param.SystemVariables.PipelineRequesterUID.String(), uid.String(), getFileExtension(mimeType)) + + // Upload to MinIO + uploadParam := miniox.UploadFileBytesParam{ + UserUID: param.SystemVariables.PipelineRequesterUID, + FilePath: objectName, + FileBytes: decodedData, + FileMimeType: mimeType, + ExpiryRuleTag: param.SystemVariables.ExpiryRule.Tag, + } + + downloadURL, _, err := w.minioClient.UploadFileBytes(ctx, &uploadParam) + if err != nil { + return "", fmt.Errorf("upload to MinIO: %w", err) + } + + return downloadURL, nil +} + +// getMimeType extracts or detects the MIME type from data +func getMimeType(data string) (string, error) { + var mimeType string + if strings.HasPrefix(data, "data:") { + contentType := strings.TrimPrefix(data, "data:") + parts := strings.SplitN(contentType, ";", 2) + if len(parts) == 0 { + return "", fmt.Errorf("invalid data url") + } + mimeType = parts[0] + } else { + b, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return "", fmt.Errorf("decode base64 string: %w", err) + } + mimeType = strings.Split(mimetype.Detect(b).String(), ";")[0] + } + return mimeType, nil +} + +// getFileExtension maps MIME types to file extensions +func getFileExtension(mimeType string) string { + ext, err := mime.ExtensionsByType(mimeType) + if err != nil { + return "" + } + if len(ext) == 0 { + return "" + } + return ext[0] +} + +// removePrefix removes the data URI prefix and returns the base64 data +func removePrefix(data string) string { + if strings.HasPrefix(data, "data:") { + parts := strings.SplitN(data, ",", 2) + if len(parts) == 0 { + return "" + } + return parts[1] + } + return data +} diff --git a/pkg/worker/utils.go b/pkg/worker/utils.go index bdfbe1f20..5f5f08fe9 100644 --- a/pkg/worker/utils.go +++ b/pkg/worker/utils.go @@ -2,32 
+2,14 @@ package worker import ( "context" - "encoding/json" "fmt" "strings" - "github.com/instill-ai/pipeline-backend/config" "github.com/instill-ai/pipeline-backend/pkg/data" "github.com/instill-ai/pipeline-backend/pkg/data/format" - "github.com/instill-ai/pipeline-backend/pkg/utils" + "google.golang.org/protobuf/types/known/structpb" ) -func (w *worker) writeNewDataPoint(ctx context.Context, data utils.PipelineUsageMetricData) error { - if config.Config.Server.Usage.Enabled { - bData, err := json.Marshal(data) - if err != nil { - return err - } - - w.redisClient.RPush(ctx, fmt.Sprintf("user:%s:pipeline.trigger_data", data.OwnerUID), string(bData)) - } - - w.influxDBWriteClient.WritePoint(utils.NewPipelineDataPoint(data)) - w.influxDBWriteClient.WritePoint(utils.DeprecatedNewPipelineDatapoint(data)) - - return nil -} - const ( rangeStart = "start" rangeStop = "stop" @@ -83,3 +65,63 @@ func setIteratorIndex(v format.Value, identifier string, index int) format.Value return v } } + +// isUnstructuredData checks if a string contains unstructured data (data URI format) +func isUnstructuredData(data string) bool { + return strings.HasPrefix(data, "data:") && strings.Contains(data, ";base64,") +} + +// processStructUnstructuredData processes unstructured data in a struct +func processStructUnstructuredData(ctx context.Context, dataStruct *structpb.Struct, uploadFn uploadFunc, param *ComponentActivityParam) (*structpb.Struct, error) { + for key, value := range dataStruct.GetFields() { + updatedValue, err := processValueUnstructuredData(ctx, value, uploadFn, param) + if err == nil { + dataStruct.GetFields()[key] = updatedValue + } + } + return dataStruct, nil +} + +// uploadFunc is a function type for uploading unstructured data +type uploadFunc func(ctx context.Context, data string, param *ComponentActivityParam) (string, error) + +// processValueUnstructuredData processes unstructured data in a value +func processValueUnstructuredData(ctx context.Context, value *structpb.Value, uploadFn uploadFunc, param *ComponentActivityParam) (*structpb.Value, error) { + switch v := value.GetKind().(type) { + case *structpb.Value_StringValue: + if isUnstructuredData(v.StringValue) { + downloadURL, err := uploadFn(ctx, v.StringValue, param) + if err != nil { + return nil, err + } + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: downloadURL}}, nil + } + case *structpb.Value_ListValue: + listValue := v.ListValue + updatedListValue, err := processListUnstructuredData(ctx, listValue, uploadFn, param) + if err == nil { + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: updatedListValue}}, nil + } + case *structpb.Value_StructValue: + for _, item := range v.StructValue.GetFields() { + structData := item.GetStructValue() + updatedStructData, err := processStructUnstructuredData(ctx, structData, uploadFn, param) + // Note: we don't want to fail the whole process if one of the data structs fails to upload. 
+ if err == nil { + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: updatedStructData}}, nil + } + } + } + return value, nil +} + +// processListUnstructuredData processes unstructured data in a list +func processListUnstructuredData(ctx context.Context, list *structpb.ListValue, uploadFn uploadFunc, param *ComponentActivityParam) (*structpb.ListValue, error) { + for i, item := range list.Values { + updatedItem, err := processValueUnstructuredData(ctx, item, uploadFn, param) + if err == nil { + list.Values[i] = updatedItem + } + } + return list, nil +} diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index b7dad2aca..9de0bd17a 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -37,6 +37,7 @@ import ( pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta" errorsx "github.com/instill-ai/x/errors" logx "github.com/instill-ai/x/log" + miniox "github.com/instill-ai/x/minio" ) // TriggerPipelineWorkflowParam contains the parameters for TriggerPipelineWorkflow @@ -1680,3 +1681,79 @@ func (w *worker) PurgeWorkflowMemoryActivity(_ context.Context, workflowID strin func (w *worker) CleanupWorkflowMemoryActivity(ctx context.Context, userUID uuid.UUID, workflowID string) error { return w.memoryStore.CleanupWorkflowMemory(ctx, userUID, workflowID) } + +// writeNewDataPoint writes a new data point to influxdb and redis +func (w *worker) writeNewDataPoint(ctx context.Context, data utils.PipelineUsageMetricData) error { + if config.Config.Server.Usage.Enabled { + bData, err := json.Marshal(data) + if err != nil { + return err + } + + w.redisClient.RPush(ctx, fmt.Sprintf("user:%s:pipeline.trigger_data", data.OwnerUID), string(bData)) + } + + w.influxDBWriteClient.WritePoint(utils.NewPipelineDataPoint(data)) + w.influxDBWriteClient.WritePoint(utils.DeprecatedNewPipelineDatapoint(data)) + + return nil +} + +// uploadFileAndReplaceWithURL uploads file objects and replaces them with URLs +func (w *worker) uploadFileAndReplaceWithURL(ctx context.Context, param *ComponentActivityParam, value *format.Value) format.Value { + logger, _ := logx.GetZapLogger(ctx) + if value == nil { + return nil + } + switch v := (*value).(type) { + case format.File: + downloadURL, err := w.uploadFileToMinIO(ctx, param, v) + if err != nil || downloadURL == "" { + logger.Warn("uploading blob data", zap.Error(err)) + return v + } + return data.NewString(downloadURL) + case data.Array: + newArray := make(data.Array, len(v)) + for i, item := range v { + newArray[i] = w.uploadFileAndReplaceWithURL(ctx, param, &item) + } + return newArray + case data.Map: + newMap := make(data.Map) + for k, v := range v { + newMap[k] = w.uploadFileAndReplaceWithURL(ctx, param, &v) + } + return newMap + default: + return v + } +} + +// uploadFileToMinIO uploads a file directly to MinIO and returns a presigned download URL +func (w *worker) uploadFileToMinIO(ctx context.Context, param *ComponentActivityParam, value format.File) (string, error) { + // Get file bytes + fileBytes, err := value.Binary() + if err != nil { + return "", fmt.Errorf("getting file bytes: %w", err) + } + + // Create object name + objectName := fmt.Sprintf("%s/%s", param.SystemVariables.PipelineRequesterUID.String(), value.Filename().String()) + + // Upload file directly to MinIO using the client + uploadParam := miniox.UploadFileBytesParam{ + UserUID: param.SystemVariables.PipelineRequesterUID, + FilePath: objectName, + FileBytes: fileBytes.ByteArray(), + FileMimeType: value.ContentType().String(), + ExpiryRuleTag: 
param.SystemVariables.ExpiryRule.Tag, + } + + downloadURL, _, err := w.minioClient.UploadFileBytes(ctx, &uploadParam) + if err != nil { + return "", fmt.Errorf("uploading file to MinIO: %w", err) + } + + return downloadURL, nil +} From a0887ff46a929d208ab4a1f1bc694334fa121d50 Mon Sep 17 00:00:00 2001 From: Ping-Lin Chang Date: Fri, 8 Aug 2025 02:43:57 +0100 Subject: [PATCH 2/2] refactor(minio): address PR review comments --- cmd/main/main.go | 1 - cmd/worker/main.go | 1 - pkg/component/store/store.go | 5 ----- pkg/data/binary/fetcher.go | 16 ++++++---------- pkg/service/main.go | 2 -- pkg/service/pipeline_test.go | 7 ------- pkg/worker/main.go | 2 -- 7 files changed, 6 insertions(+), 28 deletions(-) diff --git a/cmd/main/main.go b/cmd/main/main.go index 3de9be3e2..b80e84c44 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -167,7 +167,6 @@ func main() { compStore, ms, service.NewRetentionHandler(), - compStore.GetBinaryFetcher(), artifactPublicServiceClient, artifactPrivateServiceClient, ) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index ddf7d447f..b90dbd816 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -121,7 +121,6 @@ func main() { MemoryStore: ms, ArtifactPublicServiceClient: artifactPublicServiceClient, ArtifactPrivateServiceClient: artifactPrivateServiceClient, - BinaryFetcher: compStore.GetBinaryFetcher(), PipelinePublicServiceClient: pipelinePublicServiceClient, }, ) diff --git a/pkg/component/store/store.go b/pkg/component/store/store.go index 61ab152a5..bc4e2e526 100644 --- a/pkg/component/store/store.go +++ b/pkg/component/store/store.go @@ -249,11 +249,6 @@ func Init(param InitParams) *Store { return compStore } -// GetBinaryFetcher returns the binary fetcher instance used by the store -func (s *Store) GetBinaryFetcher() binary.Fetcher { - return s.binaryFetcher -} - // Import loads the component definitions into memory. func (s *Store) Import(comp base.IComponent) { c := &component{comp: comp} diff --git a/pkg/data/binary/fetcher.go b/pkg/data/binary/fetcher.go index e229c48fe..e5565ba65 100644 --- a/pkg/data/binary/fetcher.go +++ b/pkg/data/binary/fetcher.go @@ -10,24 +10,20 @@ import ( "github.com/go-resty/resty/v2" ) -// Fetcher is an interface that fetches binary data from a URL. -type Fetcher interface { - FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) -} - -type fetcher struct { +// Fetcher is a struct that fetches binary data from a URL. +type Fetcher struct { httpClient *resty.Client } -// NewFetcher creates a new BinaryFetcher instance. +// NewFetcher creates a new Fetcher instance. func NewFetcher() Fetcher { - return &fetcher{ + return Fetcher{ httpClient: resty.New().SetRetryCount(3), } } // FetchFromURL fetches binary data from a URL. 
-func (f *fetcher) FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) { +func (f *Fetcher) FetchFromURL(ctx context.Context, url string) (body []byte, contentType string, filename string, err error) { if strings.HasPrefix(url, "data:") { return f.convertDataURIToBytes(url) } @@ -52,7 +48,7 @@ func (f *fetcher) FetchFromURL(ctx context.Context, url string) (body []byte, co return } -func (f *fetcher) convertDataURIToBytes(url string) (b []byte, contentType string, filename string, err error) { +func (f *Fetcher) convertDataURIToBytes(url string) (b []byte, contentType string, filename string, err error) { slices := strings.Split(url, ",") if len(slices) == 1 { b, err = base64.StdEncoding.DecodeString(url) diff --git a/pkg/service/main.go b/pkg/service/main.go index e2e0999ee..095381c7b 100644 --- a/pkg/service/main.go +++ b/pkg/service/main.go @@ -129,7 +129,6 @@ func NewService( componentStore *componentstore.Store, memory *memory.Store, retentionHandler MetadataRetentionHandler, - binaryFetcher binary.Fetcher, artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient, artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient, ) Service { @@ -148,7 +147,6 @@ func NewService( memory: memory, log: zapLogger, retentionHandler: retentionHandler, - binaryFetcher: binaryFetcher, artifactPublicServiceClient: artifactPublicServiceClient, artifactPrivateServiceClient: artifactPrivateServiceClient, } diff --git a/pkg/service/pipeline_test.go b/pkg/service/pipeline_test.go index ab4e8d152..ba72076ce 100644 --- a/pkg/service/pipeline_test.go +++ b/pkg/service/pipeline_test.go @@ -12,7 +12,6 @@ import ( "go.temporal.io/sdk/client" "github.com/instill-ai/pipeline-backend/pkg/acl" - "github.com/instill-ai/pipeline-backend/pkg/data/binary" "github.com/instill-ai/pipeline-backend/pkg/datamodel" "github.com/instill-ai/pipeline-backend/pkg/memory" "github.com/instill-ai/pipeline-backend/pkg/mock" @@ -69,9 +68,6 @@ func TestService_UpdateNamespacePipelineByID(t *testing.T) { converter := mock.NewConverterMock(mc) mgmtPrivateClient := mock.NewMgmtPrivateServiceClientMock(mc) - // Create a simple binary fetcher for testing - binaryFetcher := binary.NewFetcher() - service := newService( serviceConfig{ repository: repo, @@ -82,7 +78,6 @@ func TestService_UpdateNamespacePipelineByID(t *testing.T) { mgmtPrivateServiceClient: mgmtPrivateClient, componentStore: nil, memory: memory.NewStore(nil, nil), - binaryFetcher: binaryFetcher, }, ) @@ -131,7 +126,6 @@ type serviceConfig struct { componentStore *componentstore.Store memory *memory.Store retentionHandler MetadataRetentionHandler - binaryFetcher binary.Fetcher artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient } @@ -154,7 +148,6 @@ func newService(cfg serviceConfig) Service { cfg.componentStore, cfg.memory, cfg.retentionHandler, - cfg.binaryFetcher, cfg.artifactPublicServiceClient, cfg.artifactPrivateServiceClient, ) diff --git a/pkg/worker/main.go b/pkg/worker/main.go index 90c939742..d3bbc8ab9 100644 --- a/pkg/worker/main.go +++ b/pkg/worker/main.go @@ -66,7 +66,6 @@ type WorkerConfig struct { MemoryStore *memory.Store ArtifactPublicServiceClient artifactpb.ArtifactPublicServiceClient ArtifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient - BinaryFetcher binary.Fetcher PipelinePublicServiceClient pipelinepb.PipelinePublicServiceClient } @@ -100,7 +99,6 @@ func NewWorker( log: logger, 
artifactPublicServiceClient: workerConfig.ArtifactPublicServiceClient, artifactPrivateServiceClient: workerConfig.ArtifactPrivateServiceClient, - binaryFetcher: workerConfig.BinaryFetcher, pipelinePublicServiceClient: workerConfig.PipelinePublicServiceClient, } }
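
For reference, a minimal standalone Go sketch of the data-URI handling that the new worker helpers (isUnstructuredData, getMimeType, removePrefix, getFileExtension) implement: detect an inline base64 payload, split off the MIME type (falling back to content sniffing when the prefix is absent), decode it, and pick a file extension for the MinIO object name. The isDataURI and decodeDataURI names below are illustrative only and do not appear in the patch.

package main

import (
	"encoding/base64"
	"fmt"
	"mime"
	"strings"

	"github.com/gabriel-vasile/mimetype"
)

// isDataURI reports whether a string carries inline base64 data,
// mirroring the worker's isUnstructuredData check.
func isDataURI(s string) bool {
	return strings.HasPrefix(s, "data:") && strings.Contains(s, ";base64,")
}

// decodeDataURI returns the MIME type, decoded bytes, and a suggested file
// extension for a data URI, following the same steps as the patch's
// getMimeType, removePrefix, and getFileExtension helpers.
func decodeDataURI(s string) (mimeType string, payload []byte, ext string, err error) {
	raw := s
	if strings.HasPrefix(s, "data:") {
		header, body, ok := strings.Cut(strings.TrimPrefix(s, "data:"), ",")
		if !ok {
			return "", nil, "", fmt.Errorf("invalid data URI")
		}
		mimeType = strings.SplitN(header, ";", 2)[0]
		raw = body
	}
	payload, err = base64.StdEncoding.DecodeString(raw)
	if err != nil {
		return "", nil, "", fmt.Errorf("decode base64 string: %w", err)
	}
	if mimeType == "" {
		// No explicit type in the URI: sniff it from the decoded bytes.
		mimeType = strings.Split(mimetype.Detect(payload).String(), ";")[0]
	}
	if exts, _ := mime.ExtensionsByType(mimeType); len(exts) > 0 {
		ext = exts[0]
	}
	return mimeType, payload, ext, nil
}

func main() {
	uri := "data:text/plain;base64," + base64.StdEncoding.EncodeToString([]byte("hello"))
	if isDataURI(uri) {
		mt, b, ext, err := decodeDataURI(uri)
		fmt.Println(mt, len(b), ext, err) // e.g. "text/plain 5 .txt <nil>"
	}
}

The decoded bytes and extension are what the worker then feeds into miniox.UploadFileBytesParam (FileBytes, FileMimeType, and the object name suffix) before replacing the original string value with the returned download URL.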