From df9bb0dfa3954ebc3e3ce027201fdc2e732262f3 Mon Sep 17 00:00:00 2001 From: Maja Kabiljo Date: Thu, 12 Apr 2018 08:59:13 -0700 Subject: [PATCH 1/3] GIRAPH-1185 closes #69 --- .../SendWorkerOneMessageToManyRequest.java | 40 +- .../org/apache/giraph/comm/RequestTest.java | 429 ++++++++++-------- pom.xml | 2 +- 3 files changed, 265 insertions(+), 206 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java index 7861f56d9..6ffbc9e24 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java @@ -137,16 +137,42 @@ public void doRequest(ServerData serverData) { partitionIdMsgs.put(partitionId, idMsgs); } idMsgs.add(vertexId, msg); - } - // Read ByteArrayVertexIdMessages and write to message store - for (Entry idMsgs : - partitionIdMsgs.entrySet()) { - if (!idMsgs.getValue().isEmpty()) { - serverData.getIncomingMessageStore().addPartitionMessages( - idMsgs.getKey(), idMsgs.getValue()); + // If any of the list of messages reaches the expected initialSize + // threshold, then move everything we have so far to the message store, + // to avoid maintaining large intermediate lists of messages. + if (idMsgs.getSize() >= initialSize) { + addMessagesToStore(partitionIdMsgs, + serverData.getIncomingMessageStore()); + partitionIdMsgs.clear(); } } + + // Move any remaining messages to the message store + addMessagesToStore(partitionIdMsgs, serverData.getIncomingMessageStore()); + } + } + + /** + * Adds the provided per partition messages from the hashmap to the message + * store. + * + * @param partitionIdMsgs Per partition messages + * @param messageStore Message store instance + * @param Vertex ID type + * @param Message type + */ + private static void + addMessagesToStore( + Int2ObjectOpenHashMap partitionIdMsgs, + MessageStore messageStore) { + + for (Entry idMsgs : + partitionIdMsgs.entrySet()) { + + if (!idMsgs.getValue().isEmpty()) { + messageStore.addPartitionMessages(idMsgs.getKey(), idMsgs.getValue()); + } } } } diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 5db0b7947..aa52d0fac 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -20,7 +20,6 @@ import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.netty.NettyServer; -import org.apache.giraph.comm.netty.handler.AckSignalFlag; import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; import org.apache.giraph.comm.requests.SendPartitionMutationsRequest; import org.apache.giraph.comm.requests.SendVertexRequest; @@ -52,8 +51,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; @@ -67,251 +71,280 @@ * Test all the different netty requests. */ @SuppressWarnings("unchecked") +@RunWith(Enclosed.class) public class RequestTest { - /** Configuration */ - private ImmutableClassesGiraphConfiguration conf; - /** Server data */ - private ServerData serverData; - /** Server */ - private NettyServer server; - /** Client */ - private NettyClient client; - /** Worker info */ - private WorkerInfo workerInfo; - - @Before - public void setUp() { - // Setup the conf - GiraphConfiguration tmpConf = new GiraphConfiguration(); - GiraphConstants.COMPUTATION_CLASS.set(tmpConf, IntNoOpComputation.class); - conf = new ImmutableClassesGiraphConfiguration(tmpConf); - - @SuppressWarnings("rawtypes") - Context context = mock(Context.class); - when(context.getConfiguration()).thenReturn(conf); - - // Start the service - serverData = MockUtils.createNewServerData(conf, context); - serverData.prepareSuperstep(); - workerInfo = new WorkerInfo(); - server = new NettyServer(conf, + abstract public static class SharedSetup { + /** * Configuration */ + protected ImmutableClassesGiraphConfiguration conf; + /** * Server data */ + protected ServerData serverData; + /** * Server */ + protected NettyServer server; + /** * Client */ + protected NettyClient client; + /** * Worker info */ + protected WorkerInfo workerInfo; + + @Before + public void setUp() { + // Setup the conf + GiraphConfiguration tmpConf = new GiraphConfiguration(); + GiraphConstants.COMPUTATION_CLASS.set(tmpConf, IntNoOpComputation.class); + conf = new ImmutableClassesGiraphConfiguration(tmpConf); + + @SuppressWarnings("rawtypes") + Context context = mock(Context.class); + when(context.getConfiguration()).thenReturn(conf); + + // Start the service + serverData = MockUtils.createNewServerData(conf, context); + serverData.prepareSuperstep(); + workerInfo = new WorkerInfo(); + server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, - context, new MockExceptionHandler()); - server.start(); + context, new MockExceptionHandler()); + server.start(); - workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp()); - client = new NettyClient(context, conf, new WorkerInfo(), + workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp()); + client = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); - server.setFlowControl(client.getFlowControl()); - client.connectAllAddresses( + server.setFlowControl(client.getFlowControl()); + client.connectAllAddresses( Lists.newArrayList(workerInfo)); + } } - @Test - public void sendVertexPartition() { - // Data to send - int partitionId = 13; - Partition partition = + public static class NonParameterizedTest extends SharedSetup { + @Test + public void sendVertexPartition() { + // Data to send + int partitionId = 13; + Partition partition = conf.createPartition(partitionId, null); - for (int i = 0; i < 10; ++i) { - Vertex vertex = conf.createVertex(); - vertex.initialize(new IntWritable(i), new IntWritable(i)); - partition.putVertex(vertex); - } + for (int i = 0; i < 10; ++i) { + Vertex vertex = conf.createVertex(); + vertex.initialize(new IntWritable(i), new IntWritable(i)); + partition.putVertex(vertex); + } - // Send the request - SendVertexRequest request = - new SendVertexRequest(partition); - client.sendWritableRequest(workerInfo.getTaskId(), request); - client.waitAllRequests(); + // Send the request + SendVertexRequest request = + new SendVertexRequest(partition); + client.sendWritableRequest(workerInfo.getTaskId(), request); + client.waitAllRequests(); - // Stop the service - client.stop(); - server.stop(); + // Stop the service + client.stop(); + server.stop(); - // Check the output - PartitionStore partitionStore = + // Check the output + PartitionStore partitionStore = serverData.getPartitionStore(); - assertTrue(partitionStore.hasPartition(partitionId)); - int total = 0; - Partition partition2 = + assertTrue(partitionStore.hasPartition(partitionId)); + int total = 0; + Partition partition2 = partitionStore.removePartition(partitionId); - for (Vertex vertex : partition2) { - total += vertex.getId().get(); + for (Vertex vertex : partition2) { + total += vertex.getId().get(); + } + partitionStore.putPartition(partition2); + assertEquals(total, 45); + partitionStore.shutdown(); } - partitionStore.putPartition(partition2); - assertEquals(total, 45); - partitionStore.shutdown(); - } - @Test - public void sendWorkerMessagesRequest() { - // Data to send - PairList> + @Test + public void sendWorkerMessagesRequest() { + // Data to send + PairList> dataToSend = new PairList<>(); - dataToSend.initialize(); - int partitionId = 0; - ByteArrayVertexIdMessages vertexIdMessages = + dataToSend.initialize(); + int partitionId = 0; + ByteArrayVertexIdMessages vertexIdMessages = new ByteArrayVertexIdMessages<>( - new TestMessageValueFactory<>(IntWritable.class)); - vertexIdMessages.setConf(conf); - vertexIdMessages.initialize(); - dataToSend.add(partitionId, vertexIdMessages); - for (int i = 1; i < 7; ++i) { - IntWritable vertexId = new IntWritable(i); - for (int j = 0; j < i; ++j) { - vertexIdMessages.add(vertexId, new IntWritable(j)); - } - } - - // Send the request - SendWorkerMessagesRequest request = - new SendWorkerMessagesRequest<>(dataToSend); - request.setConf(conf); - - client.sendWritableRequest(workerInfo.getTaskId(), request); - client.waitAllRequests(); - - // Stop the service - client.stop(); - server.stop(); - - // Check the output - Iterable vertices = - serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); - int keySum = 0; - int messageSum = 0; - for (IntWritable vertexId : vertices) { - keySum += vertexId.get(); - Iterable messages = - serverData.getIncomingMessageStore().getVertexMessages( - vertexId); - synchronized (messages) { - for (IntWritable message : messages) { - messageSum += message.get(); + new TestMessageValueFactory<>(IntWritable.class)); + vertexIdMessages.setConf(conf); + vertexIdMessages.initialize(); + dataToSend.add(partitionId, vertexIdMessages); + for (int i = 1; i < 7; ++i) { + IntWritable vertexId = new IntWritable(i); + for (int j = 0; j < i; ++j) { + vertexIdMessages.add(vertexId, new IntWritable(j)); } } - } - assertEquals(21, keySum); - assertEquals(35, messageSum); - } - @Test - public void sendWorkerIndividualMessagesRequest() throws IOException { - // Data to send - ByteArrayOneMessageToManyIds - dataToSend = new ByteArrayOneMessageToManyIds<>(new - TestMessageValueFactory<>(IntWritable.class)); - dataToSend.setConf(conf); - dataToSend.initialize(); - ExtendedDataOutput output = conf.createExtendedDataOutput(); - for (int i = 1; i <= 7; ++i) { - IntWritable vertexId = new IntWritable(i); - vertexId.write(output); - } - dataToSend.add(output.getByteArray(), output.getPos(), 7, new IntWritable(1)); + // Send the request + SendWorkerMessagesRequest request = + new SendWorkerMessagesRequest<>(dataToSend); + request.setConf(conf); - // Send the request - SendWorkerOneMessageToManyRequest request = - new SendWorkerOneMessageToManyRequest<>(dataToSend, conf); - client.sendWritableRequest(workerInfo.getTaskId(), request); - client.waitAllRequests(); + client.sendWritableRequest(workerInfo.getTaskId(), request); + client.waitAllRequests(); - // Stop the service - client.stop(); - server.stop(); + // Stop the service + client.stop(); + server.stop(); - // Check the output - Iterable vertices = + // Check the output + Iterable vertices = serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); - int keySum = 0; - int messageSum = 0; - for (IntWritable vertexId : vertices) { - keySum += vertexId.get(); - Iterable messages = + int keySum = 0; + int messageSum = 0; + for (IntWritable vertexId : vertices) { + keySum += vertexId.get(); + Iterable messages = serverData.getIncomingMessageStore().getVertexMessages( - vertexId); - synchronized (messages) { - for (IntWritable message : messages) { - messageSum += message.get(); + vertexId); + synchronized (messages) { + for (IntWritable message : messages) { + messageSum += message.get(); + } } } + assertEquals(21, keySum); + assertEquals(35, messageSum); } - assertEquals(28, keySum); - assertEquals(7, messageSum); - } - @Test - public void sendPartitionMutationsRequest() { - // Data to send - int partitionId = 19; - Map> vertexIdMutations = + + @Test + public void sendPartitionMutationsRequest() { + // Data to send + int partitionId = 19; + Map> vertexIdMutations = Maps.newHashMap(); - for (int i = 0; i < 11; ++i) { - VertexMutations mutations = + for (int i = 0; i < 11; ++i) { + VertexMutations mutations = new VertexMutations(); - for (int j = 0; j < 3; ++j) { - Vertex vertex = conf.createVertex(); - vertex.initialize(new IntWritable(i), new IntWritable(j)); - mutations.addVertex(vertex); - } - for (int j = 0; j < 2; ++j) { - mutations.removeVertex(); - } - for (int j = 0; j < 5; ++j) { - Edge edge = + for (int j = 0; j < 3; ++j) { + Vertex vertex = conf.createVertex(); + vertex.initialize(new IntWritable(i), new IntWritable(j)); + mutations.addVertex(vertex); + } + for (int j = 0; j < 2; ++j) { + mutations.removeVertex(); + } + for (int j = 0; j < 5; ++j) { + Edge edge = EdgeFactory.create(new IntWritable(i), new IntWritable(2 * j)); - mutations.addEdge(edge); - } - for (int j = 0; j < 7; ++j) { - mutations.removeEdge(new IntWritable(j)); + mutations.addEdge(edge); + } + for (int j = 0; j < 7; ++j) { + mutations.removeEdge(new IntWritable(j)); + } + vertexIdMutations.put(new IntWritable(i), mutations); } - vertexIdMutations.put(new IntWritable(i), mutations); - } - // Send the request - SendPartitionMutationsRequest + // Send the request + SendPartitionMutationsRequest request = new SendPartitionMutationsRequest(partitionId, vertexIdMutations); - GiraphMetrics.init(conf); - client.sendWritableRequest(workerInfo.getTaskId(), request); - client.waitAllRequests(); + GiraphMetrics.init(conf); + client.sendWritableRequest(workerInfo.getTaskId(), request); + client.waitAllRequests(); - // Stop the service - client.stop(); - server.stop(); + // Stop the service + client.stop(); + server.stop(); - // Check the output - ConcurrentMap> inVertexIdMutations = serverData.getPartitionMutations().get(partitionId); - int keySum = 0; - for (Entry> entry : inVertexIdMutations - .entrySet()) { - synchronized (entry.getValue()) { - keySum += entry.getKey().get(); - int vertexValueSum = 0; - for (Vertex vertex : entry + .entrySet()) { + synchronized (entry.getValue()) { + keySum += entry.getKey().get(); + int vertexValueSum = 0; + for (Vertex vertex : entry .getValue().getAddedVertexList()) { - vertexValueSum += vertex.getValue().get(); - } - assertEquals(3, vertexValueSum); - assertEquals(2, entry.getValue().getRemovedVertexCount()); - int removeEdgeValueSum = 0; - for (Edge edge : entry.getValue() + vertexValueSum += vertex.getValue().get(); + } + assertEquals(3, vertexValueSum); + assertEquals(2, entry.getValue().getRemovedVertexCount()); + int removeEdgeValueSum = 0; + for (Edge edge : entry.getValue() .getAddedEdgeList()) { - removeEdgeValueSum += edge.getValue().get(); + removeEdgeValueSum += edge.getValue().get(); + } + assertEquals(20, removeEdgeValueSum); + } + } + assertEquals(55, keySum); + } + } + + /** + * Tests that we run for different input parameters. + */ + @RunWith(Parameterized.class) + public static class ParameterizedTest extends SharedSetup { + + @Parameterized.Parameter(value = 0) + public int numVertices; + + @Parameterized.Parameters(name = "{index}: numVertices={0}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {10}, + {1000}, + {1000000} + }); + } + + @Test + public void sendWorkerIndividualMessagesRequest() + throws IOException { + + int expectedKeySum = 0; + + ByteArrayOneMessageToManyIds + dataToSend = new ByteArrayOneMessageToManyIds<>(new + TestMessageValueFactory<>(IntWritable.class)); + dataToSend.setConf(conf); + dataToSend.initialize(); + ExtendedDataOutput output = conf.createExtendedDataOutput(); + for (int i = 1; i <= numVertices; ++i) { + IntWritable vertexId = new IntWritable(i); + vertexId.write(output); + expectedKeySum += i; + } + dataToSend.add(output.getByteArray(), output.getPos(), numVertices, new IntWritable(1)); + + // Send the request + SendWorkerOneMessageToManyRequest request = + new SendWorkerOneMessageToManyRequest<>(dataToSend, conf); + client.sendWritableRequest(workerInfo.getTaskId(), request); + client.waitAllRequests(); + + // Stop the service + client.stop(); + server.stop(); + + // Check the output + Iterable vertices = + serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); + int keySum = 0; + int messageSum = 0; + for (IntWritable vertexId : vertices) { + keySum += vertexId.get(); + Iterable messages = + serverData.getIncomingMessageStore().getVertexMessages( + vertexId); + synchronized (messages) { + for (IntWritable message : messages) { + messageSum += message.get(); + } } - assertEquals(20, removeEdgeValueSum); } + assertEquals(expectedKeySum, keySum); + assertEquals(numVertices, messageSum); } - assertEquals(55, keySum); } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index e2eff4bb1..37ab72de5 100644 --- a/pom.xml +++ b/pom.xml @@ -341,7 +341,7 @@ under the License. 1.17 6.1.26 20160810 - 4.11 + 4.12 2.5.3 3.0.1 0.29 From 3c2c6257fdc30dede0e50aacd09a233ac7ffe1ed Mon Sep 17 00:00:00 2001 From: Dionysios Logothetis Date: Thu, 26 Apr 2018 08:21:55 -0700 Subject: [PATCH 2/3] Documentation --- .../comm/requests/SendWorkerOneMessageToManyRequest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java index 6ffbc9e24..4516204bf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java @@ -139,8 +139,8 @@ public void doRequest(ServerData serverData) { idMsgs.add(vertexId, msg); // If any of the list of messages reaches the expected initialSize - // threshold, then move everything we have so far to the message store, - // to avoid maintaining large intermediate lists of messages. + // threshold, then move everything we have so far to the message store. + // This avoids maintaining large intermediate lists of messages. if (idMsgs.getSize() >= initialSize) { addMessagesToStore(partitionIdMsgs, serverData.getIncomingMessageStore()); From 0c51a44a80831e800c067643ebabb4b36270593d Mon Sep 17 00:00:00 2001 From: Dionysios Logothetis Date: Tue, 1 May 2018 13:43:46 -0700 Subject: [PATCH 3/3] Simplify parameter definition --- .../src/test/java/org/apache/giraph/comm/RequestTest.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index aa52d0fac..b644f1b85 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -290,12 +290,8 @@ public static class ParameterizedTest extends SharedSetup { public int numVertices; @Parameterized.Parameters(name = "{index}: numVertices={0}") - public static Collection data() { - return Arrays.asList(new Object[][]{ - {10}, - {1000}, - {1000000} - }); + public static Object[] data() { + return new Object[] {10, 1000, 1000000}; } @Test