Skip to content

Commit 6fb9be3

Browse files
INTMDB-805: Add support for Pipeline Run (#485)
1 parent fd538db commit 6fb9be3

File tree

2 files changed

+226
-1
lines changed

2 files changed

+226
-1
lines changed

mongodbatlas/data_lakes_pipeline.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import (
2020
"net/http"
2121
)
2222

23-
const dataLakesPipelineBasePath = "api/atlas/v1.0/groups/%s/pipelines"
23+
const (
24+
dataLakesPipelineBasePath = "api/atlas/v1.0/groups/%s/pipelines"
25+
dataLakesPipelineRunPath = dataLakesPipelineBasePath + "/%s/runs"
26+
)
2427

2528
// DataLakePipelineService is an interface for interfacing with the Data Lake Pipeline endpoints of the MongoDB Atlas API.
2629
//
@@ -29,7 +32,9 @@ type DataLakePipelineService interface {
2932
List(context.Context, string) ([]*DataLakePipeline, *Response, error)
3033
ListSnapshots(context.Context, string, string, *ListDataLakePipelineSnapshotOptions) (*DataLakePipelineSnapshotsResponse, *Response, error)
3134
ListIngestionSchedules(context.Context, string, string) ([]*DataLakePipelineIngestionSchedule, *Response, error)
35+
ListRuns(context.Context, string, string) (*DataLakePipelineRunsResponse, *Response, error)
3236
Get(context.Context, string, string) (*DataLakePipeline, *Response, error)
37+
GetRun(context.Context, string, string, string) (*DataLakePipelineRun, *Response, error)
3338
Create(context.Context, string, *DataLakePipeline) (*DataLakePipeline, *Response, error)
3439
Update(context.Context, string, string, *DataLakePipeline) (*DataLakePipeline, *Response, error)
3540
Delete(context.Context, string, string) (*Response, error)
@@ -126,6 +131,34 @@ type DataLakePipelineIngestionSchedule struct {
126131
RetentionValue int32 `json:"retentionValue,omitempty"` // Duration in days, weeks, or months that MongoDB Cloud retains the snapshot.
127132
}
128133

134+
// DataLakePipelineRunsResponse represents the response of DataLakePipelineService.ListRuns.
135+
type DataLakePipelineRunsResponse struct {
136+
Links []*Link `json:"links,omitempty"` // List of one or more Uniform Resource Locators (URLs) that point to API sub-resources, related API resources, or both.
137+
Results []*DataLakePipelineRun `json:"results,omitempty"` // List of returned documents that MongoDB Cloud providers when completing this request.
138+
TotalCount int `json:"totalCount,omitempty"` // Number of documents returned in this response.
139+
}
140+
141+
// DataLakePipelineRun represents a DataLake Pipeline Run.
142+
type DataLakePipelineRun struct {
143+
ID string `json:"_id,omitempty"` // Unique 24-hexadecimal character string that identifies a Data Lake Pipeline run.
144+
BackupFrequencyType string `json:"backupFrequencyType,omitempty"` // Backup schedule interval of the Data Lake Pipeline.
145+
CreatedDate string `json:"createdDate,omitempty"` // Timestamp that indicates when the pipeline run was created.
146+
DatasetName string `json:"datasetName,omitempty"` // Human-readable label that identifies the dataset that Atlas generates during this pipeline run.
147+
GroupID string `json:"groupId,omitempty"` // Unique 24-hexadecimal character string that identifies the project.
148+
LastUpdatedDate string `json:"lastUpdatedDate,omitempty"` // Timestamp that indicates the last time that the pipeline run was updated.
149+
Phase string `json:"phase,omitempty"` // Processing phase of the Data Lake Pipeline.
150+
PipelineID string `json:"pipelineId,omitempty"` // Unique 24-hexadecimal character string that identifies a Data Lake Pipeline.
151+
SnapshotID string `json:"snapshotId,omitempty"` // Unique 24-hexadecimal character string that identifies the snapshot of a cluster.
152+
State string `json:"state,omitempty"` // State of the pipeline run.
153+
Stats *DataLakePipelineRunStats `json:"stats,omitempty"` // Runtime statistics for this Data Lake Pipeline run.
154+
}
155+
156+
// DataLakePipelineRunStats represents runtime statistics for this Data Lake Pipeline run.
157+
type DataLakePipelineRunStats struct {
158+
BytesExported int64 `json:"bytesExported,omitempty"` // Total data size in bytes exported for this pipeline run.
159+
NumDocs int64 `json:"numDocs,omitempty"` // Number of docs ingested for a this pipeline run.
160+
}
161+
129162
// List gets a list of Data Lake Pipelines.
130163
//
131164
// See more: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Data-Lake-Pipelines/operation/listPipelines
@@ -214,6 +247,32 @@ func (s *DataLakePipelineServiceOp) ListIngestionSchedules(ctx context.Context,
214247
return root, resp, nil
215248
}
216249

250+
// ListRuns gets a list of past Data Lake Pipeline runs.
251+
//
252+
// https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Data-Lake-Pipelines/operation/listPipelineRuns
253+
func (s *DataLakePipelineServiceOp) ListRuns(ctx context.Context, groupID, name string) (*DataLakePipelineRunsResponse, *Response, error) {
254+
if groupID == "" {
255+
return nil, nil, NewArgError("groupID", "must be set")
256+
}
257+
if name == "" {
258+
return nil, nil, NewArgError("name", "must be set")
259+
}
260+
261+
path := fmt.Sprintf(dataLakesPipelineRunPath, groupID, name)
262+
req, err := s.Client.NewRequest(ctx, http.MethodGet, path, nil)
263+
if err != nil {
264+
return nil, nil, err
265+
}
266+
267+
root := new(DataLakePipelineRunsResponse)
268+
resp, err := s.Client.Do(ctx, req, root)
269+
if err != nil {
270+
return nil, resp, err
271+
}
272+
273+
return root, resp, err
274+
}
275+
217276
// Get gets the details of one Data Lake Pipeline within the specified project.
218277
//
219278
// See more: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Data-Lake-Pipelines/operation/getPipeline
@@ -242,6 +301,38 @@ func (s *DataLakePipelineServiceOp) Get(ctx context.Context, groupID, name strin
242301
return root, resp, err
243302
}
244303

304+
// GetRun gets the details of one Data Lake Pipeline run within the specified project.
305+
//
306+
// See more: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Data-Lake-Pipelines/operation/getPipelineRun
307+
func (s *DataLakePipelineServiceOp) GetRun(ctx context.Context, groupID, name, id string) (*DataLakePipelineRun, *Response, error) {
308+
if groupID == "" {
309+
return nil, nil, NewArgError("groupID", "must be set")
310+
}
311+
if name == "" {
312+
return nil, nil, NewArgError("name", "must be set")
313+
}
314+
315+
if id == "" {
316+
return nil, nil, NewArgError("id", "must be set")
317+
}
318+
319+
basePath := fmt.Sprintf(dataLakesPipelineRunPath, groupID, name)
320+
path := fmt.Sprintf("%s/%s", basePath, id)
321+
322+
req, err := s.Client.NewRequest(ctx, http.MethodGet, path, nil)
323+
if err != nil {
324+
return nil, nil, err
325+
}
326+
327+
root := new(DataLakePipelineRun)
328+
resp, err := s.Client.Do(ctx, req, root)
329+
if err != nil {
330+
return nil, resp, err
331+
}
332+
333+
return root, resp, err
334+
}
335+
245336
// Create creates one Data Lake Pipeline.
246337
//
247338
// See more: https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Data-Lake-Pipelines/operation/createPipeline

mongodbatlas/data_lakes_pipeline_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,63 @@ func TestDataLakesPipeline_Get(t *testing.T) {
178178
}
179179
}
180180

181+
func TestDataLakesPipeline_GetRun(t *testing.T) {
182+
client, mux, teardown := setup()
183+
defer teardown()
184+
185+
groupID := "6c7498dg87d9e6526801572b"
186+
name := "test"
187+
id := "1"
188+
189+
path := fmt.Sprintf("/api/atlas/v1.0/groups/%s/pipelines/%s/runs/%s", groupID, name, id)
190+
191+
mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
192+
testMethod(t, r, http.MethodGet)
193+
fmt.Fprint(w, `{
194+
"_id": "32b6e34b3d91647abb20e7b8",
195+
"backupFrequencyType": "HOURLY",
196+
"createdDate": "2019-08-24T14:15:22Z",
197+
"datasetName": "test",
198+
"groupId": "32b6e34b3d91647abb20e7b8",
199+
"lastUpdatedDate": "2019-08-24T14:15:22Z",
200+
"phase": "SNAPSHOT",
201+
"pipelineId": "32b6e34b3d91647abb20e7b8",
202+
"snapshotId": "32b6e34b3d91647abb20e7b8",
203+
"state": "PENDING",
204+
"stats": {
205+
"bytesExported": 0,
206+
"numDocs": 0
207+
}
208+
}`)
209+
})
210+
211+
pipeline, _, err := client.DataLakePipeline.GetRun(ctx, groupID, name, id)
212+
if err != nil {
213+
t.Fatalf("DataLakePipeline.GetRun returned error: %v", err)
214+
}
215+
216+
expected := &DataLakePipelineRun{
217+
ID: "32b6e34b3d91647abb20e7b8",
218+
BackupFrequencyType: "HOURLY",
219+
CreatedDate: "2019-08-24T14:15:22Z",
220+
DatasetName: "test",
221+
GroupID: "32b6e34b3d91647abb20e7b8",
222+
LastUpdatedDate: "2019-08-24T14:15:22Z",
223+
Phase: "SNAPSHOT",
224+
PipelineID: "32b6e34b3d91647abb20e7b8",
225+
SnapshotID: "32b6e34b3d91647abb20e7b8",
226+
State: "PENDING",
227+
Stats: &DataLakePipelineRunStats{
228+
BytesExported: 0,
229+
NumDocs: 0,
230+
},
231+
}
232+
233+
if diff := deep.Equal(pipeline, expected); diff != nil {
234+
t.Error(diff)
235+
}
236+
}
237+
181238
func TestDataLakesPipeline_Create(t *testing.T) {
182239
client, mux, teardown := setup()
183240
defer teardown()
@@ -521,3 +578,80 @@ func TestDataLakesPipeline_ListIngestionSchedules(t *testing.T) {
521578
t.Error(diff)
522579
}
523580
}
581+
582+
func TestDataLakesPipeline_ListRuns(t *testing.T) {
583+
client, mux, teardown := setup()
584+
defer teardown()
585+
586+
groupID := "6c7498dg87d9e6526801572b"
587+
name := "test"
588+
589+
path := fmt.Sprintf("/api/atlas/v1.0/groups/%s/pipelines/%s/runs", groupID, name)
590+
591+
mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
592+
testMethod(t, r, http.MethodGet)
593+
fmt.Fprint(w, `{
594+
"links": [
595+
{
596+
"href": "https://cloud.mongodb.com/api/atlas",
597+
"rel": "self"
598+
}
599+
],
600+
"results": [
601+
{
602+
"_id": "32b6e34b3d91647abb20e7b8",
603+
"backupFrequencyType": "HOURLY",
604+
"createdDate": "2019-08-24T14:15:22Z",
605+
"datasetName": "test",
606+
"groupId": "32b6e34b3d91647abb20e7b8",
607+
"lastUpdatedDate": "2019-08-24T14:15:22Z",
608+
"phase": "SNAPSHOT",
609+
"pipelineId": "32b6e34b3d91647abb20e7b8",
610+
"snapshotId": "32b6e34b3d91647abb20e7b8",
611+
"state": "PENDING",
612+
"stats": {
613+
"bytesExported": 0,
614+
"numDocs": 0
615+
}
616+
}
617+
],
618+
"totalCount": 1
619+
}`)
620+
})
621+
622+
pipelines, _, err := client.DataLakePipeline.ListRuns(ctx, groupID, name)
623+
if err != nil {
624+
t.Fatalf("DataLakePipeline.ListRuns returned error: %v", err)
625+
}
626+
627+
expected := &DataLakePipelineRunsResponse{
628+
Links: []*Link{
629+
{
630+
Rel: "self",
631+
Href: "https://cloud.mongodb.com/api/atlas"},
632+
},
633+
Results: []*DataLakePipelineRun{
634+
{
635+
ID: "32b6e34b3d91647abb20e7b8",
636+
BackupFrequencyType: "HOURLY",
637+
CreatedDate: "2019-08-24T14:15:22Z",
638+
DatasetName: "test",
639+
GroupID: "32b6e34b3d91647abb20e7b8",
640+
LastUpdatedDate: "2019-08-24T14:15:22Z",
641+
Phase: "SNAPSHOT",
642+
PipelineID: "32b6e34b3d91647abb20e7b8",
643+
SnapshotID: "32b6e34b3d91647abb20e7b8",
644+
State: "PENDING",
645+
Stats: &DataLakePipelineRunStats{
646+
BytesExported: 0,
647+
NumDocs: 0,
648+
},
649+
},
650+
},
651+
TotalCount: 1,
652+
}
653+
654+
if diff := deep.Equal(pipelines, expected); diff != nil {
655+
t.Error(diff)
656+
}
657+
}

0 commit comments

Comments
 (0)