Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 20 additions & 92 deletions src/pkg/cli/client/byoc/gcp/byoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type ByocGcp struct {
delegateDomainZone string

cdExecution string
cdEtag string
}

func NewByocProvider(ctx context.Context, tenantName types.TenantLabel, stack string) *ByocGcp {
Expand Down Expand Up @@ -313,17 +312,19 @@ func (b *ByocGcp) CdCommand(ctx context.Context, req client.CdCommandRequest) (t
if err := b.SetUpCD(ctx); err != nil {
return "", err
}
etag := types.NewEtag()
cmd := cdCommand{
project: req.Project,
command: []string{string(req.Command)},
etag: etag,
statesUrl: req.StatesUrl,
eventsUrl: req.EventsUrl,
}
cdExecutionId, err := b.runCdCommand(ctx, cmd) // TODO: make domain optional for defang cd
err := b.runCdCommand(ctx, cmd) // TODO: make domain optional for defang cd
if err != nil {
return "", err
}
return cdExecutionId, nil
return etag, nil
}

type cdCommand struct {
Expand All @@ -343,11 +344,11 @@ type CloudBuildStep struct {
Env []string `yaml:"env,omitempty"`
}

func (b *ByocGcp) runCdCommand(ctx context.Context, cmd cdCommand) (string, error) {
func (b *ByocGcp) runCdCommand(ctx context.Context, cmd cdCommand) error {
defangStateUrl := `gs://` + b.bucket
pulumiBackendKey, pulumiBackendValue, err := byoc.GetPulumiBackend(defangStateUrl)
if err != nil {
return "", err
return err
}
env := map[string]string{
"DEFANG_DEBUG": os.Getenv("DEFANG_DEBUG"), // TODO: use the global DoDebug flag
Expand Down Expand Up @@ -399,7 +400,7 @@ func (b *ByocGcp) runCdCommand(ctx context.Context, cmd cdCommand) (string, erro
debugEnv = append(debugEnv, k+"="+v)
}
if err := byoc.DebugPulumiGolang(ctx, debugEnv, cmd.command...); err != nil {
return "", err
return err
}
}

Expand All @@ -416,7 +417,7 @@ func (b *ByocGcp) runCdCommand(ctx context.Context, cmd cdCommand) (string, erro
},
})
if err != nil {
return "", err
return err
}
term.Debugf("Starting CD in cloudbuild at: %v", time.Now().Format(time.RFC3339))
execution, err := b.driver.RunCloudBuild(ctx, gcp.CloudBuildArgs{
Expand All @@ -428,12 +429,11 @@ func (b *ByocGcp) runCdCommand(ctx context.Context, cmd cdCommand) (string, erro
},
})
if err != nil {
return "", err
return err
}
b.cdExecution = execution
// term.Printf("CD Execution: %s\n", execution)

return execution, nil
return nil
}

func (b *ByocGcp) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRequest) (*defangv1.UploadURLResponse, error) {
Expand Down Expand Up @@ -522,15 +522,10 @@ func (b *ByocGcp) deploy(ctx context.Context, req *client.DeployRequest, command
statesUrl: req.StatesUrl,
eventsUrl: req.EventsUrl,
}
execution, err := b.runCdCommand(ctx, cdCmd)
if err != nil {
if err := b.runCdCommand(ctx, cdCmd); err != nil {
return nil, err
}

b.cdEtag = etag
if command == "preview" {
etag = execution // Only wait for the preview command cd job to finish
}
return &defangv1.DeployResponse{Etag: etag, Services: serviceInfos}, nil
}

Expand All @@ -545,27 +540,13 @@ func (b *ByocGcp) Subscribe(ctx context.Context, req *defangv1.SubscribeRequest)
if err != nil {
return nil, err
}
if req.Etag == b.cdEtag {
subscribeStream.AddJobExecutionUpdate(path.Base(b.cdExecution))
}

// TODO: update stack (1st param) to b.PulumiStack
subscribeStream.AddJobStatusUpdate("", req.Project, req.Etag, req.Services)
subscribeStream.AddServiceStatusUpdate("", req.Project, req.Etag, req.Services)
subscribeStream.AddJobStatusUpdate(b.PulumiStack, req.Project, req.Etag, req.Services)
subscribeStream.AddServiceStatusUpdate(b.PulumiStack, req.Project, req.Etag, req.Services)
subscribeStream.StartFollow(time.Now())
return subscribeStream, nil
}

func (b *ByocGcp) QueryLogs(ctx context.Context, req *defangv1.TailRequest) (client.ServerStream[defangv1.TailResponse], error) {
// CdCommand returns the job execution ID as the eTag, which subsequently passed in as eTag in TailRequest,
// in those cases, we need a way to detect when the CD task has finished running and stop tailing thg logs by cancelling the logging context.
if b.cdExecution != "" && req.Etag == b.cdExecution {
var err error
ctx, err = b.getCDExecutionContext(ctx, b.driver, req)
if err != nil {
return nil, err
}
}
return b.getLogStream(ctx, b.driver, req)
}

Expand All @@ -582,17 +563,8 @@ func (b *ByocGcp) getLogStream(ctx context.Context, gcpLogsClient GcpLogsClient,
logStream.AddUntil(req.Until.AsTime())
}
etag := req.Etag
if etag == b.cdExecution { // Do not pass the cd execution name as etag
etag = ""
}
if logs.LogType(req.LogType).Has(logs.LogTypeBuild) {
execName := path.Base(b.cdExecution)
if execName == "." {
execName = ""
}
logStream.AddJobExecutionLog(execName) // CD log when there is an execution name
logStream.AddJobLog(b.PulumiStack, req.Project, etag, req.Services) // Kaniko or CD logs when there is no execution name
logStream.AddCloudBuildLog(b.PulumiStack, req.Project, etag, req.Services) // CloudBuild logs
logStream.AddCloudBuildLog(b.PulumiStack, req.Project, etag, req.Services) // CD logs and CloudBuild logs
}
if logs.LogType(req.LogType).Has(logs.LogTypeRun) {
logStream.AddServiceLog(b.PulumiStack, req.Project, etag, req.Services) // Service logs
Expand All @@ -608,47 +580,6 @@ func (b *ByocGcp) getLogStream(ctx context.Context, gcpLogsClient GcpLogsClient,
return logStream, nil
}

func (b *ByocGcp) getCDExecutionContext(ctx context.Context, gcpLogsClient GcpLogsClient, req *defangv1.TailRequest) (context.Context, error) {
subscribeStream, err := NewSubscribeStream(ctx, gcpLogsClient, true, req.Etag, req.Services)
if err != nil {
return nil, err
}
subscribeStream.AddJobExecutionUpdate(path.Base(b.cdExecution))
var since time.Time
if req.Since.IsValid() {
since = req.Since.AsTime()
}
if req.Follow {
subscribeStream.StartFollow(since)
} else if req.Since.IsValid() {
subscribeStream.StartHead(req.Limit)
} else {
subscribeStream.StartTail(req.Limit)
}

var cancel context.CancelCauseFunc
ctx, cancel = context.WithCancelCause(ctx)
// Note: No defer cancel as cancel will always be called in the goroutine below
go func() {
defer subscribeStream.Close()
for subscribeStream.Receive() {
msg := subscribeStream.Msg()
if msg.State == defangv1.ServiceState_BUILD_FAILED || msg.State == defangv1.ServiceState_DEPLOYMENT_FAILED {
pkg.SleepWithContext(ctx, 3*time.Second) // Make sure the logs are flushed, gcp logs has a longer delay, thus 3s
cancel(fmt.Errorf("CD job failed %s", msg.Status))
return
}
if msg.State == defangv1.ServiceState_DEPLOYMENT_COMPLETED {
pkg.SleepWithContext(ctx, 3*time.Second) // Make sure the logs are flushed, gcp logs has a longer delay, thus 3s
cancel(io.EOF)
return
}
}
cancel(subscribeStream.Err())
}()
return ctx, nil
}

func (b *ByocGcp) GetService(ctx context.Context, req *defangv1.GetRequest) (*defangv1.ServiceInfo, error) {
all, err := b.GetServices(ctx, &defangv1.GetServicesRequest{Project: req.Project})
if err != nil {
Expand Down Expand Up @@ -787,20 +718,17 @@ func (b *ByocGcp) createDeploymentLogQuery(req *defangv1.DebugRequest) string {
until = req.Until.AsTime()
}
query := NewLogQuery(b.driver.GetProjectID())
if b.cdExecution != "" {
query.AddJobExecutionQuery(path.Base(b.cdExecution))
}

query.AddJobLogQuery(b.PulumiStack, req.Project, req.Etag, req.Services) // Kaniko OR CD logs
query.AddServiceLogQuery(b.PulumiStack, req.Project, req.Etag, req.Services) // Cloudrun service logs
query.AddCloudBuildLogQuery(b.PulumiStack, req.Project, req.Etag, req.Services) // CloudBuild logs
query.AddSince(since)
query.AddUntil(until)

query.AddJobStatusUpdateRequestQuery(b.PulumiStack, req.Project, req.Etag, req.Services)
query.AddJobStatusUpdateResponseQuery(b.PulumiStack, req.Project, req.Etag, req.Services)
// Logs
query.AddCloudBuildLogQuery(b.PulumiStack, req.Project, req.Etag, req.Services) // CloudBuild logs for CD and image builds
query.AddServiceLogQuery(b.PulumiStack, req.Project, req.Etag, req.Services) // Cloudrun service logs
query.AddComputeEngineLogQuery(b.PulumiStack, req.Project, req.Etag, req.Services) // Compute Engine logs
// Status Updates
query.AddServiceStatusRequestUpdate(b.PulumiStack, req.Project, req.Etag, req.Services)
query.AddServiceStatusReponseUpdate(b.PulumiStack, req.Project, req.Etag, req.Services)
query.AddComputeEngineInstanceGroupInsertOrPatch(b.PulumiStack, req.Project, req.Etag, req.Services)

return query.GetQuery()
}
Expand Down
74 changes: 1 addition & 73 deletions src/pkg/cli/client/byoc/gcp/byoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ func TestSetUpCD(t *testing.T) {
command: []string{"up", payload},
}

if op, err := b.runCdCommand(ctx, cmd); err != nil {
if err := b.runCdCommand(ctx, cmd); err != nil {
t.Errorf("CdCommand() error = %v, want nil", err)
} else {
t.Logf("CdCommand() = %v", op)
}
}

Expand Down Expand Up @@ -102,64 +100,6 @@ func (m *MockGcpLoggingTailer) Next(ctx context.Context) (*loggingpb.LogEntry, e
return m.MockGcpLoggingLister.Next()
}

func TestGetCDExecutionContext(t *testing.T) {
tests := []struct {
name string
listEntries []loggingpb.LogEntry
tailEntries []loggingpb.LogEntry
}{
{name: "no entries"},
{name: "with only list entries",
listEntries: []loggingpb.LogEntry{
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 1 from lister"}},
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 2 from lister"}},
},
},
{name: "with only tail entries",
tailEntries: []loggingpb.LogEntry{
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 1 from tailer"}},
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 2 from tailer"}},
},
},
{name: "with both list and tail entries",
listEntries: []loggingpb.LogEntry{
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 1 from lister"}},
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 2 from lister"}},
},
tailEntries: []loggingpb.LogEntry{
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 1 from tailer"}},
{Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "log entry 2 from tailer"}},
},
},
}

ctx := t.Context()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := NewByocProvider(ctx, "testTenantID", "")

driver := &MockGcpLogsClient{
lister: &MockGcpLoggingLister{},
tailer: &MockGcpLoggingTailer{},
}
newCtx, err := b.getCDExecutionContext(ctx, driver, &defangv1.TailRequest{})
if err != nil {
t.Errorf("getCDExecutionContext() error = %v, want nil", err)
}
if newCtx == ctx {
t.Errorf("getCDExecutionContext() returned same context, want new context")
}
// Wait for subscription done
select {
case <-newCtx.Done():
case <-time.After(10 * time.Second):
t.Errorf("getCDExecutionContext() timeout waiting for done")
}
})
}
}

func TestGetLogStream(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -201,11 +141,6 @@ func TestGetLogStream(t *testing.T) {
Pattern: "error",
LogType: uint32(logs.LogTypeAll),
}},
{name: "with_cd_exec", req: &defangv1.TailRequest{
LogType: uint32(logs.LogTypeAll),
},
cdExecution: "test-execution-id",
},
{name: "with_etag", req: &defangv1.TailRequest{
LogType: uint32(logs.LogTypeAll),
Etag: "test-etag",
Expand All @@ -215,13 +150,6 @@ func TestGetLogStream(t *testing.T) {
LogType: uint32(logs.LogTypeAll),
Etag: "test-etag",
}},
{name: "with_etag_equal_cd_exec", req: &defangv1.TailRequest{
Since: timestamppb.New(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)),
LogType: uint32(logs.LogTypeAll),
Etag: "test-execution-id",
},
cdExecution: "test-execution-id",
},
{name: "with_everything", req: &defangv1.TailRequest{
Project: "test-project",
Pattern: "error",
Expand Down
24 changes: 0 additions & 24 deletions src/pkg/cli/client/byoc/gcp/testdata/with_cd_exec.query

This file was deleted.

4 changes: 0 additions & 4 deletions src/pkg/cli/client/byoc/gcp/testdata/with_etag.query
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ logName="projects/test-project/logs/cos_containers" OR
logName="projects/test-project/logs/docker-logs"
) AND (
(
resource.type = "cloud_run_job"
labels."defang-stack" = "beta"
labels."defang-etag" = "test-etag"
) OR (
resource.type="build"
labels.build_tags =~ "beta__[a-zA-Z0-9-]{1,63}_test-etag"
-labels.build_step="MAIN"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ logName="projects/test-project/logs/cos_containers" OR
logName="projects/test-project/logs/docker-logs"
) AND (timestamp >= "2024-01-01T00:00:00Z") AND (
(
resource.type = "cloud_run_job"
labels."defang-stack" = "beta"
labels."defang-etag" = "test-etag"
) OR (
resource.type="build"
labels.build_tags =~ "beta__[a-zA-Z0-9-]{1,63}_test-etag"
-labels.build_step="MAIN"
Expand Down

This file was deleted.

Loading
Loading