diff --git a/styx-api-service/src/main/java/com/spotify/styx/api/WorkflowResource.java b/styx-api-service/src/main/java/com/spotify/styx/api/WorkflowResource.java index d5384fb755..1361ba7e73 100644 --- a/styx-api-service/src/main/java/com/spotify/styx/api/WorkflowResource.java +++ b/styx-api-service/src/main/java/com/spotify/styx/api/WorkflowResource.java @@ -106,6 +106,9 @@ public Stream>>> routes( Route.with( json(), "GET", BASE + "///full", rc -> workflowWithState(arg("cid", rc), arg("wfid", rc))), + Route.with( + json(), "GET", BASE + "/full", + rc -> workflowsWithState(rc.request())), Route.with( json(), "GET", BASE, rc -> workflows(rc.request())), @@ -216,6 +219,22 @@ private WorkflowConfiguration readFromJsonWithDefaults(ByteString payload) return WorkflowConfigurationBuilder.from(workflowConfig).deploymentTime(time.get()).build(); } + private Response> workflowsWithState(Request request) { + try { + var paramFilters = Stream.of(QueryParams.values()) + .map(e -> getFilterParams(request, e).map(param -> Map.entry(e, param))) + .flatMap(Optional::stream) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Collection workflows = storage.workflowsWithState().values(); + + return Response.forPayload(workflows); + + } catch (IOException e) { + throw new RuntimeException("Failed to get workflows", e); + } + } + private Response> workflows(Request request) { try { var paramFilters = Stream.of(QueryParams.values()) diff --git a/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java b/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java index 4501ddea3b..0dec513c66 100644 --- a/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java +++ b/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java @@ -40,6 +40,7 @@ import static com.spotify.styx.testdata.TestData.QUERY_THRESHOLD_BEFORE; import static com.spotify.styx.testdata.TestData.TEST_DEPLOYMENT_TIME; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -74,6 +75,7 @@ import com.spotify.styx.model.WorkflowId; import com.spotify.styx.model.WorkflowInstance; import com.spotify.styx.model.WorkflowState; +import com.spotify.styx.model.WorkflowWithState; import com.spotify.styx.state.Trigger; import com.spotify.styx.storage.AggregateStorage; import com.spotify.styx.storage.BigtableMocker; @@ -88,6 +90,7 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -823,6 +826,27 @@ public void shouldReturnWorkflows() throws Exception { assertJson(response, "[*]", hasSize(2)); } + @Test + public void shouldReturnWorkflowsWithState() throws Exception { + sinceVersion(Api.Version.V3); + + Response response = awaitResponse( + serviceHelper.request("GET", path("/full"))); + + var parsedResponse = Arrays.asList(deserialize(response.payload().orElseThrow(), WorkflowWithState[].class)); + var expectedWF1 = WorkflowWithState.create(FLYTE_EXEC_WORKFLOW, WorkflowState.builder().enabled(false).build()); + var expectedWF2 = WorkflowWithState.create(WORKFLOW, WorkflowState.builder().enabled(false).build()); + + assertThat(response, hasStatus(withCode(Status.OK))); + assertJson(response, "[*]", hasSize(2)); + assertThat(parsedResponse, + containsInAnyOrder( + expectedWF1, + expectedWF2 + ) + ); + } + @Test public void shouldReturnFilteredDeploymentTypeWorkflow() throws Exception { sinceVersion(Api.Version.V3); diff --git a/styx-service-common/src/main/java/com/spotify/styx/storage/AggregateStorage.java b/styx-service-common/src/main/java/com/spotify/styx/storage/AggregateStorage.java index 46e46cdea8..12744e649b 100644 --- a/styx-service-common/src/main/java/com/spotify/styx/storage/AggregateStorage.java +++ b/styx-service-common/src/main/java/com/spotify/styx/storage/AggregateStorage.java @@ -41,6 +41,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.HashMap; import org.apache.hadoop.hbase.client.Connection; /** @@ -196,6 +197,11 @@ public Map workflows() throws IOException { return datastoreStorage.workflows(); } + @Override + public HashMap workflowsWithState() throws IOException { + return datastoreStorage.workflowsWithState(); + } + @Override public Map workflows(Set workflowIds) { return datastoreStorage.workflows(workflowIds); diff --git a/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java b/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java index ede31f58e5..5a10b76707 100644 --- a/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java +++ b/styx-service-common/src/main/java/com/spotify/styx/storage/DatastoreStorage.java @@ -641,6 +641,26 @@ Optional workflowWithState(WorkflowId workflowId) throws IOEx return Optional.of(WorkflowWithState.create(workflow, workflowState)); } + HashMap workflowsWithState() throws IOException { + HashMap workflows = new HashMap<>(); + var query = Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW).build(); + datastore.query(query, entity -> { + Workflow workflow; + WorkflowState workflowState; + WorkflowWithState workflowWithState; + try { + workflow = OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class); + workflowState = workflowState(Optional.of(entity)); + workflowWithState = WorkflowWithState.create(workflow, workflowState); + } catch (IOException e) { + log.warn("Failed to read workflow {}.", entity.getKey(), e); + return; + } + workflows.put(workflow.id(), workflowWithState); + }); + return workflows; + } + private WorkflowState workflowState(Optional workflowEntity) { var builder = WorkflowState.builder(); diff --git a/styx-service-common/src/main/java/com/spotify/styx/storage/Storage.java b/styx-service-common/src/main/java/com/spotify/styx/storage/Storage.java index c1ab7f499b..c0809e63bc 100644 --- a/styx-service-common/src/main/java/com/spotify/styx/storage/Storage.java +++ b/styx-service-common/src/main/java/com/spotify/styx/storage/Storage.java @@ -40,6 +40,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.HashMap; /** * The interface to the persistence layer. @@ -129,6 +130,13 @@ public interface Storage extends Closeable { */ Map workflows() throws IOException; + + /** + * Get all {@link WorkflowWithState}s. + * @return + */ + HashMap workflowsWithState() throws IOException; + /** Get all {@link Workflow}s by doing strongly consistent batch fetch. * * @param workflowIds set of {@link WorkflowId}s diff --git a/styx-service-common/src/test/java/com/spotify/styx/storage/DatastoreStorageTest.java b/styx-service-common/src/test/java/com/spotify/styx/storage/DatastoreStorageTest.java index 79ea5fb38f..acc42dfa6f 100644 --- a/styx-service-common/src/test/java/com/spotify/styx/storage/DatastoreStorageTest.java +++ b/styx-service-common/src/test/java/com/spotify/styx/storage/DatastoreStorageTest.java @@ -84,6 +84,7 @@ import com.google.cloud.datastore.StringValue; import com.google.common.collect.ImmutableSet; import com.spotify.styx.model.Backfill; +import com.spotify.styx.model.WorkflowWithState; import com.spotify.styx.model.BackfillBuilder; import com.spotify.styx.model.ExecutionDescription; import com.spotify.styx.model.Resource; @@ -555,6 +556,39 @@ public void allFieldsAreSetWhenRetrievingWorkflowState() throws Exception { assertThat(retrieved, is(state)); } + @Test + public void shouldReturnWorkflowsWithState() throws Exception { + assertThat(storage.workflowsWithState().isEmpty(), is(true)); + + Workflow workflow1 = workflow(WORKFLOW_ID1); + Workflow workflow2 = workflow(WORKFLOW_ID2); + Workflow workflow3 = workflow(WORKFLOW_ID3); + + storage.store(workflow1); + storage.store(workflow2); + storage.store(workflow3); + + var instant = Instant.parse("2016-03-14T14:00:00Z"); + var state_workflow_id1 = WorkflowState.builder() + .enabled(true) + .nextNaturalTrigger(instant) + .nextNaturalOffsetTrigger(instant.plus(1, ChronoUnit.DAYS)) + .build(); + var state_workflow_id2 = state_workflow_id1.toBuilder().enabled(false).build(); + var state_workflow_id3 = state_workflow_id1.toBuilder().nextNaturalOffsetTrigger(instant.plus(2, ChronoUnit.DAYS)).build(); + + storage.patchState(WORKFLOW_ID1, state_workflow_id1); + storage.patchState(WORKFLOW_ID2, state_workflow_id2); + storage.patchState(WORKFLOW_ID3, state_workflow_id3); + + var workflows = storage.workflowsWithState(); + assertThat(workflows.size(), is(3)); + + assertThat(workflows, hasEntry(WORKFLOW_ID1, WorkflowWithState.create(workflow1, state_workflow_id1))); + assertThat(workflows, hasEntry(WORKFLOW_ID2, WorkflowWithState.create(workflow2, state_workflow_id2))); + assertThat(workflows, hasEntry(WORKFLOW_ID3, WorkflowWithState.create(workflow3, state_workflow_id3))); + } + @Test public void shouldReturnWorkflowWithState() throws Exception { storage.store(WORKFLOW);