>of().iterator();
+ } else {
+ return new BVEdgesIterator();
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ size = in.readInt();
+ int intervalResidualEdgesBytesUsed = in.readInt();
+ if (intervalResidualEdgesBytesUsed > 0) {
+ // Only create a new buffer if the old one isn't big enough
+ if (intervalsAndResiduals == null ||
+ intervalResidualEdgesBytesUsed > intervalsAndResiduals.length) {
+ intervalsAndResiduals = new byte[intervalResidualEdgesBytesUsed];
+ }
+ in.readFully(intervalsAndResiduals, 0, intervalResidualEdgesBytesUsed);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(size);
+ out.writeInt(intervalsAndResiduals.length);
+ if (intervalsAndResiduals.length > 0) {
+ out.write(intervalsAndResiduals, 0, intervalsAndResiduals.length);
+ }
+ }
+
+ @Override
+ public void trim() {
+ // Nothing to do
+ }
+
+ /**
+ * Receives an integer array of successors and compresses them in the
+ * intervalsAndResiduals byte array
+ *
+ * @param edgesArray an integer array of successors
+ */
+ private void compress(final int[] edgesArray) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ diffComp(edgesArray, new OutputBitStream(baos));
+ intervalsAndResiduals = baos.toByteArray();
+ size = edgesArray.length;
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * See also
+ * {@link it.unimi.dsi.webgraph.BVGraph#diffComp(int[], OutputBitStream)}.
+ * This method is given an integer successors array and produces, onto a given
+ * output bit stream, the differentially compressed successor list.
+ *
+ * @param edgesArray the integer successors array
+ * @param obs an output bit stream where the compressed data will be
+ * stored.
+ */
+ public static void diffComp(final int[] edgesArray, OutputBitStream obs)
+ throws IOException {
+ // We write the degree.
+ obs.writeInt(edgesArray.length, Integer.SIZE);
+ IntArrayList left = new IntArrayList();
+ IntArrayList len = new IntArrayList();
+ IntArrayList residuals = new IntArrayList();
+ // If we are to produce intervals, we first compute them.
+ int intervalCount;
+ try {
+ intervalCount = BVEdges.intervalize(edgesArray,
+ BVGraph.DEFAULT_MIN_INTERVAL_LENGTH, Integer.MAX_VALUE, left, len,
+ residuals);
+ } catch (IllegalArgumentException e) {
+ // array was not sorted, sorting and retrying
+ Arrays.sort(edgesArray);
+ left = new IntArrayList();
+ len = new IntArrayList();
+ residuals = new IntArrayList();
+ intervalCount = BVEdges.intervalize(edgesArray,
+ BVGraph.DEFAULT_MIN_INTERVAL_LENGTH, Integer.MAX_VALUE, left, len,
+ residuals);
+ }
+ // We write the number of intervals.
+ obs.writeGamma(intervalCount);
+ int currIntLen;
+ int prev = 0;
+ if (intervalCount > 0) {
+ obs.writeInt(left.getInt(0), Integer.SIZE);
+ currIntLen = len.getInt(0);
+ prev = left.getInt(0) + currIntLen;
+ obs.writeGamma(currIntLen - BVGraph.DEFAULT_MIN_INTERVAL_LENGTH);
+ }
+ // We write out the intervals.
+ for (int i = 1; i < intervalCount; i++) {
+ obs.writeGamma(left.getInt(i) - prev - 1);
+ currIntLen = len.getInt(i);
+ prev = left.getInt(i) + currIntLen;
+ obs.writeGamma(currIntLen - BVGraph.DEFAULT_MIN_INTERVAL_LENGTH);
+ }
+ final int[] residual = residuals.elements();
+ final int residualCount = residuals.size();
+ // Now we write out the residuals, if any
+ if (residualCount != 0) {
+ if (intervalCount > 0) {
+ prev = residual[0];
+ obs.writeLongZeta(
+ Fast.int2nat((long) prev - left.getInt(0)),
+ BVGraph.DEFAULT_ZETA_K);
+ } else {
+ prev = residual[0];
+ obs.writeInt(prev, Integer.SIZE);
+ }
+ for (int i = 1; i < residualCount; i++) {
+ if (residual[i] == prev) {
+ throw new IllegalArgumentException(
+ "Repeated successor " + prev + " in successor list of this node");
+ }
+ obs.writeLongZeta(residual[i] - prev - 1L, BVGraph.DEFAULT_ZETA_K);
+ prev = residual[i];
+ }
+ }
+ obs.flush();
+ }
+
+ /**
+ * See also {@link it.unimi.dsi.webgraph.BVGraph#intervalize(IntArrayList,
+ * int, IntArrayList, IntArrayList, IntArrayList)}.
+ * This method tries to express an increasing sequence of natural numbers
+ * x as a union of an increasing sequence of intervals and an
+ * increasing sequence of residual elements. More precisely, this
+ * intervalization works as follows: first, one looks at
+ * edgesArray as a sequence of intervals (i.e., maximal sequences
+ * of consecutive elements); those intervals whose length is ≥
+ * minInterval are stored in the lists left (the
+ * list of left extremes) and len (the list of lengths; the
+ * length of an integer interval is the number of integers in that interval).
+ * The remaining integers, called residuals are stored in the
+ * residual list.
+ *
+ *
+ * Note that the previous content of left, len and
+ * residual is lost.
+ *
+ * @param edgesArray the array to be intervalized (an increasing list of
+ * natural numbers).
+ * @param minInterval the least length that a maximal sequence of consecutive
+ * elements must have in order for it to be considered as
+ * an interval.
+ * @param maxInterval the maximum length that a maximal sequence of
+ * consecutive elements must have in order for it to be
+ * considered as an interval.
+ * @param left the resulting list of left extremes of the intervals.
+ * @param len the resulting list of interval lengths.
+ * @param residuals the resulting list of residuals.
+ * @return the number of intervals.
+ */
+ protected static int intervalize(final int[] edgesArray,
+ final int minInterval, final int maxInterval, final IntArrayList left,
+ final IntArrayList len, final IntArrayList residuals) {
+ int nInterval = 0;
+ int i;
+ int j;
+
+ for (i = 0; i < edgesArray.length; i++) {
+ j = 0;
+ checkIsSorted(edgesArray, i);
+ if (i < edgesArray.length - 1 && edgesArray[i] + 1 == edgesArray[i + 1]) {
+ do {
+ j++;
+ } while (i + j < edgesArray.length - 1 && j < maxInterval &&
+ edgesArray[i + j] + 1 == edgesArray[i + j + 1]);
+ checkIsSorted(edgesArray, i + j);
+ j++;
+ // Now j is the number of integers in the interval.
+ if (j >= minInterval) {
+ left.add(edgesArray[i]);
+ len.add(j);
+ nInterval++;
+ i += j - 1;
+ }
+ }
+ if (j < minInterval) {
+ residuals.add(edgesArray[i]);
+ }
+ }
+ return nInterval;
+ }
+
+ /**
+ * Given an integer array and an index, this method throws an exception in
+ * case the element of the array the index points at is equal or larger than
+ * its next.
+ *
+ * @param edgesArray the integer array
+ * @param i the index
+ */
+ private static void checkIsSorted(int[] edgesArray, int i) {
+ if (i < edgesArray.length - 1 && edgesArray[i] == edgesArray[i + 1]) {
+ throw new IllegalArgumentException("Parallel edges are not allowed.");
+ }
+ if (i < edgesArray.length - 1 && edgesArray[i] > edgesArray[i + 1]) {
+ throw new IllegalArgumentException("Edges are not sorted.");
+ }
+ }
+
+ /**
+ * Iterator that reuses the same Edge object.
+ */
+ private class BVEdgesIterator
+ extends UnmodifiableIterator> {
+ /** Wrapped map iterator. */
+ private LazyIntIterator liIter = successors(
+ new InputBitStream(intervalsAndResiduals));
+ /** Representative edge object. */
+ private final Edge representativeEdge =
+ EdgeFactory.create(new IntWritable());
+ /** Current edge count */
+ private int currentEdge = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentEdge < size;
+ }
+
+ @Override
+ public Edge next() {
+ representativeEdge.getTargetVertexId().set(liIter.nextInt());
+ currentEdge++;
+ return representativeEdge;
+ }
+
+ /** Creates an iterator from an input bit stream
+ *
+ * @param ibs input bit stream with compressed intervals and residuals
+ * @return an iterator with the successors of the input bit stream
+ *
+ * */
+ private LazyIntIterator successors(InputBitStream ibs) {
+ try {
+ final int d;
+ int extraCount;
+ int firstIntervalNode = -1;
+ ibs.position(0);
+ d = ibs.readInt(Integer.SIZE);
+ if (d == 0) {
+ return LazyIntIterators.EMPTY_ITERATOR;
+ }
+ extraCount = d;
+ int intervalCount = 0; // Number of intervals
+ int[] left = null;
+ int[] len = null;
+ // Prepare to read intervals, if any
+ intervalCount = ibs.readGamma();
+ if (extraCount > 0 && intervalCount != 0) {
+ int prev = 0; // Holds the last integer in the last
+ // interval.
+ left = new int[intervalCount];
+ len = new int[intervalCount];
+ // Now we read intervals
+ firstIntervalNode = ibs.readInt(Integer.SIZE);
+ left[0] = firstIntervalNode;
+ len[0] = ibs.readGamma() + BVGraph.DEFAULT_MIN_INTERVAL_LENGTH;
+
+ prev = left[0] + len[0];
+ extraCount -= len[0];
+ for (int i = 1; i < intervalCount; i++) {
+ left[i] = ibs.readGamma() + prev + 1;
+ len[i] = ibs.readGamma() + BVGraph.DEFAULT_MIN_INTERVAL_LENGTH;
+ prev = left[i] + len[i];
+ extraCount -= len[i];
+ }
+ }
+
+ final int residualCount = extraCount; // Just to be able to use
+ // an
+ // anonymous class.
+ final LazyIntIterator residualIterator = residualCount == 0 ? null :
+ new ResidualIntIterator(ibs, residualCount, firstIntervalNode);
+ // The extra part is made by the contribution of intervals, if
+ // any, and by the residuals iterator.
+ if (intervalCount == 0) {
+ return residualIterator;
+ } else if (residualCount == 0) {
+ return new IntIntervalSequenceIterator(left, len);
+ } else {
+ return new MergedIntIterator(
+ new IntIntervalSequenceIterator(left, len), residualIterator);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /** An iterator returning the residuals of a node. */
+ private final class ResidualIntIterator extends AbstractLazyIntIterator {
+ /** The input bit stream from which residuals will be read. */
+ private final InputBitStream ibs;
+ /** The last residual returned. */
+ private int next;
+ /** The number of remaining residuals. */
+ private int remaining;
+
+ /** Constructor
+ *
+ * @param ibs the input bit stream with the residuals
+ * @param remaining the total elements in the stream
+ * @param x the value of the previous (or first) edge id
+ *
+ * */
+ private ResidualIntIterator(final InputBitStream ibs,
+ final int remaining, int x) {
+ this.remaining = remaining;
+ this.ibs = ibs;
+ try {
+ if (x >= 0) {
+ long temp = Fast.nat2int(ibs.readLongZeta(BVGraph.DEFAULT_ZETA_K));
+ this.next = (int) (x + temp);
+ } else {
+ this.next = ibs.readInt(Integer.SIZE);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /** Return the next integer of the iterator
+ *
+ * @return the next integer of the iterator
+ * */
+ public int nextInt() {
+ if (remaining == 0) {
+ return -1;
+ }
+ try {
+ final int result = next;
+ if (--remaining != 0) {
+ next += ibs.readZeta(BVGraph.DEFAULT_ZETA_K) + 1;
+ }
+ return result;
+ } catch (IOException cantHappen) {
+ throw new IllegalStateException(cantHappen);
+ }
+ }
+
+ @Override
+ public int skip(int n) {
+ if (n >= remaining) {
+ n = remaining;
+ remaining = 0;
+ return n;
+ }
+ try {
+ for (int i = n; i-- != 0;) {
+ next += ibs.readZeta(BVGraph.DEFAULT_ZETA_K) + 1;
+ }
+ remaining -= n;
+ return n;
+ } catch (IOException cantHappen) {
+ throw new IllegalStateException(cantHappen);
+ }
+ }
+
+ }
+
+ }
+
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IndexedBitArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IndexedBitArrayEdges.java
new file mode 100644
index 000000000..7254a7a99
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/IndexedBitArrayEdges.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.UnmodifiableIterator;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import org.apache.giraph.utils.Trimmable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link OutEdges} with int ids and null edge values, backed
+ * by the IndexedBitArrayEdges structure proposed in:
+ *
+ * Panagiotis Liakos, Katia Papakonstantinopoulou, Alex Delis:
+ * Realizing Memory-Optimized Distributed Graph Processing.
+ * IEEE Trans. Knowl. Data Eng. 30(4): 743-756 (2018).
+ *
+ * Note: this implementation is optimized for space usage for graphs exhibiting
+ * the locality of reference property, but edge addition and removals are
+ * expensive. Parallel edges are ignored.
+ */
+public class IndexedBitArrayEdges
+ extends ConfigurableOutEdges
+ implements ReuseObjectsOutEdges, Trimmable {
+
+ /** Serialized edges. */
+ private byte[] serializedEdges;
+ /** Number of edges. */
+ private int size;
+
+ @Override
+ public void initialize(Iterable> edges) {
+ initialize();
+ IntArrayList edgesList = new IntArrayList();
+ for (Iterator> iter = edges.iterator(); iter
+ .hasNext();) {
+ edgesList.add(iter.next().getTargetVertexId().get());
+ }
+ int[] edgesArray = Arrays.copyOfRange(edgesList.elements(), 0,
+ edgesList.size());
+ Arrays.sort(edgesArray);
+ if (edgesArray.length == 0) {
+ return;
+ }
+ int previousId = -1;
+ int currBucket = edgesArray[0] / Byte.SIZE;
+ byte myByte = (byte) 0;
+ for (int edge : edgesArray) {
+ if (edge == previousId) {
+ throw new IllegalArgumentException("Repeated successor " + previousId +
+ " in successor list of this node");
+ }
+ previousId = edge;
+ int bucket = edge / Byte.SIZE;
+ int pos = edge % Byte.SIZE;
+ if (bucket == currBucket) {
+ myByte = setBit(myByte, pos);
+ size += 1;
+ } else {
+ serializedEdges = addBytes(serializedEdges, toByteArray(currBucket),
+ myByte);
+ currBucket = bucket;
+ myByte = setBit((byte) 0, pos);
+ size += 1;
+ }
+ }
+ serializedEdges = addBytes(serializedEdges, toByteArray(currBucket),
+ myByte);
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ size = 0;
+ serializedEdges = new byte[0];
+ }
+
+ @Override
+ public void initialize() {
+ size = 0;
+ serializedEdges = new byte[0];
+ }
+
+ @Override
+ public void add(Edge edge) {
+ int bucket = edge.getTargetVertexId().get() / Byte.SIZE;
+ int pos = edge.getTargetVertexId().get() % Byte.SIZE;
+ int i = 0;
+ boolean done = false;
+ while (i < serializedEdges.length) { // if bucket is already there, simply
+ // set the appropriate bit
+ if (fromByteArray(Arrays.copyOfRange(serializedEdges, i,
+ i + Integer.SIZE / Byte.SIZE)) == bucket) {
+ int index = i + Integer.SIZE / Byte.SIZE;
+ if (!isSet(serializedEdges[index], pos)) {
+ size += 1;
+ serializedEdges[index] = setBit(serializedEdges[index], pos);
+ }
+ done = true;
+ break;
+ }
+ i += Integer.SIZE / Byte.SIZE + 1;
+ }
+ if (!done) { // we need to add a bucket
+ serializedEdges = addBytes(serializedEdges, toByteArray(bucket),
+ setBit((byte) 0, pos));
+ size += 1;
+ }
+ }
+
+ @Override
+ public void remove(IntWritable targetVertexId) {
+ int bucket = targetVertexId.get() / Byte.SIZE;
+ int pos = targetVertexId.get() % Byte.SIZE;
+ int i = 0;
+ while (i < serializedEdges.length) {
+ if (fromByteArray(Arrays.copyOfRange(serializedEdges, i,
+ i + Integer.SIZE / Byte.SIZE)) == bucket) {
+ serializedEdges[i + Integer.SIZE / Byte.SIZE] = unsetBit(
+ serializedEdges[i + Integer.SIZE / Byte.SIZE], pos);
+ --size;
+ break;
+ }
+ i += Integer.SIZE / Byte.SIZE + 1;
+ }
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public void trim() {
+ // Nothing to do
+ }
+
+ @Override
+ public Iterator> iterator() {
+ if (size == 0) {
+ return ImmutableSet.>of().iterator();
+ } else {
+ return new IndexedBitmapEdgeIterator();
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ size = in.readInt();
+ int serializedEdgesBytesUsed = in.readInt();
+ if (serializedEdgesBytesUsed > 0) {
+ // Only create a new buffer if the old one isn't big enough
+ if (serializedEdges == null ||
+ serializedEdgesBytesUsed > serializedEdges.length) {
+ serializedEdges = new byte[serializedEdgesBytesUsed];
+ }
+ in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(size);
+ out.writeInt(serializedEdges.length);
+ if (serializedEdges.length > 0) {
+ out.write(serializedEdges, 0, serializedEdges.length);
+ }
+ }
+
+ /** Iterator that reuses the same Edge object. */
+ private class IndexedBitmapEdgeIterator
+ extends UnmodifiableIterator> {
+ /** Representative edge object. */
+ private final Edge representativeEdge =
+ EdgeFactory.create(new IntWritable());
+ /** Current edge count */
+ private int currentEdge = 0;
+ /** Current bit position */
+ private int currentBitPosition = 0;
+ /** Current byte position */
+ private int currentBytePosition = 0;
+ /** Index int */
+ private int indexInt;
+ /** Current byte */
+ private Byte myByte;
+ /** Current index */
+ private byte[] myBytes = new byte[Integer.SIZE / Byte.SIZE];
+
+ @Override
+ public boolean hasNext() {
+ return currentEdge < size;
+ }
+
+ @Override
+ public Edge next() {
+ if (currentBitPosition == 8) {
+ currentBitPosition = 0;
+ }
+ if (currentBitPosition == 0) {
+ myBytes[0] = serializedEdges[currentBytePosition];
+ myBytes[1] = serializedEdges[currentBytePosition + 1];
+ myBytes[2] = serializedEdges[currentBytePosition + 2];
+ myBytes[3] = serializedEdges[currentBytePosition + 3];
+ indexInt = fromByteArray(myBytes);
+ myByte = serializedEdges[currentBytePosition +
+ Integer.SIZE / Byte.SIZE];
+ currentBytePosition += Integer.SIZE / Byte.SIZE + 1;
+ }
+ int pos = currentBitPosition;
+ int nextPos = 0;
+ boolean done = false;
+ while (!done) {
+ for (int i = pos; i < Byte.SIZE; i++) {
+ if (isSet(myByte, i)) {
+ done = true;
+ nextPos = i;
+ currentEdge++;
+ currentBitPosition = i + 1;
+ break;
+ }
+ }
+ if (!done /* && mapIterator.hasNext() */) {
+ myBytes[0] = serializedEdges[currentBytePosition];
+ myBytes[1] = serializedEdges[currentBytePosition + 1];
+ myBytes[2] = serializedEdges[currentBytePosition + 2];
+ myBytes[3] = serializedEdges[currentBytePosition + 3];
+ indexInt = fromByteArray(myBytes);
+ myByte = serializedEdges[currentBytePosition +
+ Integer.SIZE / Byte.SIZE];
+ currentBytePosition += Integer.SIZE / Byte.SIZE + 1;
+ pos = 0;
+ }
+ }
+ representativeEdge.getTargetVertexId()
+ .set(indexInt * Byte.SIZE + nextPos);
+ return representativeEdge;
+ }
+
+ }
+
+ /**
+ * tests if bit is set in a byte
+ *
+ * @param myByte the byte to be tested
+ * @param pos the position in the byte to be tested
+ * @return true or false depending on the bit being set
+ *
+ */
+ public static boolean isSet(byte myByte, int pos) {
+ return (myByte & (1 << pos)) != 0;
+ }
+
+ /**
+ * tests if bit is set in a byte
+ *
+ * @param myByte the byte to be updated
+ * @param pos the position in the byte to be set
+ * @return the updated byte
+ *
+ */
+ public static byte setBit(byte myByte, int pos) {
+ return (byte) (myByte | (1 << pos));
+ }
+
+ /**
+ * tests if bit is set in a byte
+ *
+ * @param myByte the byte to be updated
+ * @param pos the position in the byte to be unset
+ * @return the updated byte
+ *
+ */
+ public static byte unsetBit(byte myByte, int pos) {
+ return (byte) (myByte & ~(1 << pos));
+ }
+
+ /**
+ * converts an integer to a 4-byte length array holding its binary
+ * representation
+ *
+ * @param value the integer to be converted
+ * @return the 4-byte length byte array holding the integer's binary
+ * representation
+ */
+ public static byte[] toByteArray(int value) {
+ return new byte[] { (byte) (value >> 24), (byte) (value >> 16),
+ (byte) (value >> 8), (byte) value };
+ }
+
+ /**
+ * converts a 4-byte length byte array to its respective integer
+ *
+ * @param bytes a 4-byte length byte array
+ * @return the integer represented in the byte array
+ */
+ public static int fromByteArray(byte[] bytes) {
+ return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 |
+ (bytes[3] & 0xFF);
+ }
+
+ /**
+ * returns a byte array with the concatenation of the three input parameters
+ *
+ * @param original a byte array
+ * @param bindex a byte array
+ * @param myByte a byte
+ * @return a byte array with the concatenation of the three input parameters
+ */
+ public static byte[] addBytes(byte[] original, byte[] bindex, byte myByte) {
+ byte[] destination = new byte[original.length + Integer.SIZE / Byte.SIZE +
+ 1];
+ System.arraycopy(original, 0, destination, 0, original.length);
+ System.arraycopy(bindex, 0, destination, original.length,
+ Integer.SIZE / Byte.SIZE);
+ destination[destination.length - 1] = myByte;
+ return destination;
+ }
+
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/IntervalResidualEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/IntervalResidualEdges.java
new file mode 100644
index 000000000..c63f59b6c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/IntervalResidualEdges.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * Implementation of {@link OutEdges} with int ids and null edge values, backed
+ * by the IntervalResidualEdges structure proposed in:
+ *
+ * Panagiotis Liakos, Katia Papakonstantinopoulou, Alex Delis:
+ * Realizing Memory-Optimized Distributed Graph Processing.
+ * IEEE Trans. Knowl. Data Eng. 30(4): 743-756 (2018).
+ *
+ * Note: this implementation is optimized for space usage for graphs exhibiting
+ * the locality of reference property, but edge addition and removals are
+ * expensive. Parallel edges are not allowed.
+ */
+public class IntervalResidualEdges
+ extends ConfigurableOutEdges
+ implements ReuseObjectsOutEdges, Trimmable {
+
+ /** Minimum interval length is equal to 2 */
+ private static final int MIN_INTERVAL_LENGTH = 2;
+
+ /** Maximum interval length is equal to 254 */
+ private static final int MAX_INTERVAL_LENGTH = 254;
+
+ /** Serialized Intervals and Residuals */
+ private byte[] intervalsAndResiduals;
+
+ /** Number of edges stored in compressed array */
+ private int size;
+
+ @Override
+ public void initialize(Iterable> edges) {
+ IntArrayList edgesList = new IntArrayList();
+ for (Iterator> iter = edges.iterator(); iter
+ .hasNext();) {
+ edgesList.add(iter.next().getTargetVertexId().get());
+ }
+ compress(Arrays.copyOfRange(edgesList.elements(), 0, edgesList.size()));
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ size = 0;
+ intervalsAndResiduals = null;
+ }
+
+ @Override
+ public void initialize() {
+ size = 0;
+ intervalsAndResiduals = null;
+ }
+
+ @Override
+ public void add(Edge edge) {
+ // Note that this is very expensive (decompresses all edges and recompresses
+ // them again).
+ IntArrayList edgesList = new IntArrayList();
+ for (Iterator> iter = this.iterator(); iter
+ .hasNext();) {
+ edgesList.add(iter.next().getTargetVertexId().get());
+ }
+ edgesList.add(edge.getTargetVertexId().get());
+ compress(Arrays.copyOfRange(edgesList.elements(), 0, edgesList.size()));
+ }
+
+ @Override
+ public void remove(IntWritable targetVertexId) {
+ // Note that this is very expensive (decompresses all edges and recompresses
+ // them again).
+ final int id = targetVertexId.get();
+ initialize(Iterables.filter(this,
+ new Predicate>() {
+ @Override
+ public boolean apply(Edge edge) {
+ return edge.getTargetVertexId().get() != id;
+ }
+ }));
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public Iterator> iterator() {
+ if (size == 0) {
+ return ImmutableSet.>of().iterator();
+ } else {
+ return new IntervalResidualEdgeIterator();
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ size = in.readInt();
+ int intervalResidualEdgesBytesUsed = in.readInt();
+ if (intervalResidualEdgesBytesUsed > 0) {
+ // Only create a new buffer if the old one isn't big enough
+ if (intervalsAndResiduals == null ||
+ intervalResidualEdgesBytesUsed > intervalsAndResiduals.length) {
+ intervalsAndResiduals = new byte[intervalResidualEdgesBytesUsed];
+ }
+ in.readFully(intervalsAndResiduals, 0, intervalResidualEdgesBytesUsed);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(size);
+ out.writeInt(intervalsAndResiduals.length);
+ if (intervalsAndResiduals.length > 0) {
+ out.write(intervalsAndResiduals, 0, intervalsAndResiduals.length);
+ }
+ }
+
+ @Override
+ public void trim() {
+ /* Nothing to do */
+ }
+
+ /**
+ * Receives an integer array of successors and compresses them in the
+ * intervalsAndResiduals byte array
+ *
+ * @param edgesArray an integer array of successors
+ */
+ private void compress(final int[] edgesArray) {
+ try {
+ ExtendedDataOutput eos = getConf().createExtendedDataOutput();
+ IntArrayList left = new IntArrayList();
+ IntArrayList len = new IntArrayList();
+ IntArrayList residuals = new IntArrayList();
+ // If we are to produce intervals, we first compute them.
+ int intervalCount;
+ try {
+ intervalCount = BVEdges.intervalize(edgesArray, MIN_INTERVAL_LENGTH,
+ MAX_INTERVAL_LENGTH, left, len, residuals);
+ } catch (IllegalArgumentException e) {
+ // array was not sorted, sorting and retrying
+ Arrays.sort(edgesArray);
+ left = new IntArrayList();
+ len = new IntArrayList();
+ residuals = new IntArrayList();
+ intervalCount = BVEdges.intervalize(edgesArray, MIN_INTERVAL_LENGTH,
+ MAX_INTERVAL_LENGTH, left, len, residuals);
+ }
+
+ // We write out the intervals.
+ eos.writeInt(intervalCount);
+ for (int i = 0; i < intervalCount; i++) {
+ eos.writeInt(left.getInt(i));
+ eos.write(len.getInt(i));
+ }
+ final int[] residual = residuals.elements();
+ final int residualCount = residuals.size();
+ for (int i = 0; i < residualCount; i++) {
+ eos.writeInt(residual[i]);
+ }
+ intervalsAndResiduals = eos.toByteArray();
+ size = edgesArray.length;
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Iterator that reuses the same Edge object.
+ */
+ private class IntervalResidualEdgeIterator
+ extends UnmodifiableIterator> {
+ /** Input for processing the bytes */
+ private ExtendedDataInput extendedDataInput = getConf()
+ .createExtendedDataInput(intervalsAndResiduals, 0,
+ intervalsAndResiduals.length);
+ /** Representative edge object. */
+ private final Edge representativeEdge =
+ EdgeFactory.create(new IntWritable());
+ /** Current edge count */
+ private int currentEdge = 0;
+ /** Current interval index */
+ private int currentLeft;
+ /** Current interval length */
+ private int currentLen = 0;
+ /** Interval counter initialized with interval size and counting down */
+ private int intervalCount;
+
+ /** Constructor */
+ public IntervalResidualEdgeIterator() {
+ try {
+ intervalCount = extendedDataInput.readInt();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentEdge < size;
+ }
+
+ @Override
+ public Edge next() {
+ this.currentEdge++;
+ switch (this.currentLen) {
+ case 0:
+ switch (this.intervalCount) {
+ case 0:
+ try {
+ representativeEdge.getTargetVertexId()
+ .set(extendedDataInput.readInt());
+ } catch (IOException canthappen) {
+ throw new IllegalStateException(canthappen);
+ }
+ return representativeEdge;
+ default:
+ try {
+ this.currentLeft = extendedDataInput.readInt();
+ this.currentLen = extendedDataInput.readByte() & 0xff;
+ intervalCount--;
+ } catch (IOException canthappen) {
+ throw new IllegalStateException(canthappen);
+ }
+ final int result = this.currentLeft;
+ this.currentLen--;
+ representativeEdge.getTargetVertexId().set(result);
+ return representativeEdge;
+
+ }
+ default:
+ final int result = ++this.currentLeft;
+ this.currentLen--;
+ representativeEdge.getTargetVertexId().set(result);
+ return representativeEdge;
+ }
+ }
+
+ }
+}
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/BVEdgesEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/BVEdgesEdgesTest.java
new file mode 100644
index 000000000..dfb752ef1
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/BVEdgesEdgesTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Implementation of {@link OutEdges} with int ids and null edge values, backed
+ * by the BVEdges structure proposed in "Panagiotis Liakos, Katia
+ * Papakonstantinopoulou, Alex Delis: Realizing Memory-Optimized Distributed
+ * Graph Processing. IEEE Trans. Knowl. Data Eng. 30(4): 743-756 (2018)". Note:
+ * this implementation is optimized for space usage, but edge addition and
+ * removals are expensive. Parallel edges are not allowed.
+ */
+public class BVEdgesEdgesTest {
+ private static Edge createEdge(int id) {
+ return EdgeFactory.create(new IntWritable(id));
+ }
+
+ private static void assertEdges(BVEdges edges, int... expected) {
+ int index = 0;
+ for (Edge edge : (Iterable>) edges) {
+ Assert.assertEquals(expected[index], edge.getTargetVertexId().get());
+ index++;
+ }
+ Assert.assertEquals(expected.length, index);
+ }
+
+ private BVEdges getEdges() {
+ GiraphConfiguration gc = new GiraphConfiguration();
+ GiraphConstants.VERTEX_ID_CLASS.set(gc, IntWritable.class);
+ GiraphConstants.EDGE_VALUE_CLASS.set(gc, NullWritable.class);
+ ImmutableClassesGiraphConfiguration conf = new ImmutableClassesGiraphConfiguration(
+ gc);
+ BVEdges ret = new BVEdges();
+ ret.setConf(
+ new ImmutableClassesGiraphConfiguration(
+ conf));
+ return ret;
+ }
+
+ @Test
+ public void testEdges() {
+ BVEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(1), createEdge(2), createEdge(4));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 2, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(3)));
+ assertEdges(edges, 1, 2, 3, 4); // order matters, it's an array
+
+ edges.remove(new IntWritable(2));
+ assertEdges(edges, 1, 3, 4);
+ }
+
+ @Test
+ public void testInitialize() {
+ BVEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(1), createEdge(2), createEdge(4));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 2, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(3)));
+ assertEdges(edges, 1, 2, 3, 4);
+
+ edges.initialize();
+ assertEquals(0, edges.size());
+ }
+
+ @Test
+ public void testInitializeUnsorted() {
+ BVEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(3), createEdge(4), createEdge(1));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 3, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(2)));
+ assertEdges(edges, 1, 2, 3, 4);
+
+ edges.initialize();
+ assertEquals(0, edges.size());
+ }
+
+ @Test
+ public void testMutateEdges() {
+ BVEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Remove edges with even id
+ for (int i = 0; i < 10; i += 2) {
+ edges.remove(new IntWritable(i));
+ }
+
+ // We should now have 5 edges
+ assertEquals(5, edges.size());
+ // The edge ids should be all odd
+ for (Edge edge : (Iterable>) edges) {
+ assertEquals(1, edge.getTargetVertexId().get() % 2);
+ }
+ }
+
+ @Test
+ public void testSerialization() throws IOException {
+ BVEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Remove edges with even id
+ for (int i = 0; i < 10; i += 2) {
+ edges.remove(new IntWritable(i));
+ }
+
+ // We should now have 5 edges
+ assertEdges(edges, 1, 3, 5, 7, 9); // id order matter because of the
+ // implementation
+
+ ByteArrayOutputStream arrayStream = new ByteArrayOutputStream();
+ DataOutputStream tempBuffer = new DataOutputStream(arrayStream);
+
+ edges.write(tempBuffer);
+ tempBuffer.close();
+
+ byte[] binary = arrayStream.toByteArray();
+
+ assertTrue("Serialized version should not be empty ", binary.length > 0);
+
+ edges = getEdges();
+ edges.readFields(new DataInputStream(new ByteArrayInputStream(binary)));
+
+ assertEquals(5, edges.size());
+
+ int[] ids = new int[] { 1, 3, 5, 7, 9 };
+ int index = 0;
+
+ for (Edge edge : (Iterable>) edges) {
+ assertEquals(ids[index], edge.getTargetVertexId().get());
+ index++;
+ }
+ assertEquals(ids.length, index);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParallelEdges() {
+ BVEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(2), createEdge(2), createEdge(2));
+
+ edges.initialize(initialEdges);
+ }
+
+ @Test
+ public void testEdgeValues() {
+ BVEdges edges = getEdges();
+ Set testValues = new HashSet();
+ testValues.add(0);
+ testValues.add(Integer.MAX_VALUE / 2);
+ testValues.add(Integer.MAX_VALUE);
+
+ List> initialEdges = new ArrayList>();
+ for (Integer id : testValues) {
+ initialEdges.add(createEdge(id));
+ }
+
+ edges.initialize(initialEdges);
+
+ Iterator> edgeIt = edges.iterator();
+ while (edgeIt.hasNext()) {
+ int value = edgeIt.next().getTargetVertexId().get();
+ assertTrue("Unknown edge found " + value, testValues.remove(value));
+ }
+ }
+
+ @Test
+ public void testAddedSmallerValues() {
+ BVEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(100));
+
+ edges.initialize(initialEdges);
+
+ for (int i = 0; i < 16; i++) {
+ edges.add(createEdge(i));
+ }
+
+ assertEquals(17, edges.size());
+ }
+
+ @Test
+ public void testCompressedSize() throws IOException {
+ BVEdges edges = getEdges();
+
+ // Add 10 edges with id i, for i = 0..9
+ // to create one large interval
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Add a residual edge
+ edges.add(createEdge(23));
+
+ // Add 10 edges with id i, for i = 50..59
+ // to create a second large interval
+ for (int i = 50; i < 60; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ assertEquals(21, edges.size());
+
+ DataOutput dataOutput = Mockito.mock(DataOutput.class);
+ edges.write(dataOutput);
+
+ // size of intervalResiduals byte array should be equal to 12
+ ArgumentCaptor intervalsAndResidualsCaptop = ArgumentCaptor
+ .forClass(byte[].class);
+ Mockito.verify(dataOutput).write(intervalsAndResidualsCaptop.capture(),
+ Mockito.anyInt(), Mockito.anyInt());
+ assertEquals(12, intervalsAndResidualsCaptop.getValue().length);
+ }
+
+}
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/IndexedBitArrayEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/IndexedBitArrayEdgesTest.java
new file mode 100644
index 000000000..ba5b47a31
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/IndexedBitArrayEdgesTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IndexedBitArrayEdgesTest {
+ private static Edge createEdge(int id) {
+ return EdgeFactory.create(new IntWritable(id));
+ }
+
+ private static void assertEdges(IndexedBitArrayEdges edges, int... expected) {
+ int index = 0;
+ for (Edge edge : (Iterable>) edges) {
+ Assert.assertEquals(expected[index], edge.getTargetVertexId().get());
+ index++;
+ }
+ Assert.assertEquals(expected.length, index);
+ }
+
+ private IndexedBitArrayEdges getEdges() {
+ GiraphConfiguration gc = new GiraphConfiguration();
+ GiraphConstants.VERTEX_ID_CLASS.set(gc, IntWritable.class);
+ GiraphConstants.EDGE_VALUE_CLASS.set(gc, NullWritable.class);
+ ImmutableClassesGiraphConfiguration conf = new ImmutableClassesGiraphConfiguration(
+ gc);
+ IndexedBitArrayEdges ret = new IndexedBitArrayEdges();
+ ret.setConf(
+ new ImmutableClassesGiraphConfiguration(
+ conf));
+ return ret;
+ }
+
+ @Test
+ public void testEdges() {
+ IndexedBitArrayEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(1), createEdge(2), createEdge(4));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 2, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(3)));
+ assertEdges(edges, 1, 2, 3, 4); // order matters, it's an array
+
+ edges.remove(new IntWritable(2));
+ assertEdges(edges, 1, 3, 4);
+ }
+
+ @Test
+ public void testInitialize() {
+ IndexedBitArrayEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(1), createEdge(2), createEdge(4));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 2, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(3)));
+ assertEdges(edges, 1, 2, 3, 4);
+
+ edges.initialize();
+ assertEquals(0, edges.size());
+ }
+
+ @Test
+ public void testInitializeUnsorted() {
+ IndexedBitArrayEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(3), createEdge(4), createEdge(1));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 3, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(2)));
+
+ assertEdges(edges, 1, 2, 3, 4);
+
+ edges.initialize();
+ assertEquals(0, edges.size());
+ }
+
+ @Test
+ public void testMutateEdges() {
+ IndexedBitArrayEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Remove edges with even id
+ for (int i = 0; i < 10; i += 2) {
+ edges.remove(new IntWritable(i));
+ }
+
+ // We should now have 5 edges
+ assertEquals(5, edges.size());
+ // The edge ids should be all odd
+ for (Edge edge : (Iterable>) edges) {
+ assertEquals(1, edge.getTargetVertexId().get() % 2);
+ }
+ }
+
+ @Test
+ public void testSerialization() throws IOException {
+ IndexedBitArrayEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Remove edges with even id
+ for (int i = 0; i < 10; i += 2) {
+ edges.remove(new IntWritable(i));
+ }
+
+ // We should now have 5 edges
+ assertEdges(edges, 1, 3, 5, 7, 9); // id order matter because of the
+ // implementation
+
+ ByteArrayOutputStream arrayStream = new ByteArrayOutputStream();
+ DataOutputStream tempBuffer = new DataOutputStream(arrayStream);
+
+ edges.write(tempBuffer);
+ tempBuffer.close();
+
+ byte[] binary = arrayStream.toByteArray();
+
+ assertTrue("Serialized version should not be empty ", binary.length > 0);
+
+ edges = getEdges();
+ edges.readFields(new DataInputStream(new ByteArrayInputStream(binary)));
+
+ assertEquals(5, edges.size());
+
+ int[] ids = new int[] { 1, 3, 5, 7, 9 };
+ int index = 0;
+
+ for (Edge edge : (Iterable>) edges) {
+ assertEquals(ids[index], edge.getTargetVertexId().get());
+ index++;
+ }
+ assertEquals(ids.length, index);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParallelEdges() {
+ IndexedBitArrayEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(2), createEdge(2), createEdge(2));
+
+ edges.initialize(initialEdges);
+ }
+
+ @Test
+ public void testEdgeValues() {
+ IndexedBitArrayEdges edges = getEdges();
+ Set testValues = new HashSet();
+ testValues.add(0);
+ testValues.add(Integer.MAX_VALUE / 2);
+ testValues.add(Integer.MAX_VALUE);
+
+ List> initialEdges = new ArrayList>();
+ for (Integer id : testValues) {
+ initialEdges.add(createEdge(id));
+ }
+
+ edges.initialize(initialEdges);
+
+ Iterator> edgeIt = edges.iterator();
+ while (edgeIt.hasNext()) {
+ int value = edgeIt.next().getTargetVertexId().get();
+ assertTrue("Unknown edge found " + value, testValues.remove(value));
+ }
+ }
+
+ @Test
+ public void testAddedSmallerValues() {
+ IndexedBitArrayEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(100));
+
+ edges.initialize(initialEdges);
+
+ for (int i = 0; i < 16; i++) {
+ edges.add(createEdge(i));
+ }
+
+ assertEquals(17, edges.size());
+ }
+
+ @Test
+ public void testCompressedSize() throws IOException {
+ IndexedBitArrayEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ // to create on large interval
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Add a residual edge
+ edges.add(createEdge(23));
+
+ assertEquals(11, edges.size());
+
+ DataOutput dataOutput = Mockito.mock(DataOutput.class);
+ edges.write(dataOutput);
+
+ // size of serializedEdges byte array should be equal to 15:
+ // 5 (bucket 0: 0-7) + 5 (bucket 1: 8-10) + 5 (bucket 2: 23)
+ ArgumentCaptor serializedEdgesCaptop = ArgumentCaptor
+ .forClass(byte[].class);
+ Mockito.verify(dataOutput).write(serializedEdgesCaptop.capture(),
+ Mockito.anyInt(), Mockito.anyInt());
+ assertEquals(15, serializedEdgesCaptop.getValue().length);
+ }
+}
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/IntervalResidualEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/IntervalResidualEdgesTest.java
new file mode 100644
index 000000000..e9f17ee57
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/IntervalResidualEdgesTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IntervalResidualEdgesTest {
+ private static Edge createEdge(int id) {
+ return EdgeFactory.create(new IntWritable(id));
+ }
+
+ private static void assertEdges(IntervalResidualEdges edges,
+ int... expected) {
+ int index = 0;
+ for (Edge edge : (Iterable>) edges) {
+ Assert.assertEquals(expected[index], edge.getTargetVertexId().get());
+ index++;
+ }
+ Assert.assertEquals(expected.length, index);
+ }
+
+ private IntervalResidualEdges getEdges() {
+ GiraphConfiguration gc = new GiraphConfiguration();
+ GiraphConstants.VERTEX_ID_CLASS.set(gc, IntWritable.class);
+ GiraphConstants.EDGE_VALUE_CLASS.set(gc, NullWritable.class);
+ ImmutableClassesGiraphConfiguration conf = new ImmutableClassesGiraphConfiguration(
+ gc);
+ IntervalResidualEdges ret = new IntervalResidualEdges();
+ ret.setConf(
+ new ImmutableClassesGiraphConfiguration(
+ conf));
+ return ret;
+ }
+
+ @Test
+ public void testEdges() {
+ IntervalResidualEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(1), createEdge(2), createEdge(4));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 2, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(3)));
+ assertEdges(edges, 1, 2, 3, 4); // order matters, it's an array
+
+ edges.remove(new IntWritable(2));
+ assertEdges(edges, 3, 4, 1);
+ }
+
+ @Test
+ public void testInitialize() {
+ IntervalResidualEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(1), createEdge(2), createEdge(4));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 1, 2, 4);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(3)));
+ assertEdges(edges, 1, 2, 3, 4);
+
+ edges.initialize();
+ assertEquals(0, edges.size());
+ }
+
+ @Test
+ public void testInitializeUnsorted() {
+ IntervalResidualEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(3), createEdge(4), createEdge(1));
+
+ edges.initialize(initialEdges);
+ assertEdges(edges, 3, 4, 1);
+
+ edges.add(EdgeFactory.createReusable(new IntWritable(2)));
+ assertEdges(edges, 1, 2, 3, 4);
+
+ edges.initialize();
+ assertEquals(0, edges.size());
+ }
+
+ @Test
+ public void testMutateEdges() {
+ IntervalResidualEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Remove edges with even id
+ for (int i = 0; i < 10; i += 2) {
+ edges.remove(new IntWritable(i));
+ }
+
+ // We should now have 5 edges
+ assertEquals(5, edges.size());
+ // The edge ids should be all odd
+ for (Edge edge : (Iterable>) edges) {
+ assertEquals(1, edge.getTargetVertexId().get() % 2);
+ }
+ }
+
+ @Test
+ public void testSerialization() throws IOException {
+ IntervalResidualEdges edges = getEdges();
+
+ edges.initialize();
+
+ // Add 10 edges with id i, for i = 0..9
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Remove edges with even id
+ for (int i = 0; i < 10; i += 2) {
+ edges.remove(new IntWritable(i));
+ }
+
+ // We should now have 5 edges
+ assertEdges(edges, 1, 3, 5, 7, 9); // id order matter because of the
+ // implementation
+
+ ByteArrayOutputStream arrayStream = new ByteArrayOutputStream();
+ DataOutputStream tempBuffer = new DataOutputStream(arrayStream);
+
+ edges.write(tempBuffer);
+ tempBuffer.close();
+
+ byte[] binary = arrayStream.toByteArray();
+
+ assertTrue("Serialized version should not be empty ", binary.length > 0);
+
+ edges = getEdges();
+ edges.readFields(new DataInputStream(new ByteArrayInputStream(binary)));
+
+ assertEquals(5, edges.size());
+
+ int[] ids = new int[] { 1, 3, 5, 7, 9 };
+ int index = 0;
+
+ for (Edge edge : (Iterable>) edges) {
+ assertEquals(ids[index], edge.getTargetVertexId().get());
+ index++;
+ }
+ assertEquals(ids.length, index);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParallelEdges() {
+ IntervalResidualEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(2), createEdge(2), createEdge(2));
+
+ edges.initialize(initialEdges);
+ }
+
+ @Test
+ public void testEdgeValues() {
+ IntervalResidualEdges edges = getEdges();
+ Set testValues = new HashSet();
+ testValues.add(0);
+ testValues.add(Integer.MAX_VALUE / 2);
+ testValues.add(Integer.MAX_VALUE);
+
+ List> initialEdges = new ArrayList>();
+ for (Integer id : testValues) {
+ initialEdges.add(createEdge(id));
+ }
+
+ edges.initialize(initialEdges);
+
+ Iterator> edgeIt = edges.iterator();
+ while (edgeIt.hasNext()) {
+ int value = edgeIt.next().getTargetVertexId().get();
+ assertTrue("Unknown edge found " + value, testValues.remove(value));
+ }
+ }
+
+ @Test
+ public void testAddedSmallerValues() {
+ IntervalResidualEdges edges = getEdges();
+
+ List> initialEdges = Lists
+ .newArrayList(createEdge(100));
+
+ edges.initialize(initialEdges);
+
+ for (int i = 0; i < 16; i++) {
+ edges.add(createEdge(i));
+ }
+
+ assertEquals(17, edges.size());
+ }
+
+ @Test
+ public void testCompressedSize() throws IOException {
+ IntervalResidualEdges edges = getEdges();
+
+ // Add 10 edges with id i, for i = 0..9
+ // to create on large interval
+ for (int i = 0; i < 10; ++i) {
+ edges.add(createEdge(i));
+ }
+
+ // Add a residual edge
+ edges.add(createEdge(23));
+
+ assertEquals(11, edges.size());
+
+ DataOutput dataOutput = Mockito.mock(DataOutput.class);
+ edges.write(dataOutput);
+
+ // size of intervalResiduals byte array should be equal to 13:
+ // 4 (intervals' size) + 4 (index of 1st interval) + 1 (length of 1st
+ // interval)
+ // + 4 (residual)
+ ArgumentCaptor intervalsAndResidualsCaptop = ArgumentCaptor
+ .forClass(byte[].class);
+ Mockito.verify(dataOutput).write(intervalsAndResidualsCaptop.capture(),
+ Mockito.anyInt(), Mockito.anyInt());
+ assertEquals(13, intervalsAndResidualsCaptop.getValue().length);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 1522fd4d0..28d1cfcd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -328,9 +328,10 @@ under the License.
2.1
3.1
3.4
+ 2.4.2
0.14.0
2.1.2
- 6.5.4
+ 8.1.0
2.0.2
18.0
0.94.16
@@ -358,6 +359,7 @@ under the License.
1.7.6
2.4.0
0.2.1
+ 3.6.1
2.2.0
11.0.10
3.4.5
@@ -1781,11 +1783,85 @@ under the License.
base64
${dep.base64.version}