Skip to content

Commit 8f62909

Browse files
Adds tests for resource sharing flow and add a CI job to run resource-sharing tests
Signed-off-by: Darshit Chanpura <dchanp@amazon.com>
1 parent dc16660 commit 8f62909

File tree

5 files changed

+635
-46
lines changed

5 files changed

+635
-46
lines changed

.github/workflows/test_security.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ jobs:
2020
strategy:
2121
matrix:
2222
java: [21]
23+
resource_sharing_flag: [ "", "-Dresource_sharing.enabled=true" ]
2324

2425
name: Run Security Integration Tests on Linux
2526
runs-on: ubuntu-latest
@@ -45,4 +46,8 @@ jobs:
4546
# switching the user, as OpenSearch cluster can only be started as root/Administrator on linux-deb/linux-rpm/windows-zip.
4647
run: |
4748
chown -R 1000:1000 `pwd`
48-
su `id -un 1000` -c "whoami && java -version && ./gradlew integTest -Dsecurity.enabled=true"
49+
su `id -un 1000` -c "whoami && java -version && ./gradlew integTest \
50+
-Dsecurity.enabled=true \
51+
-Dhttps=true \
52+
${{ matrix.resource_sharing_flag }} \
53+
--tests '*IT'"

build.gradle

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ dependencies {
256256
testImplementation("org.junit.jupiter:junit-jupiter:${junitJupiterVersion}")
257257
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}")
258258

259+
testImplementation 'org.awaitility:awaitility:4.3.0'
260+
259261
// ZipArchive dependencies used for integration tests
260262
zipArchive("org.opensearch.plugin:opensearch-job-scheduler:${opensearch_build}")
261263
zipArchive("org.opensearch.plugin:opensearch-ml-plugin:${opensearch_build}")
@@ -351,6 +353,8 @@ integTest {
351353
systemProperty('user', user)
352354
systemProperty('password', password)
353355

356+
systemProperty "resource_sharing.enabled", System.getProperty("resource_sharing.enabled")
357+
354358
// Only tenant aware test if set
355359
if (System.getProperty("tests.rest.tenantaware") == "true") {
356360
filter {
@@ -510,6 +514,10 @@ testClusters.integTest {
510514
'".plugins-flow-framework-state"' +
511515
']'
512516
)
517+
if (System.getProperty("resource_sharing.enabled") == "true") {
518+
setting("plugins.security.experimental.resource_sharing.enabled", "true")
519+
setting("plugins.security.experimental.resource_sharing.protected_types", "[\"workflow\", \"workflow_state\"]")
520+
}
513521
setSecure(true)
514522
}
515523

@@ -542,7 +550,13 @@ testClusters.integTest {
542550
if (System.getProperty("opensearch.debug") != null) {
543551
def debugPort = 5005
544552
nodes.forEach { node ->
545-
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
553+
// server=n,suspend=y -> node tries to connect to a debugger and hence test runs fails with
554+
// Exec output and error:
555+
// | Output for ./bin/opensearch-plugin:ERROR: transport error 202: connect failed: Connection refused
556+
// | ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
557+
// | JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized [src/jdk.jdwp.agent/share/native/libjdwp/debugInit.c:700].
558+
// So instead, we listen to a debugger by saying server=y and suspend=n
559+
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:${debugPort}")
546560
debugPort += 1
547561
}
548562
}

src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java

Lines changed: 201 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.flowframework;
1010

1111
import com.google.gson.JsonArray;
12-
import org.apache.commons.lang3.RandomStringUtils;
1312
import org.apache.hc.client5.http.auth.AuthScope;
1413
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
1514
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
@@ -26,6 +25,7 @@
2625
import org.apache.hc.core5.reactor.ssl.TlsDetails;
2726
import org.apache.hc.core5.ssl.SSLContextBuilder;
2827
import org.apache.hc.core5.util.Timeout;
28+
import org.opensearch.OpenSearchStatusException;
2929
import org.opensearch.action.ingest.GetPipelineResponse;
3030
import org.opensearch.action.search.SearchResponse;
3131
import org.opensearch.client.Request;
@@ -37,11 +37,14 @@
3737
import org.opensearch.common.unit.TimeValue;
3838
import org.opensearch.common.util.concurrent.ThreadContext;
3939
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
40+
import org.opensearch.common.xcontent.XContentFactory;
4041
import org.opensearch.common.xcontent.json.JsonXContent;
4142
import org.opensearch.core.rest.RestStatus;
4243
import org.opensearch.core.xcontent.DeprecationHandler;
4344
import org.opensearch.core.xcontent.MediaType;
4445
import org.opensearch.core.xcontent.NamedXContentRegistry;
46+
import org.opensearch.core.xcontent.ToXContent;
47+
import org.opensearch.core.xcontent.XContentBuilder;
4548
import org.opensearch.core.xcontent.XContentParser;
4649
import org.opensearch.flowframework.common.CommonValue;
4750
import org.opensearch.flowframework.model.ProvisioningProgress;
@@ -51,20 +54,26 @@
5154
import org.opensearch.flowframework.model.WorkflowState;
5255
import org.opensearch.flowframework.util.ParseUtils;
5356
import org.opensearch.ml.repackage.com.google.common.collect.ImmutableList;
57+
import org.opensearch.security.spi.resources.sharing.Recipients;
5458
import org.opensearch.test.rest.OpenSearchRestTestCase;
5559
import org.junit.After;
5660
import org.junit.Before;
5761

5862
import java.io.IOException;
63+
import java.security.SecureRandom;
64+
import java.time.Duration;
5965
import java.util.ArrayList;
6066
import java.util.Collections;
67+
import java.util.HashMap;
6168
import java.util.List;
6269
import java.util.Locale;
6370
import java.util.Map;
6471
import java.util.Optional;
6572
import java.util.concurrent.TimeUnit;
6673
import java.util.stream.Collectors;
6774

75+
import org.awaitility.Awaitility;
76+
6877
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
6978
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
7079

@@ -73,6 +82,8 @@
7382
*/
7483
public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase {
7584

85+
public static final String SHARE_WORKFLOW_URI = "/_plugins/_security/api/resource/share";
86+
7687
@Before
7788
protected void setUpSettings() throws Exception {
7889

@@ -296,11 +307,42 @@ protected boolean preserveClusterSettings() {
296307
}
297308

298309
/**
299-
* Create an unique password. Simple password are weak due to https://tinyurl.com/383em9zk
310+
* Create an unguessable password. Simple password are weak due to https://tinyurl.com/383em9zk
300311
* @return a random password.
301312
*/
302313
public static String generatePassword(String username) {
303-
return RandomStringUtils.random(15, true, true);
314+
String upperCase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
315+
String lowerCase = "abcdefghijklmnopqrstuvwxyz";
316+
String digits = "0123456789";
317+
String special = "_";
318+
String characters = upperCase + lowerCase + digits + special;
319+
320+
SecureRandom rng = new SecureRandom();
321+
322+
// Ensure password includes at least one character from each set
323+
char[] password = new char[15];
324+
password[0] = upperCase.charAt(rng.nextInt(upperCase.length()));
325+
password[1] = lowerCase.charAt(rng.nextInt(lowerCase.length()));
326+
password[2] = digits.charAt(rng.nextInt(digits.length()));
327+
password[3] = special.charAt(rng.nextInt(special.length()));
328+
329+
for (int i = 4; i < 15; i++) {
330+
char nextChar;
331+
do {
332+
nextChar = characters.charAt(rng.nextInt(characters.length()));
333+
} while (username.indexOf(nextChar) > -1);
334+
password[i] = nextChar;
335+
}
336+
337+
// Shuffle the array to ensure the first 4 characters are not always in the same position
338+
for (int i = password.length - 1; i > 0; i--) {
339+
int index = rng.nextInt(i + 1);
340+
char temp = password[index];
341+
password[index] = password[i];
342+
password[i] = temp;
343+
}
344+
345+
return new String(password);
304346
}
305347

306348
/**
@@ -485,7 +527,7 @@ protected Response createWorkflowValidation(RestClient client, Template template
485527
* Helper method to invoke the Reprovision Workflow API
486528
* @param client the rest client
487529
* @param workflowId the document id
488-
* @param templateFields the template to reprovision
530+
* @param template the template to reprovision
489531
* @throws Exception if the request fails
490532
* @return a rest response
491533
*/
@@ -985,4 +1027,159 @@ protected List<String> catPlugins() throws IOException {
9851027
).list();
9861028
return pluginsList.stream().map(o -> ((Map<String, Object>) o).get("component").toString()).collect(Collectors.toList());
9871029
}
1030+
1031+
protected boolean isResourceSharingFeatureEnabled() {
1032+
return Optional.ofNullable(System.getProperty("resource_sharing.enabled")).map("true"::equalsIgnoreCase).orElse(false);
1033+
}
1034+
1035+
public static Response shareConfig(RestClient client, Map<String, String> params, String payload) throws IOException {
1036+
return TestHelpers.makeRequest(client, "PUT", SHARE_WORKFLOW_URI, params, payload, null);
1037+
}
1038+
1039+
public static Response patchSharingInfo(RestClient client, Map<String, String> params, String payload) throws IOException {
1040+
return TestHelpers.makeRequest(client, "PATCH", SHARE_WORKFLOW_URI, params, payload, null);
1041+
}
1042+
1043+
public static String shareWithUserPayload(String resourceId, String resourceIndex, String accessLevel, String user) {
1044+
return String.format(Locale.ROOT, """
1045+
{
1046+
"resource_id": "%s",
1047+
"resource_type": "%s",
1048+
"share_with": {
1049+
"%s" : {
1050+
"users": ["%s"]
1051+
}
1052+
}
1053+
}
1054+
""", resourceId, resourceIndex, accessLevel, user);
1055+
}
1056+
1057+
public static class PatchSharingInfoPayloadBuilder {
1058+
private String configId;
1059+
private String configType;
1060+
private final Map<String, Recipients> share = new HashMap<>();
1061+
private final Map<String, Recipients> revoke = new HashMap<>();
1062+
1063+
public PatchSharingInfoPayloadBuilder configId(String resourceId) {
1064+
this.configId = resourceId;
1065+
return this;
1066+
}
1067+
1068+
public PatchSharingInfoPayloadBuilder configType(String resourceType) {
1069+
this.configType = resourceType;
1070+
return this;
1071+
}
1072+
1073+
public void share(Recipients recipients, String accessLevel) {
1074+
Recipients existing = share.getOrDefault(accessLevel, new Recipients(new HashMap<>()));
1075+
existing.share(recipients);
1076+
share.put(accessLevel, existing);
1077+
}
1078+
1079+
public void revoke(Recipients recipients, String accessLevel) {
1080+
Recipients existing = revoke.getOrDefault(accessLevel, new Recipients(new HashMap<>()));
1081+
// intentionally share() is called here since we are building a shareWith object, this final object will be used to remove
1082+
// access
1083+
// think of it as currentShareWith.removeAll(revokeShareWith)
1084+
existing.share(recipients);
1085+
revoke.put(accessLevel, existing);
1086+
}
1087+
1088+
private String buildJsonString(Map<String, Recipients> input) {
1089+
1090+
List<String> output = new ArrayList<>();
1091+
for (Map.Entry<String, Recipients> entry : input.entrySet()) {
1092+
try {
1093+
XContentBuilder builder = XContentFactory.jsonBuilder();
1094+
entry.getValue().toXContent(builder, ToXContent.EMPTY_PARAMS);
1095+
String recipJson = builder.toString();
1096+
output.add(String.format(Locale.ROOT, "\"%s\" : %s", entry.getKey(), recipJson));
1097+
} catch (IOException e) {
1098+
throw new RuntimeException(e);
1099+
}
1100+
1101+
}
1102+
1103+
return String.join(",", output);
1104+
1105+
}
1106+
1107+
public String build() {
1108+
String allShares = buildJsonString(share);
1109+
String allRevokes = buildJsonString(revoke);
1110+
return String.format(Locale.ROOT, """
1111+
{
1112+
"resource_id": "%s",
1113+
"resource_type": "%s",
1114+
"add": {
1115+
%s
1116+
},
1117+
"revoke": {
1118+
%s
1119+
}
1120+
}
1121+
""", configId, configType, allShares, allRevokes);
1122+
}
1123+
}
1124+
1125+
public static boolean isForbidden(Exception e) {
1126+
if (e instanceof OpenSearchStatusException) {
1127+
return ((OpenSearchStatusException) e).status() == RestStatus.FORBIDDEN;
1128+
}
1129+
if (e instanceof ResponseException) {
1130+
return ((ResponseException) e).getResponse().getStatusLine().getStatusCode() == 403;
1131+
}
1132+
return false;
1133+
}
1134+
1135+
private static final Duration RS_WAIT_TIMEOUT = Duration.ofSeconds(30);
1136+
private static final Duration RS_POLL_INTERVAL = Duration.ofMillis(200);
1137+
1138+
// Core waiter: visible when the callable returns 200 OK; 403 means "not yet", anything else fails fast.
1139+
private void waitUntilVisible(java.util.concurrent.Callable<Response> op) {
1140+
Awaitility.await().atMost(RS_WAIT_TIMEOUT).pollInterval(RS_POLL_INTERVAL).until(() -> {
1141+
try {
1142+
Response r = op.call();
1143+
return TestHelpers.restStatus(r) == RestStatus.OK;
1144+
} catch (Exception e) {
1145+
if (isForbidden(e)) {
1146+
// eventual consistency: not visible yet
1147+
return false;
1148+
}
1149+
// unexpected error: fail fast
1150+
throw e;
1151+
}
1152+
});
1153+
}
1154+
1155+
// Core waiter: non-visible when the callable throws 403; 200 means "still visible", anything else fails fast.
1156+
private void waitUntilForbidden(java.util.concurrent.Callable<Response> op) {
1157+
Awaitility.await().atMost(RS_WAIT_TIMEOUT).pollInterval(RS_POLL_INTERVAL).until(() -> {
1158+
try {
1159+
op.call(); // 200 => still visible
1160+
return false;
1161+
} catch (Exception e) {
1162+
if (isForbidden(e)) {
1163+
return true; // forbidden now
1164+
}
1165+
throw e; // unexpected error: fail fast
1166+
}
1167+
});
1168+
}
1169+
1170+
protected void waitForWorkflowSharingVisibility(String workflowId, RestClient client) {
1171+
waitUntilVisible(() -> getWorkflow(client, workflowId));
1172+
}
1173+
1174+
protected void waitForWorkflowRevokeNonVisibility(String workflowId, RestClient client) {
1175+
waitUntilForbidden(() -> getWorkflow(client, workflowId));
1176+
}
1177+
1178+
protected void waitForWorkflowStateSharingVisibility(String workflowId, RestClient client) {
1179+
waitUntilVisible(() -> getWorkflowStatus(client, workflowId, false));
1180+
}
1181+
1182+
protected void waitForWorkflowStateRevokeNonVisibility(String workflowId, RestClient client) {
1183+
waitUntilForbidden(() -> getWorkflowStatus(client, workflowId, false));
1184+
}
9881185
}

0 commit comments

Comments
 (0)