Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions build/apiserver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ RUN apt update \
&& apt install -y --no-install-recommends ca-certificates openjdk-11-jre-headless \
&& apt clean \
&& rm -rf /var/lib/apt/lists/*
RUN apt-get update \
&& apt-get install -y cwltool
ARG TARGETOS
ARG TARGETARCH
USER nobody
USER root
WORKDIR /app
COPY --from=builder --chown=nobody:nogroup /go/src/github.com/Bio-OS/bioos/conf conf
COPY --from=builder --chown=root:root /go/src/github.com/Bio-OS/bioos/conf conf
COPY --from=builder /go/src/github.com/Bio-OS/bioos/_output/platforms/${TARGETOS}/${TARGETARCH}/apiserver .
COPY --from=builder /go/src/github.com/Bio-OS/bioos/womtool.jar .
ENTRYPOINT ["/app/apiserver"]
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type CreateSubmissionCommand struct {
Description *string `validate:"omitempty,submissionDesc"`
Type string `validate:"required,oneof=dataModel filePath"`
Entity *Entity
ExposedOptions ExposedOptions
ExposedOptions string
InOutMaterial *InOutMaterial
}

Expand All @@ -31,10 +31,6 @@ type InOutMaterial struct {
OutputsMaterial string
}

type ExposedOptions struct {
ReadFromCache bool
}

type DeleteSubmissionCommand struct {
WorkspaceID string `validate:"required"`
ID string `validate:"required"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,14 @@ func (c *createSubmissionHandler) Handle(ctx context.Context, cmd *CreateSubmiss
}

param := submission.CreateSubmissionParam{
Name: cmd.Name,
Description: cmd.Description,
WorkflowID: cmd.WorkflowID,
WorkspaceID: cmd.WorkspaceID,
Type: cmd.Type,
ExposedOptions: submission.ExposedOptions{
ReadFromCache: cmd.ExposedOptions.ReadFromCache,
},
Inputs: make(map[string]interface{}),
Outputs: make(map[string]interface{}),
Name: cmd.Name,
Description: cmd.Description,
WorkflowID: cmd.WorkflowID,
WorkspaceID: cmd.WorkspaceID,
Type: cmd.Type,
ExposedOptions: cmd.ExposedOptions,
Inputs: make(map[string]interface{}),
Outputs: make(map[string]interface{}),
}

switch param.Type {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type SubmissionItem struct {
WorkflowVersionID string
RunStatus Status
Entity *Entity
ExposedOptions ExposedOptions
ExposedOptions string
InOutMaterial *InOutMaterial
WorkspaceID string
}
Expand All @@ -30,10 +30,6 @@ type InOutMaterial struct {
OutputsMaterial string
}

type ExposedOptions struct {
ReadFromCache bool
}

type WorkflowVersion struct {
ID string
VersionID string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (e *EventHandlerSubmitRun) Handle(ctx context.Context, event *submission.Ev
BioosRunIDKey: run.ID,
},
WorkflowEngineParameters: event.RunConfig.WorkflowEngineParameters,
WorkflowURL: event.RunConfig.MainWorkflowFilePath,
},
WorkflowAttachment: event.RunConfig.WorkflowContents,
})
Expand All @@ -85,7 +86,7 @@ func (e *EventHandlerSubmitRun) markRunFailed(ctx context.Context, run *Run, mes
tempRun := run.Copy()
tempRun.Message = utils.PointString(message)
tempRun.FinishTime = utils.PointTime(time.Now())
if err := e.runRepo.Save(ctx, run); err != nil {
if err := e.runRepo.Save(ctx, tempRun); err != nil {
return apperrors.NewInternalError(err)
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package submission

import (
"context"
"reflect"
"encoding/json"

"github.com/Bio-OS/bioos/internal/context/workspace/infrastructure/eventbus"
workspaceproto "github.com/Bio-OS/bioos/internal/context/workspace/interface/grpc/proto"
apperrors "github.com/Bio-OS/bioos/pkg/errors"
applog "github.com/Bio-OS/bioos/pkg/log"
"github.com/Bio-OS/bioos/pkg/utils"
"github.com/Bio-OS/bioos/pkg/utils/grpc"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func (h *CreateHandler) genCreateRunEvent(ctx context.Context, event *CreateEven
if err != nil {
return nil, apperrors.NewInternalError(err)
}
workflowEngineParameters := exposedOptions2Map(&sub.ExposedOptions)
workflowEngineParameters := exposedOptions2Map(sub.ExposedOptions)
files, err := h.genWorkflowFiles(ctx, getWorkflowVersionResp.Version, event)
if err != nil {
return nil, err
Expand Down Expand Up @@ -99,12 +100,12 @@ func (h *CreateHandler) genWorkflowFiles(ctx context.Context, workflowVersion *w
}

// exposedOptions2Map ...
func exposedOptions2Map(exposedOptions *ExposedOptions) map[string]interface{} {
func exposedOptions2Map(exposedOptions string) map[string]interface{} {
res := make(map[string]interface{})
t := reflect.TypeOf(exposedOptions).Elem()
v := reflect.ValueOf(exposedOptions).Elem()
for i := 0; i < v.NumField(); i++ {
res[t.Field(i).Tag.Get(WESTag)] = v.Field(i).Interface()
err := json.Unmarshal([]byte(exposedOptions), &res)
if err != nil {
applog.Errorf("invalid exposedOptions, value : ", exposedOptions)
res = make(map[string]interface{})
}
return res
}
2 changes: 1 addition & 1 deletion internal/context/submission/domain/submission/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type CreateSubmissionParam struct {
Type string
Inputs map[string]interface{}
Outputs map[string]interface{}
ExposedOptions ExposedOptions
ExposedOptions string
}

func (p CreateSubmissionParam) validate() error {
Expand Down
6 changes: 1 addition & 5 deletions internal/context/submission/domain/submission/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@ type Submission struct {
Type string
Inputs map[string]interface{}
Outputs map[string]interface{}
ExposedOptions ExposedOptions
ExposedOptions string
Status string
StartTime time.Time
FinishTime *time.Time
}

type ExposedOptions struct {
ReadFromCache bool `wes:"read_from_cache"`
}
44 changes: 28 additions & 16 deletions internal/context/submission/infrastructure/client/wes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,7 @@ func (i *impl) RunWorkflow(ctx context.Context, req *RunWorkflowRequest) (*RunWo
return nil, newBadRequestError("workflowAttachment is empty")
}
prefix := longestCommonPrefix(filesPath)
if len(filesPath) == 1 {
prefix = fmt.Sprintf("%s/", path.Dir(filesPath[0]))
}
// fix bug that func longestCommonPrefix() only return string level longestCommonPrefix
// However we need path level longestCommonPrefix.
// | Main File |Attachment File|String Level Prefix|Path Level Prefix|
// |/app/tasks.wdl| /app/test.wdl | /app/t | /app/ |
if !strings.HasSuffix(prefix, "/") {
if !strings.HasSuffix(prefix, "/") && len(prefix) > 0 {
prefix = fmt.Sprintf("%s/", path.Dir(prefix))
}
for _, filePath := range filesPath {
Expand All @@ -128,7 +121,7 @@ func (i *impl) RunWorkflow(ctx context.Context, req *RunWorkflowRequest) (*RunWo
}
newReq = newReq.SetFileReader(workflowAttachment, filePath[len(prefix):], bytes.NewReader(decodeContent))
}
formData, err := runRequest2FormData(&req.RunRequest)
formData, err := runRequest2FormData(&req.RunRequest, prefix)
if err != nil {
return nil, newBadRequestError(err.Error())
}
Expand Down Expand Up @@ -160,6 +153,7 @@ func (i *impl) GetRunLog(ctx context.Context, req *GetRunLogRequest) (*GetRunLog
if res.StatusCode >= 400 {
return nil, res.ErrorResp
}
UpdateLogsWithStdout(&res.GetRunLogResponse)
return &res.GetRunLogResponse, nil
} else if resp.IsError() {
return nil, res.ErrorResp
Expand Down Expand Up @@ -192,7 +186,11 @@ func longestCommonPrefix(strs []string) string {
case 0:
return ""
case 1:
return strs[0]
if strings.Contains(strs[0], "/") {
return fmt.Sprintf("%s/", path.Dir(strs[0]))
} else {
return ""
}
default:
if len(strs[0]) == 0 {
return ""
Expand All @@ -207,11 +205,16 @@ func longestCommonPrefix(strs []string) string {
prefix := strs[0][0:minStrLen]
for {
allFound := true
for i := 1; i < strsLen; i++ {
if strings.Index(strs[i], prefix) != 0 {
prefix = prefix[0 : len(prefix)-1]
allFound = false
break
if !strings.HasSuffix(prefix, "/") {
prefix = prefix[0 : len(prefix)-1]
allFound = false
} else {
for i := 1; i < strsLen; i++ {
if strings.Index(strs[i], prefix) != 0 {
prefix = prefix[0 : len(prefix)-1]
allFound = false
break
}
}
}
if allFound || len(prefix) == 0 {
Expand All @@ -222,7 +225,7 @@ func longestCommonPrefix(strs []string) string {
}

// runRequest2FormData ...
func runRequest2FormData(req *RunRequest) (map[string]string, error) {
func runRequest2FormData(req *RunRequest, prefix string) (map[string]string, error) {
formData := make(map[string]string)
if req.WorkflowParams != nil && len(req.WorkflowParams) > 0 {
workflowParamsInBytes, err := json.Marshal(req.WorkflowParams)
Expand All @@ -247,5 +250,14 @@ func runRequest2FormData(req *RunRequest) (map[string]string, error) {
}
formData[workflowType] = req.WorkflowType
formData[workflowTypeVersion] = req.WorkflowTypeVersion
formData[workflowURL] = req.WorkflowURL[len(prefix):]
return formData, nil
}

// 标准WES API中没有log字段,在此处处理
func UpdateLogsWithStdout(response *GetRunLogResponse) {
response.RunLog.Log = response.RunLog.Stdout
for index := range response.TaskLogs {
response.TaskLogs[index].Log = response.TaskLogs[index].Stdout
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type RunRequest struct {
WorkflowTypeVersion string `json:"workflow_type_version"`
Tags map[string]interface{} `json:"tags"`
WorkflowEngineParameters map[string]interface{} `json:"workflow_engine_parameters"`
WorkflowURL string `json:"workflow_url"`
}

// RunStatus ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ func SubmissionPOToSubmissionDTO(ctx context.Context, submission *Submission) (*
WorkflowID: submission.WorkflowID,
WorkflowVersionID: submission.WorkflowVersionID,
WorkspaceID: submission.WorkspaceID,
ExposedOptions: query.ExposedOptions{
ReadFromCache: submission.ExposedOptions.ReadFromCache,
},
ExposedOptions: submission.ExposedOptions,
}
if submission.FinishTime != nil {
item.FinishTime = utils.PointInt64(submission.FinishTime.Unix())
Expand Down Expand Up @@ -93,12 +91,10 @@ func SubmissionPOToSubmissionDO(ctx context.Context, sb *Submission) (*submissio
Type: sb.Type,
Inputs: sb.Inputs,
Outputs: sb.Outputs,
ExposedOptions: submission.ExposedOptions{
ReadFromCache: sb.ExposedOptions.ReadFromCache,
},
Status: sb.Status,
StartTime: sb.StartTime,
FinishTime: sb.FinishTime,
ExposedOptions: sb.ExposedOptions,
Status: sb.Status,
StartTime: sb.StartTime,
FinishTime: sb.FinishTime,
}, nil
}

Expand All @@ -123,11 +119,9 @@ func SubmissionDOToSubmissionPO(ctx context.Context, sb *submission.Submission)
Inputs: sb.Inputs,
Outputs: sb.Outputs,
WorkspaceID: sb.WorkspaceID,
ExposedOptions: ExposedOptions{
ReadFromCache: sb.ExposedOptions.ReadFromCache,
},
Status: sb.Status,
StartTime: sb.StartTime,
FinishTime: sb.FinishTime,
ExposedOptions: sb.ExposedOptions,
Status: sb.Status,
StartTime: sb.StartTime,
FinishTime: sb.FinishTime,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ type Submission struct {
Type string `gorm:"type:varchar(32);not null"`
Inputs map[string]interface{} `gorm:"serializer:json"`
Outputs map[string]interface{} `gorm:"serializer:json"`
ExposedOptions ExposedOptions `gorm:"serializer:json"`
ExposedOptions string `gorm:"type:longtext"`
Status string `gorm:"type:varchar(32);not null"`
StartTime time.Time `gorm:"not null"`
FinishTime *time.Time
UserID *int64
}

type ExposedOptions struct {
ReadFromCache bool `json:"readFromCache"`
}

func (s *SubmissionModel) TableName() string {
return "submission"
}
Expand Down
16 changes: 5 additions & 11 deletions internal/context/submission/interface/grpc/submission_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ func queryEntityDTOToVO(entity *query.Entity) *pb.Entity {
}
}

func queryExposedOptionsDTOToVO(options query.ExposedOptions) *pb.ExposedOptions {
return &pb.ExposedOptions{
ReadFromCache: options.ReadFromCache,
}
func queryExposedOptionsDTOToVO(options string) *pb.ExposedOptions {
return &pb.ExposedOptions{}
}

func queryInOutMaterialDTOToVO(material *query.InOutMaterial) *pb.InOutMaterial {
Expand Down Expand Up @@ -105,13 +103,9 @@ func commandEntityVoToDto(entity *pb.Entity) *command.Entity {
}
}

func commandExposedOptionsVoToDto(options *pb.ExposedOptions) command.ExposedOptions {
if options == nil {
return command.ExposedOptions{}
}
return command.ExposedOptions{
ReadFromCache: options.ReadFromCache,
}
func commandExposedOptionsVoToDto(options *pb.ExposedOptions) string {
return ""

}

func commandInOutMaterialVoToDto(material *pb.InOutMaterial) *command.InOutMaterial {
Expand Down
16 changes: 2 additions & 14 deletions internal/context/submission/interface/hertz/handlers/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func createSubmissionVoToDto(req CreateSubmissionRequest) *submissioncommand.Cre
Description: req.Description,
Type: req.Type,
Entity: commandEntityVoToDto(req.Entity),
ExposedOptions: commandExposedOptionsVoToDto(req.ExposedOptions),
ExposedOptions: req.ExposedOptions,
InOutMaterial: commandInOutMaterialVoToDto(req.InOutMaterial),
}
}
Expand All @@ -33,12 +33,6 @@ func commandEntityVoToDto(entity *Entity) *submissioncommand.Entity {
}
}

func commandExposedOptionsVoToDto(options ExposedOptions) submissioncommand.ExposedOptions {
return submissioncommand.ExposedOptions{
ReadFromCache: options.ReadFromCache,
}
}

func commandInOutMaterialVoToDto(material *InOutMaterial) *submissioncommand.InOutMaterial {
if material == nil {
return nil
Expand Down Expand Up @@ -109,7 +103,7 @@ func submissionItemDtoToVo(item *submissionquery.SubmissionItem) SubmissionItem
WorkflowVersion: queryWorkflowVersionDtoToVo(item.WorkflowID, item.WorkflowVersionID),
RunStatus: submissionQueryStatusDtoToVo(item.RunStatus),
Entity: queryEntityDtoToVo(item.Entity),
ExposedOptions: queryExposedOptionsDtoToVo(item.ExposedOptions),
ExposedOptions: item.ExposedOptions,
InOutMaterial: queryInOutMaterialDtoToVo(item.InOutMaterial),
}
}
Expand Down Expand Up @@ -145,12 +139,6 @@ func queryEntityDtoToVo(entity *submissionquery.Entity) *Entity {
}
}

func queryExposedOptionsDtoToVo(options submissionquery.ExposedOptions) ExposedOptions {
return ExposedOptions{
ReadFromCache: options.ReadFromCache,
}
}

func queryInOutMaterialDtoToVo(material *submissionquery.InOutMaterial) *InOutMaterial {
if material == nil {
return nil
Expand Down
Loading