Skip to content
This repository was archived by the owner on Jul 12, 2023. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public Stream<Route<AsyncHandler<Response<ByteString>>>> routes(
Route.with(
json(), "GET", BASE + "/<cid>/<wfid>/full",
rc -> workflowWithState(arg("cid", rc), arg("wfid", rc))),
Route.with(
json(), "GET", BASE + "/full",
rc -> workflowsWithState(rc.request())),
Route.with(
json(), "GET", BASE,
rc -> workflows(rc.request())),
Expand Down Expand Up @@ -216,6 +219,22 @@ private WorkflowConfiguration readFromJsonWithDefaults(ByteString payload)
return WorkflowConfigurationBuilder.from(workflowConfig).deploymentTime(time.get()).build();
}

  /**
   * Handler for {@code GET <base>/full}: returns every stored workflow together with its state.
   *
   * @param request the incoming request; query parameters are parsed but currently unused (see note)
   * @return a successful response whose payload is all workflows with their states
   */
  private Response<Collection<WorkflowWithState>> workflowsWithState(Request request) {
    try {
      // NOTE(review): paramFilters is built from the request's query parameters but is never
      // applied below — unlike the sibling workflows() handler, which presumably filters with it.
      // Either wire the filters into the storage result or drop this computation. TODO confirm
      // whether getFilterParams performs validation side effects before removing it.
      var paramFilters = Stream.of(QueryParams.values())
          .map(e -> getFilterParams(request, e).map(param -> Map.entry(e, param)))
          .flatMap(Optional::stream)
          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      // Fetch every workflow (keyed by id in storage); only the values are returned.
      Collection<WorkflowWithState> workflows = storage.workflowsWithState().values();

      return Response.forPayload(workflows);

    } catch (IOException e) {
      // Storage access failed; surface as an unchecked error to the framework.
      throw new RuntimeException("Failed to get workflows", e);
    }
  }

private Response<Collection<Workflow>> workflows(Request request) {
try {
var paramFilters = Stream.of(QueryParams.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static com.spotify.styx.testdata.TestData.QUERY_THRESHOLD_BEFORE;
import static com.spotify.styx.testdata.TestData.TEST_DEPLOYMENT_TIME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -74,6 +75,7 @@
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.model.WorkflowState;
import com.spotify.styx.model.WorkflowWithState;
import com.spotify.styx.state.Trigger;
import com.spotify.styx.storage.AggregateStorage;
import com.spotify.styx.storage.BigtableMocker;
Expand All @@ -88,6 +90,7 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -823,6 +826,27 @@ public void shouldReturnWorkflows() throws Exception {
assertJson(response, "[*]", hasSize(2));
}

@Test
public void shouldReturnWorkflowsWithState() throws Exception {
sinceVersion(Api.Version.V3);

Response<ByteString> response = awaitResponse(
serviceHelper.request("GET", path("/full")));

var parsedResponse = Arrays.asList(deserialize(response.payload().orElseThrow(), WorkflowWithState[].class));
var expectedWF1 = WorkflowWithState.create(FLYTE_EXEC_WORKFLOW, WorkflowState.builder().enabled(false).build());
var expectedWF2 = WorkflowWithState.create(WORKFLOW, WorkflowState.builder().enabled(false).build());

assertThat(response, hasStatus(withCode(Status.OK)));
assertJson(response, "[*]", hasSize(2));
assertThat(parsedResponse,
containsInAnyOrder(
expectedWF1,
expectedWF2
)
);
}

@Test
public void shouldReturnFilteredDeploymentTypeWorkflow() throws Exception {
sinceVersion(Api.Version.V3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.HashMap;
import org.apache.hadoop.hbase.client.Connection;

/**
Expand Down Expand Up @@ -196,6 +197,11 @@ public Map<WorkflowId, Workflow> workflows() throws IOException {
return datastoreStorage.workflows();
}

  // Delegates directly to the Datastore-backed storage.
  // NOTE(review): the declared return type is the concrete HashMap (forced by the Storage
  // interface); prefer Map<WorkflowId, WorkflowWithState> here once the interface is widened,
  // matching the sibling workflows() methods.
  @Override
  public HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
    return datastoreStorage.workflowsWithState();
  }

@Override
public Map<WorkflowId, Workflow> workflows(Set<WorkflowId> workflowIds) {
return datastoreStorage.workflows(workflowIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,26 @@ Optional<WorkflowWithState> workflowWithState(WorkflowId workflowId) throws IOEx
return Optional.of(WorkflowWithState.create(workflow, workflowState));
}

HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
HashMap<WorkflowId, WorkflowWithState> workflows = new HashMap<>();
var query = Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW).build();
datastore.query(query, entity -> {
Workflow workflow;
WorkflowState workflowState;
WorkflowWithState workflowWithState;
try {
workflow = OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
workflowState = workflowState(Optional.of(entity));
workflowWithState = WorkflowWithState.create(workflow, workflowState);
} catch (IOException e) {
log.warn("Failed to read workflow {}.", entity.getKey(), e);
return;
}
workflows.put(workflow.id(), workflowWithState);
});
return workflows;
}

private WorkflowState workflowState(Optional<Entity> workflowEntity) {
var builder = WorkflowState.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.HashMap;

/**
* The interface to the persistence layer.
Expand Down Expand Up @@ -129,6 +130,13 @@ public interface Storage extends Closeable {
*/
Map<WorkflowId, Workflow> workflows() throws IOException;


/**
* Get all {@link WorkflowWithState}s.
* @return
*/
HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException;

/** Get all {@link Workflow}s by doing strongly consistent batch fetch.
*
* @param workflowIds set of {@link WorkflowId}s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.google.cloud.datastore.StringValue;
import com.google.common.collect.ImmutableSet;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.WorkflowWithState;
import com.spotify.styx.model.BackfillBuilder;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.Resource;
Expand Down Expand Up @@ -555,6 +556,39 @@ public void allFieldsAreSetWhenRetrievingWorkflowState() throws Exception {
assertThat(retrieved, is(state));
}

@Test
public void shouldReturnWorkflowsWithState() throws Exception {
assertThat(storage.workflowsWithState().isEmpty(), is(true));

Workflow workflow1 = workflow(WORKFLOW_ID1);
Workflow workflow2 = workflow(WORKFLOW_ID2);
Workflow workflow3 = workflow(WORKFLOW_ID3);

storage.store(workflow1);
storage.store(workflow2);
storage.store(workflow3);

var instant = Instant.parse("2016-03-14T14:00:00Z");
var state_workflow_id1 = WorkflowState.builder()
.enabled(true)
.nextNaturalTrigger(instant)
.nextNaturalOffsetTrigger(instant.plus(1, ChronoUnit.DAYS))
.build();
var state_workflow_id2 = state_workflow_id1.toBuilder().enabled(false).build();
var state_workflow_id3 = state_workflow_id1.toBuilder().nextNaturalOffsetTrigger(instant.plus(2, ChronoUnit.DAYS)).build();

storage.patchState(WORKFLOW_ID1, state_workflow_id1);
storage.patchState(WORKFLOW_ID2, state_workflow_id2);
storage.patchState(WORKFLOW_ID3, state_workflow_id3);

var workflows = storage.workflowsWithState();
assertThat(workflows.size(), is(3));

assertThat(workflows, hasEntry(WORKFLOW_ID1, WorkflowWithState.create(workflow1, state_workflow_id1)));
assertThat(workflows, hasEntry(WORKFLOW_ID2, WorkflowWithState.create(workflow2, state_workflow_id2)));
assertThat(workflows, hasEntry(WORKFLOW_ID3, WorkflowWithState.create(workflow3, state_workflow_id3)));
}

@Test
public void shouldReturnWorkflowWithState() throws Exception {
storage.store(WORKFLOW);
Expand Down