From f4464faecad7c69aed09261d516764154a13811e Mon Sep 17 00:00:00 2001 From: Josep Rubio Date: Thu, 28 Jan 2016 18:12:17 -0600 Subject: [PATCH 1/3] Final --- okapi.iml | 93 +++++ .../clustering/ap/APEdgeInputFormatter.java | 87 ++++ .../grafos/okapi/clustering/ap/APMessage.java | 59 +++ .../okapi/clustering/ap/APOutputFormat.java | 89 +++++ .../okapi/clustering/ap/APVertexID.java | 94 +++++ .../clustering/ap/APVertexInputFormatter.java | 84 ++++ .../okapi/clustering/ap/APVertexType.java | 23 ++ .../okapi/clustering/ap/APVertexValue.java | 83 ++++ .../clustering/ap/AffinityPropagation.java | 378 ++++++++++++++++++ .../clustering/ap/ExemplarAggregator.java | 37 ++ .../clustering/ap/MasterComputation.java | 43 ++ .../ap/AffinityPropagationStockTest.java | 96 +++++ .../ap/AffinityPropagationTest.java | 156 ++++++++ .../grafos/okapi/clustering/ap/StockTest.java | 228 +++++++++++ 14 files changed, 1550 insertions(+) create mode 100644 okapi.iml create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APEdgeInputFormatter.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APMessage.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APOutputFormat.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APVertexID.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APVertexInputFormatter.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APVertexType.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/APVertexValue.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/AffinityPropagation.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/ExemplarAggregator.java create mode 100644 src/main/java/ml/grafos/okapi/clustering/ap/MasterComputation.java create mode 100644 src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java create mode 100644 src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationTest.java create mode 100644 src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java diff --git a/okapi.iml b/okapi.iml new file mode 100644 index 00000000..e843457c --- /dev/null +++ b/okapi.iml @@ -0,0 +1,93 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APEdgeInputFormatter.java b/src/main/java/ml/grafos/okapi/clustering/ap/APEdgeInputFormatter.java new file mode 100644 index 00000000..8e9e400b --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APEdgeInputFormatter.java @@ -0,0 +1,87 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.formats.TextEdgeInputFormat; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.regex.Pattern; + +/** + * Edge input formatter for Affinity Propagation problems. + *

+ * The input format consists of an entry for each pair of points. The first + * element of the entry denotes the id of the first point. Similarly, the + * second element denotes the id of the second point. Finally, the third + * element contains a double value encoding the similarity between the first + * and second points. + *

+ * Example:
+ * 1 1 1 + * 1 2 1 + * 1 3 5 + * 2 1 1 + * 2 2 1 + * 2 3 3 + * 3 1 5 + * 3 2 3 + * 3 3 1 + *

+ * Encodes a problem in which data point "1" has similarity 1 with itself, + * 1 with point "2" and 5 with point "3". In a similar manner, points "2", + * and "3" have similarities of [1, 1, 3] and [5, 3, 1] respectively with + * points "1", "2", and "3". + * + * @author Josep Rubió Piqué + */ +public class APEdgeInputFormatter extends TextEdgeInputFormat { + + private static final Pattern SEPARATOR = Pattern.compile("[\001\t ]"); + + @Override + public EdgeReader createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException { + return new APEdgeInputReader(); + } + + public class APEdgeInputReader extends TextEdgeReaderFromEachLineProcessed { + + @Override + protected String[] preprocessLine(Text line) throws IOException { + return SEPARATOR.split(line.toString()); + } + + @Override + protected APVertexID getTargetVertexId(String[] line) throws IOException { + return new APVertexID(APVertexType.E, Long.valueOf(line[1])); + } + + @Override + protected APVertexID getSourceVertexId(String[] line) throws IOException { + return new APVertexID(APVertexType.I, Long.valueOf(line[0])); + } + + @Override + protected DoubleWritable getValue(String[] line) throws IOException { + return new DoubleWritable(Double.valueOf(line[2]) + + getConf().getFloat(AffinityPropagation.NOISE, AffinityPropagation.NOISE_DEFAULT)); + } + } +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APMessage.java b/src/main/java/ml/grafos/okapi/clustering/ap/APMessage.java new file mode 100644 index 00000000..f8e77bea --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APMessage.java @@ -0,0 +1,59 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This class defines the messages exchanged between Affinity propagation vertices. + * + * @author Josep Rubió Piqué +*/ +public class APMessage implements Writable { + + public APVertexID from; + public double value; + + public APMessage() { + from = new APVertexID(); + } + + public APMessage(APVertexID from, double value) { + this.from = from; + this.value = value; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + from.write(dataOutput); + dataOutput.writeDouble(value); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + from.readFields(dataInput); + value = dataInput.readDouble(); + } + + @Override + public String toString() { + return "APMessage{from=" + from + ", value=" + value + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APOutputFormat.java b/src/main/java/ml/grafos/okapi/clustering/ap/APOutputFormat.java new file mode 100644 index 00000000..b9445cd5 --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APOutputFormat.java @@ -0,0 +1,89 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * Output Formatter for Affinity Propagation problems. + *

+ * The output format consists of an entry for each of the data points to cluster. + * The first element of the entry is a integer value encoding the data point + * index (id), whereas the second value encodes the exemplar id chosen for + * that point. + *

+ * Example:
+ * 1 3 + * 2 3 + * 3 3 + *

+ * Encodes a solution in which data points "1", "2", and "3" choose point "3" + * as an exemplar. + * + * @author Josep Rubió Piqué + */ +@SuppressWarnings("rawtypes") +public class APOutputFormat + extends IdWithValueTextOutputFormat { + + /** + * Specify the output delimiter + */ + public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** + * Default output delimiter + */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + + @Override + public TextVertexWriter createVertexWriter(TaskAttemptContext context) { + return new IdWithValueVertexWriter(); + } + + protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine { + /** + * Saved delimiter + */ + private String delimiter; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, + InterruptedException { + super.initialize(context); + delimiter = getConf().get( + LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + @Override + protected Text convertVertexToLine(Vertex vertex) + throws IOException { + + if (vertex.getId().type != APVertexType.I) { + return null; + } + + return new Text(String.valueOf(vertex.getId().index) + + delimiter + Long.toString(vertex.getValue().exemplar.get())); + } + } +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APVertexID.java b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexID.java new file mode 100644 index 00000000..220068c2 --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexID.java @@ -0,0 +1,94 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import com.google.common.collect.ComparisonChain; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** +* Id for Affinity propagation vertices. +* +* @author Josep Rubió Piqué +*/ +public class APVertexID implements WritableComparable { + + /** + * Identifies whether the vertex represents a or or a column factor. + */ + public APVertexType type = APVertexType.I; + /** + * Index of the point to be clustered. + */ + public long index = 0; + + public APVertexID() { + } + + public APVertexID(APVertexID orig) { + this(orig.type, orig.index); + } + + public APVertexID(APVertexType type, long index) { + this.type = type; + this.index = index; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(type.ordinal()); + dataOutput.writeLong(index); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + final int typeIndex = dataInput.readInt(); + type = APVertexType.values()[typeIndex]; + this.index = dataInput.readLong(); + } + + @Override + public int compareTo(APVertexID that) { + return ComparisonChain.start() + .compare(this.type, that.type) + .compare(this.index, that.index) + .result(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + APVertexID that = (APVertexID) o; + return index == that.index && type == that.type; + } + + @Override + public int hashCode() { + int result = (int) (index ^ (index >>> 32)); + result = 31 * result + type.ordinal(); + return result; + } + + @Override + public String toString() { + return "(" + type + ", " + index + ")"; + } +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APVertexInputFormatter.java b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexInputFormatter.java new file mode 100644 index 00000000..1c833dae --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexInputFormatter.java @@ -0,0 +1,84 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import org.apache.giraph.io.formats.TextVertexValueInputFormat; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.jblas.util.Random; + +import java.io.IOException; +import java.util.regex.Pattern; + +/** + * Vertex input formatter for Affinity Propagation problems. + *

+ * The input format consists of an entry for each of the data points to cluster. + * The first element of the entry is an integer value encoding the data point + * index (id). Subsequent elements in the entry are double values encoding the + * similarities between the data point of the current entry and the rest of + * data points in the problem. + *

+ * Example:
+ * 1 1 1 5 + * 2 1 1 3 + * 3 5 3 1 + *

+ * Encodes a problem in which data point "1" has similarity 1 with itself, + * 1 with point "2" and 5 with point "3". In a similar manner, points "2", + * and "3" have similarities of [1, 1, 3] and [5, 3, 1] respectively with + * points "1", "2", and "3". + * + * @author Josep Rubió Piqué + */ +public class APVertexInputFormatter + extends TextVertexValueInputFormat { + + private static final Pattern SEPARATOR = Pattern.compile("[\001\t ]"); + + @Override + public TextVertexValueReader createVertexValueReader(InputSplit split, TaskAttemptContext context) throws IOException { + return new APInputReader(); + } + + public class APInputReader extends TextVertexValueReaderFromEachLineProcessed { + + @Override + protected String[] preprocessLine(Text line) throws IOException { + return SEPARATOR.split(line.toString()); + } + + @Override + protected APVertexID getId(String[] line) throws IOException { + return new APVertexID(APVertexType.I, + Long.valueOf(line[0])); + } + + @Override + protected APVertexValue getValue(String[] line) throws IOException { + APVertexValue value = new APVertexValue(); + for (int i = 1; i < line.length; i++) { + APVertexID neighId = new APVertexID(APVertexType.E, i); + value.weights.put(neighId, new DoubleWritable(Double.valueOf(line[i]) + // noise + + Random.nextDouble()*getConf().getFloat(AffinityPropagation.NOISE, AffinityPropagation.NOISE_DEFAULT))); + } + return value; + } + } +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APVertexType.java b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexType.java new file mode 100644 index 00000000..67ab606b --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexType.java @@ -0,0 +1,23 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +/** +* @author Josep Rubió Piqué +*/ +public enum APVertexType { + E, I +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/APVertexValue.java b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexValue.java new file mode 100644 index 00000000..f1df12f3 --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/APVertexValue.java @@ -0,0 +1,83 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import ml.grafos.okapi.common.data.MapWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** +* Value stored in Affinity Propagation vertices. +* +* @author Josep Rubió Piqué +*/ +public class APVertexValue implements Writable { + /** + * Id of the exemplar chosen by the vertex (if row vertex). + */ + public LongWritable exemplar; + /** + * Similarity between this point and its neighbors (if row vertex.) + */ + public MapWritable weights; + /** + * Last messages sent from this vertex to its neighbors. + */ + public MapWritable lastSentMessages; + + /** + * Last messages this vertex received. + */ + public MapWritable lastReceivedMessages; + + /** + * Has converged. + */ + public BooleanWritable converged; + + /** + * Exemplar. + */ + public BooleanWritable exemplarCalc; + + public APVertexValue() { + exemplar = new LongWritable(); + weights = new MapWritable(); + lastSentMessages = new MapWritable(); + lastReceivedMessages = new MapWritable(); + converged = new BooleanWritable(); + exemplarCalc = new BooleanWritable(); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + exemplar.write(dataOutput); + weights.write(dataOutput); + lastSentMessages.write(dataOutput); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + exemplar.readFields(dataInput); + weights.readFields(dataInput); + lastSentMessages.readFields(dataInput); + } +} \ No newline at end of file diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/AffinityPropagation.java b/src/main/java/ml/grafos/okapi/clustering/ap/AffinityPropagation.java new file mode 100644 index 00000000..9ffa14c5 --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/AffinityPropagation.java @@ -0,0 +1,378 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import ml.grafos.okapi.common.data.LongArrayListWritable; +import ml.grafos.okapi.common.data.MapWritable; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; + +/** + * Affinity Propagation is a clustering algorithm. + * + *

The number of clusters is not received as an input, but computed by the + * algorithm according to the distances between points and the preference + * of each node to be an exemplar (the "leader" of a cluster). + * + *

You can find a detailed description of the algorithm in the affinity propagation + * website. + * + * @author Josep Rubió Piqué + */ +public class AffinityPropagation + extends BasicComputation { + + private static Logger logger = LoggerFactory.getLogger(AffinityPropagation.class); + + /** + * Maximum number of iterations. + */ + public static final String MAX_ITERATIONS = "affinity.iterations"; + public static int MAX_ITERATIONS_DEFAULT = 15; + /** + * Damping factor. + */ + public static final String DAMPING = "affinity.damping"; + public static float DAMPING_DEFAULT = 0.9f; + /** + * Noise factor. + */ + public static final String NOISE = "affinity.noise"; + public static float NOISE_DEFAULT = 0f; + /** + * Epsilon factor. Do not send message to a neighbor if the new message has not changed more than epsilon. + */ + public static final String EPSILON = "affinity.epsilon"; + public static float EPSILON_DEFAULT = 0.0001f; + + @Override + public void compute(Vertex vertex, + Iterable messages) throws IOException { + logger.trace("vertex {}, superstep {}", vertex.getId(), getSuperstep()); + + final int maxIter = getContext().getConfiguration().getInt(MAX_ITERATIONS, MAX_ITERATIONS_DEFAULT); + + // Phases of the algorithm + + for (APMessage message : messages) { + vertex.getValue().lastReceivedMessages.put(new APVertexID(message.from), new DoubleWritable(message.value)); + } + + if(this.getAggregatedValue("converged").get() == getTotalNumVertices()){ + if(!vertex.getValue().exemplarCalc.get()) { + computeExemplars(vertex); + vertex.getValue().exemplarCalc.set(true); + }else{ + computeClusters(vertex); + } + }else{ + if (getSuperstep() == 0) { + initRows(vertex); + } else if (getSuperstep() == 1) { + initColumns(vertex, messages); + } else if (getSuperstep() < maxIter) { + computeBMSIteration(vertex); + } else if (getSuperstep() == maxIter) { + computeExemplars(vertex); + } else { + computeClusters(vertex); + } + } + } + + private void initRows(Vertex vertex) throws IOException { + final boolean isVertexFormat = getConf().getVertexInputFormatClass() != null; + if (isVertexFormat) { + initRowsFromVertexInput(vertex); + } else { + initRowsFromEdgeInput(vertex); + } + } + + private void initRowsFromVertexInput(Vertex vertex) { + final long nVertices = getTotalNumVertices(); + for (int i = 1; i <= nVertices; i++) { + APVertexID neighbor = new APVertexID(APVertexType.E, i); + vertex.getValue().lastSentMessages.put(neighbor, new DoubleWritable(0)); + vertex.getValue().lastReceivedMessages.put(neighbor, new DoubleWritable(0)); + sendMessage(neighbor, new APMessage(vertex.getId(), 0)); + logger.trace("Init rows: {} -> {} : {}", vertex.getId(), neighbor, 0); + } + } + + private void initRowsFromEdgeInput(Vertex vertex) throws IOException { + for (Edge edge : vertex.getEdges()) { + APVertexID neighbor = new APVertexID(edge.getTargetVertexId()); + DoubleWritable weight = new DoubleWritable(edge.getValue().get()); + vertex.getValue().weights.put(neighbor, weight); + vertex.getValue().lastSentMessages.put(neighbor, new DoubleWritable(0)); + vertex.getValue().lastReceivedMessages.put(neighbor, new DoubleWritable(0)); + sendMessage(neighbor, new APMessage(vertex.getId(), 0)); + logger.trace("Init rows:{} -> {} : {}", vertex.getId(), neighbor, 0); + vertex.removeEdges(neighbor); + } + } + + private void initColumns(Vertex vertex, Iterable messages) { + if (vertex.getId().type == APVertexType.I) { + final long nVertices = getTotalNumVertices(); + for (int i = 1; i <= nVertices; i++) { + APVertexID neighbor = new APVertexID(APVertexType.E, i); + vertex.getValue().lastSentMessages.put(neighbor, new DoubleWritable(0)); + vertex.getValue().lastReceivedMessages.put(neighbor, new DoubleWritable(0)); + sendMessage(neighbor, new APMessage(vertex.getId(), 0)); + logger.trace("Init columns:{} -> {} : {}", vertex.getId(), neighbor, 0); + } + } + + for (APMessage message : messages) { + APVertexID neighbor = new APVertexID(message.from); + vertex.getValue().lastSentMessages.put(neighbor, new DoubleWritable(0)); + sendMessage(neighbor, new APMessage(vertex.getId(), 0)); + logger.debug("Init columns:{} -> {} : {}", vertex.getId(), neighbor, 0); + } + } + + private void computeBMSIteration(Vertex vertex) throws IOException { + final APVertexID id = vertex.getId(); + + switch (id.type) { + + case E: + computeEMessages(vertex); + break; + + case I: + computeIMessages(vertex); + break; + + default: + throw new IllegalStateException("Unrecognized node type " + id.type); + } + + } + + private void computeEMessages(Vertex vertex) { + + double sum = 0; + double exemplarMessage = 0; + + MessageRelayer sender = new MessageRelayer(vertex.getValue().lastSentMessages); + + for (Map.Entry entry : vertex.getValue().lastReceivedMessages.entrySet()){ + APVertexID key = (APVertexID) entry.getKey(); + double messageValue = ((DoubleWritable) entry.getValue()).get(); + + if (key.index != vertex.getId().index) { + sum += Math.max(0, messageValue); + } else { + exemplarMessage = messageValue; + } + + } + + for (Map.Entry entry : vertex.getValue().lastReceivedMessages.entrySet()){ + APVertexID key = (APVertexID) entry.getKey(); + double messageValue = ((DoubleWritable) entry.getValue()).get(); + double value; + + if (key.index == vertex.getId().index) { + value = sum; + } else { + double a = exemplarMessage + sum - Math.max(messageValue, 0); + value = Math.min(0, a); + } + + sender.send(value,vertex.getId(),key); + + } + + if (sender.isConverged()) { + logger.debug("E vertex: {} converged.", vertex.getId()); + if(!vertex.getValue().converged.get()){ + vertex.getValue().converged.set(true); + aggregate("converged", new LongWritable(1)); + } + }else{ + if(vertex.getValue().converged.get()) { + vertex.getValue().converged.set(false); + aggregate("converged", new LongWritable(-1)); + } + } + + } + + private void computeIMessages(Vertex vertex) { + + double bestValue = Double.NEGATIVE_INFINITY, secondBestValue = Double.NEGATIVE_INFINITY; + APVertexID bestNeighbour = null; + + MessageRelayer sender = new MessageRelayer(vertex.getValue().lastSentMessages); + + MapWritable values = vertex.getValue().weights; + + for (Map.Entry entry : vertex.getValue().lastReceivedMessages.entrySet()){ + APVertexID key = (APVertexID) entry.getKey(); + double messageValue = ((DoubleWritable) entry.getValue()).get(); + + if (messageValue + ((DoubleWritable) values.get(key)).get() > bestValue) { + secondBestValue = bestValue; + bestValue = messageValue + ((DoubleWritable) values.get(key)).get(); + bestNeighbour = new APVertexID(key); + } + else if (messageValue + ((DoubleWritable) values.get(key)).get() > secondBestValue) { + secondBestValue = messageValue + ((DoubleWritable) values.get(key)).get(); + } + } + + for (Map.Entry entry : vertex.getValue().lastReceivedMessages.entrySet()){ + APVertexID key = (APVertexID) entry.getKey(); + double value; + + if (key.equals(bestNeighbour)) { + value = -secondBestValue; + } else { + value = -bestValue; + } + + value = value + ((DoubleWritable) values.get(key)).get(); + sender.send(value,vertex.getId(),key); + + } + + if (sender.isConverged()) { + logger.debug("I vertex: {} converged.", vertex.getId()); + if(!vertex.getValue().converged.get()){ + vertex.getValue().converged.set(true); + aggregate("converged", new LongWritable(1)); + } + }else{ + if(vertex.getValue().converged.get()) { + vertex.getValue().converged.set(false); + aggregate("converged", new LongWritable(-1)); + } + } + + } + + private void computeExemplars(Vertex vertex) throws IOException { + final APVertexID id = vertex.getId(); + // Exemplars are auto-elected among variables + if (id.type != APVertexType.E) { + return; + } + + // But only by those variables on the diagonal of the matrix + for (Map.Entry entry : vertex.getValue().lastReceivedMessages.entrySet()){ + APVertexID key = (APVertexID) entry.getKey(); + double messageValue = ((DoubleWritable) entry.getValue()).get(); + + if (key.index == id.index) { + double lastMessageValue = ((DoubleWritable) vertex.getValue().lastSentMessages.get(key)).get(); + double belief = messageValue + lastMessageValue; + if (belief >= 0) { + LongArrayListWritable exemplars = new LongArrayListWritable(); + exemplars.add(new LongWritable(id.index)); + aggregate("exemplars", exemplars); + logger.debug("Point {} decides to become an exemplar with value {}.", id.index, belief); + } else { + logger.debug("Point {} does not want to be an exemplar with value {}.", id.index, belief); + } + + } + } + } + + private void computeClusters(Vertex vertex) throws IOException { + APVertexID id = vertex.getId(); + if (id.type != APVertexType.I) { + vertex.voteToHalt(); + return; + } + + final LongArrayListWritable exemplars = getAggregatedValue("exemplars"); + if (exemplars.contains(new LongWritable(id.index))) { + logger.debug("Point {} is an exemplar.", id.index); + vertex.getValue().exemplar = new LongWritable(id.index); + vertex.voteToHalt(); + return; + } + + long bestExemplar = -1; + double maxValue = Double.NEGATIVE_INFINITY; + MapWritable values = vertex.getValue().weights; + for (LongWritable exemplarWritable : exemplars) { + final long exemplar = exemplarWritable.get(); + final APVertexID neighbor = new APVertexID(APVertexType.E, exemplar); + if (!values.containsKey(neighbor)) { + continue; + } + + final double value = ((DoubleWritable) values.get(neighbor)).get(); + if (value > maxValue) { + maxValue = value; + bestExemplar = exemplar; + } + } + + logger.debug("Point {} decides to follow {}.", id.index, bestExemplar); + vertex.getValue().exemplar = new LongWritable(bestExemplar); + vertex.voteToHalt(); + } + + public class MessageRelayer{ + private MapWritable lastMessages; + final float damping = getContext().getConfiguration().getFloat(DAMPING, DAMPING_DEFAULT); + final float epsilon = getContext().getConfiguration().getFloat(EPSILON, EPSILON_DEFAULT); + int nMessagesSent = 0; + + public MessageRelayer(MapWritable lastMessages) { + this.lastMessages = lastMessages; + } + + public void send(double value, APVertexID sender, APVertexID recipient) { + if (lastMessages.containsKey(recipient)) { + final double lastMessage = ((DoubleWritable) lastMessages.get(recipient)).get(); + if (Math.abs(lastMessage - value) < epsilon) { + return; + } + + value = damping * lastMessage + (1 - damping) * value; + } + logger.trace("{} -> {} : {}", sender, recipient, value); + AffinityPropagation.this.sendMessage(recipient, new APMessage(sender, value)); + lastMessages.put(recipient, new DoubleWritable(value)); + nMessagesSent++; + } + + public boolean isConverged() { + return nMessagesSent == 0; + } + } + +} diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/ExemplarAggregator.java b/src/main/java/ml/grafos/okapi/clustering/ap/ExemplarAggregator.java new file mode 100644 index 00000000..836d5f9c --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/ExemplarAggregator.java @@ -0,0 +1,37 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import ml.grafos.okapi.common.data.LongArrayListWritable; +import org.apache.giraph.aggregators.BasicAggregator; + +/** + * Aggregates points that have chosen themselves as an exemplar. + * + * @author Josep Rubió Piqué + */ +public class ExemplarAggregator extends BasicAggregator { + + @Override + public void aggregate(LongArrayListWritable value) { + getAggregatedValue().addAll(value); + } + + @Override + public LongArrayListWritable createInitialValue() { + return new LongArrayListWritable(); + } +} diff --git a/src/main/java/ml/grafos/okapi/clustering/ap/MasterComputation.java b/src/main/java/ml/grafos/okapi/clustering/ap/MasterComputation.java new file mode 100644 index 00000000..6240e9ec --- /dev/null +++ b/src/main/java/ml/grafos/okapi/clustering/ap/MasterComputation.java @@ -0,0 +1,43 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.master.DefaultMasterCompute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** +* Affinity Propagation's MasterCompute. +* +* @author Josep Rubió Piqué +*/ +public class MasterComputation extends DefaultMasterCompute { + private static Logger logger = LoggerFactory.getLogger(AffinityPropagation.class); + + @Override + public void initialize() throws InstantiationException, IllegalAccessException { + super.initialize(); + registerPersistentAggregator("exemplars", ExemplarAggregator.class); + registerPersistentAggregator("converged", LongSumAggregator.class); + } + + @Override + public void compute() { + logger.debug("Master computation at iteration {}", getSuperstep()); + super.compute(); + } +} \ No newline at end of file diff --git a/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java b/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java new file mode 100644 index 00000000..b4e8988f --- /dev/null +++ b/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java @@ -0,0 +1,96 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import com.google.common.io.Resources; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.utils.InternalVertexRunner; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +public class AffinityPropagationStockTest { + private GiraphConfiguration conf; + + @Before + public void initialize() { + conf = new GiraphConfiguration(); + conf.setComputationClass(AffinityPropagation.class); + conf.setMasterComputeClass(MasterComputation.class); + conf.setInt(AffinityPropagation.MAX_ITERATIONS, 100); + conf.setFloat(AffinityPropagation.DAMPING, 0.5f); + conf.setVertexOutputFormatClass(APOutputFormat.class); + conf.setBoolean("giraph.useSuperstepCounters", false); + } + + @Test + public void testToyProblem() throws IOException { + String[] expected = { + "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", + "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", + "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", + "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", + }; + Arrays.sort(expected, Ordering.natural()); + + List lines = Resources.readLines( + Resources.getResource(getClass(), "stockdailypropcovariance.txt"), + StandardCharsets.UTF_8); + String[] graph = lines.toArray(new String[0]); + + conf.setVertexInputFormatClass(APVertexInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, graph, null)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + int[] cluster = new int[results.size()]; + for(String res: results){ + String[] parts = res.split("\t"); + cluster[Integer.parseInt(parts[0]) - 1] = Integer.parseInt(parts[1]); + } + + File file = new File("clustersdaily.txt"); + + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + for (int i = 0; i < cluster.length; i++) { + bw.write(String.valueOf(cluster[i])); + bw.newLine(); + } + + bw.close(); + + assertArrayEquals(expected, results.toArray()); + } + +} \ No newline at end of file diff --git a/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationTest.java b/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationTest.java new file mode 100644 index 00000000..53243970 --- /dev/null +++ b/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationTest.java @@ -0,0 +1,156 @@ +/** + * Copyright 2014 Grafos.ml + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ml.grafos.okapi.clustering.ap; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import com.google.common.io.Resources; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.utils.InternalVertexRunner; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +public class AffinityPropagationTest { + private GiraphConfiguration conf; + + @Before + public void initialize() { + conf = new GiraphConfiguration(); + conf.setComputationClass(AffinityPropagation.class); + conf.setMasterComputeClass(MasterComputation.class); + conf.setInt(AffinityPropagation.MAX_ITERATIONS, 100); + conf.setFloat(AffinityPropagation.DAMPING, 0.5f); + conf.setVertexOutputFormatClass(APOutputFormat.class); + conf.setBoolean("giraph.useSuperstepCounters", false); + } + + @Test + public void testVertexInput() { + String[] graph = { + "1 1 1 5", + "2 1 1 3", + "3 5 3 1", + }; + String[] expected = { + "1\t3", "2\t3", "3\t3" + }; + + conf.setVertexInputFormatClass(APVertexInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, graph, null)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + } + + /*@Test + public void testEdgeInput() { + String[] graph = { + "1 1 1", + "1 3 5", + "2 1 1", + "2 2 1", + "2 3 3", + "3 1 5", + "3 2 3", + "3 3 1", + }; + String[] expected = { + "1\t3", "2\t3", "3\t3" + }; + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + }*/ + + /*@Test + public void testSparse() { + String[] graph = { + "1 1 1", + "1 2 1", + "1 3 5", + "2 1 1", + "2 2 1", + "3 1 4", + "3 3 1", + }; + String[] expected = { + "1\t3", "2\t2", "3\t3" + }; + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + }*/ + + /*@Test + public void testToyProblem() throws IOException { + String[] expected = { + "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", + "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", + "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", + "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", + }; + Arrays.sort(expected, Ordering.natural()); + + List lines = Resources.readLines( + Resources.getResource(getClass(), "toyProblem.txt"), + StandardCharsets.UTF_8); + String[] graph = lines.toArray(new String[0]); + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + }*/ + +} \ No newline at end of file diff --git a/src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java b/src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java new file mode 100644 index 00000000..d49e5793 --- /dev/null +++ b/src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java @@ -0,0 +1,228 @@ +package ml.grafos.okapi.clustering.ap; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import com.google.common.io.Resources; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.utils.InternalVertexRunner; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Created by joseprubio on 10/4/15. + */ +public class StockTest { + private GiraphConfiguration conf; + + @Before + public void initialize() { + conf = new GiraphConfiguration(); + conf.setComputationClass(AffinityPropagation.class); + conf.setMasterComputeClass(MasterComputation.class); + conf.setInt(AffinityPropagation.MAX_ITERATIONS, 100); + conf.setFloat(AffinityPropagation.DAMPING, 0.0f); + conf.setVertexOutputFormatClass(APOutputFormat.class); + conf.setBoolean("giraph.useSuperstepCounters", false); + } + + @Test + public void testVertexInput() throws IOException{ + String[] graph = { + "1 1 1 5", + "2 1 1 3", + "3 5 3 1", + }; + String[] expected = { + "1\t3", "2\t3", "3\t3" + }; + List lines = Resources.readLines( + Resources.getResource(getClass(), "workfile"), + StandardCharsets.UTF_8); + String[] graph2 = lines.toArray(new String[0]); + + conf.setVertexInputFormatClass(APVertexInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, graph2, null)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + } + + /*@Test + public void testToyProblem() throws IOException { + String[] expected = { + "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", + "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", + "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", + "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", + }; + Arrays.sort(expected, Ordering.natural()); + + List lines = Resources.readLines( + Resources.getResource(getClass(), "toyProblem.txt"), + StandardCharsets.UTF_8); + String[] graph = lines.toArray(new String[0]); + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + }*/ + + /*@Test + public void testStocks() throws IOException { + String[] expected = { + "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", + "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", + "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", + "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", + }; + Arrays.sort(expected, Ordering.natural()); + + List lines = Resources.readLines( + Resources.getResource(getClass(), "toyProblem.txt"), + StandardCharsets.UTF_8); + String[] graph = lines.toArray(new String[0]); + + conf.setVertexInputFormatClass(APVertexInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + } + + /*@Test + public void testVertexInput() { + String[] graph = { + "1 1 1 5", + "2 1 1 3", + "3 5 3 1", + }; + String[] expected = { + "1\t3", "2\t3", "3\t3" + }; + + conf.setVertexInputFormatClass(APVertexInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, graph, null)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + } + + @Test + public void testEdgeInput() { + String[] graph = { + "1 1 1", + "1 2 1", + "1 3 5", + "2 1 1", + "2 2 1", + "2 3 3", + "3 1 4", + "3 2 3", + "3 3 1", + }; + String[] expected = { + "1\t3", "2\t3", "3\t3" + }; + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + } + + @Test + public void testSparse() { + String[] graph = { + "1 1 1", + "1 2 1", + "1 3 5", + "2 1 1", + "2 2 1", + "3 1 4", + "3 3 1", + }; + String[] expected = { + "1\t3", "2\t2", "3\t3" + }; + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + } + + @Test + public void testToyProblem() throws IOException { + String[] expected = { + "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", + "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", + "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", + "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", + }; + Arrays.sort(expected, Ordering.natural()); + + List lines = Resources.readLines( + Resources.getResource(getClass(), "toyProblem.txt"), + StandardCharsets.UTF_8); + String[] graph = lines.toArray(new String[0]); + + conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); + + ImmutableList results; + try { + results = Ordering.natural().immutableSortedCopy( + InternalVertexRunner.run(conf, null, graph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + assertArrayEquals(expected, results.toArray()); + }*/ +} From 7f58ac58152ce297fab8a9bbe2ffa2fac4471435 Mon Sep 17 00:00:00 2001 From: Josep Rubio Date: Thu, 28 Jan 2016 18:18:38 -0600 Subject: [PATCH 2/3] First --- okapi.iml | 93 ------------------------------------------------------- 1 file changed, 93 deletions(-) delete mode 100644 okapi.iml diff --git a/okapi.iml b/okapi.iml deleted file mode 100644 index e843457c..00000000 --- a/okapi.iml +++ /dev/null @@ -1,93 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file From 2d72ba37229d0a3d2728b10a6e824f1349b7d66b Mon Sep 17 00:00:00 2001 From: Josep Rubio Date: Thu, 28 Jan 2016 18:21:52 -0600 Subject: [PATCH 3/3] No tests --- .../ap/AffinityPropagationStockTest.java | 96 -------- .../grafos/okapi/clustering/ap/StockTest.java | 228 ------------------ 2 files changed, 324 deletions(-) delete mode 100644 src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java delete mode 100644 src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java diff --git a/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java b/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java deleted file mode 100644 index b4e8988f..00000000 --- a/src/test/java/ml/grafos/okapi/clustering/ap/AffinityPropagationStockTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright 2014 Grafos.ml - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ml.grafos.okapi.clustering.ap; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; -import com.google.common.io.Resources; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.utils.InternalVertexRunner; -import org.junit.Before; -import org.junit.Test; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertArrayEquals; - -public class AffinityPropagationStockTest { - private GiraphConfiguration conf; - - @Before - public void initialize() { - conf = new GiraphConfiguration(); - conf.setComputationClass(AffinityPropagation.class); - conf.setMasterComputeClass(MasterComputation.class); - conf.setInt(AffinityPropagation.MAX_ITERATIONS, 100); - conf.setFloat(AffinityPropagation.DAMPING, 0.5f); - conf.setVertexOutputFormatClass(APOutputFormat.class); - conf.setBoolean("giraph.useSuperstepCounters", false); - } - - @Test - public void testToyProblem() throws IOException { - String[] expected = { - "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", - "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", - "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", - "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", - }; - Arrays.sort(expected, Ordering.natural()); - - List lines = Resources.readLines( - Resources.getResource(getClass(), "stockdailypropcovariance.txt"), - StandardCharsets.UTF_8); - String[] graph = lines.toArray(new String[0]); - - conf.setVertexInputFormatClass(APVertexInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, graph, null)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - int[] cluster = new int[results.size()]; - for(String res: results){ - String[] parts = res.split("\t"); - cluster[Integer.parseInt(parts[0]) - 1] = Integer.parseInt(parts[1]); - } - - File file = new File("clustersdaily.txt"); - - FileWriter fw = new FileWriter(file.getAbsoluteFile()); - BufferedWriter bw = new BufferedWriter(fw); - for (int i = 0; i < cluster.length; i++) { - bw.write(String.valueOf(cluster[i])); - bw.newLine(); - } - - bw.close(); - - assertArrayEquals(expected, results.toArray()); - } - -} \ No newline at end of file diff --git a/src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java b/src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java deleted file mode 100644 index d49e5793..00000000 --- a/src/test/java/ml/grafos/okapi/clustering/ap/StockTest.java +++ /dev/null @@ -1,228 +0,0 @@ -package ml.grafos.okapi.clustering.ap; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; -import com.google.common.io.Resources; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.utils.InternalVertexRunner; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertArrayEquals; - -/** - * Created by joseprubio on 10/4/15. - */ -public class StockTest { - private GiraphConfiguration conf; - - @Before - public void initialize() { - conf = new GiraphConfiguration(); - conf.setComputationClass(AffinityPropagation.class); - conf.setMasterComputeClass(MasterComputation.class); - conf.setInt(AffinityPropagation.MAX_ITERATIONS, 100); - conf.setFloat(AffinityPropagation.DAMPING, 0.0f); - conf.setVertexOutputFormatClass(APOutputFormat.class); - conf.setBoolean("giraph.useSuperstepCounters", false); - } - - @Test - public void testVertexInput() throws IOException{ - String[] graph = { - "1 1 1 5", - "2 1 1 3", - "3 5 3 1", - }; - String[] expected = { - "1\t3", "2\t3", "3\t3" - }; - List lines = Resources.readLines( - Resources.getResource(getClass(), "workfile"), - StandardCharsets.UTF_8); - String[] graph2 = lines.toArray(new String[0]); - - conf.setVertexInputFormatClass(APVertexInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, graph2, null)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - } - - /*@Test - public void testToyProblem() throws IOException { - String[] expected = { - "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", - "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", - "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", - "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", - }; - Arrays.sort(expected, Ordering.natural()); - - List lines = Resources.readLines( - Resources.getResource(getClass(), "toyProblem.txt"), - StandardCharsets.UTF_8); - String[] graph = lines.toArray(new String[0]); - - conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, null, graph)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - }*/ - - /*@Test - public void testStocks() throws IOException { - String[] expected = { - "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", - "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", - "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", - "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", - }; - Arrays.sort(expected, Ordering.natural()); - - List lines = Resources.readLines( - Resources.getResource(getClass(), "toyProblem.txt"), - StandardCharsets.UTF_8); - String[] graph = lines.toArray(new String[0]); - - conf.setVertexInputFormatClass(APVertexInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, null, graph)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - } - - /*@Test - public void testVertexInput() { - String[] graph = { - "1 1 1 5", - "2 1 1 3", - "3 5 3 1", - }; - String[] expected = { - "1\t3", "2\t3", "3\t3" - }; - - conf.setVertexInputFormatClass(APVertexInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, graph, null)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - } - - @Test - public void testEdgeInput() { - String[] graph = { - "1 1 1", - "1 2 1", - "1 3 5", - "2 1 1", - "2 2 1", - "2 3 3", - "3 1 4", - "3 2 3", - "3 3 1", - }; - String[] expected = { - "1\t3", "2\t3", "3\t3" - }; - - conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, null, graph)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - } - - @Test - public void testSparse() { - String[] graph = { - "1 1 1", - "1 2 1", - "1 3 5", - "2 1 1", - "2 2 1", - "3 1 4", - "3 3 1", - }; - String[] expected = { - "1\t3", "2\t2", "3\t3" - }; - - conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, null, graph)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - } - - @Test - public void testToyProblem() throws IOException { - String[] expected = { - "1\t3", "2\t3", "3\t3", "4\t3", "5\t3", "6\t3", "7\t7", - "8\t7", "9\t7", "10\t7", "11\t3", "12\t7", "13\t3", "14\t7", - "15\t7", "16\t20", "17\t20", "18\t20", "19\t20", "20\t20", - "21\t20", "22\t3", "23\t20", "24\t20", "25\t7", - }; - Arrays.sort(expected, Ordering.natural()); - - List lines = Resources.readLines( - Resources.getResource(getClass(), "toyProblem.txt"), - StandardCharsets.UTF_8); - String[] graph = lines.toArray(new String[0]); - - conf.setEdgeInputFormatClass(APEdgeInputFormatter.class); - - ImmutableList results; - try { - results = Ordering.natural().immutableSortedCopy( - InternalVertexRunner.run(conf, null, graph)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - assertArrayEquals(expected, results.toArray()); - }*/ -}