Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright 2014 Grafos.ml
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ml.grafos.okapi.clustering.ap;

import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.regex.Pattern;

/**
* Edge input formatter for Affinity Propagation problems.
* <p/>
* The input format consists of an entry for each pair of points. The first
* element of the entry denotes the id of the first point. Similarly, the
* second element denotes the id of the second point. Finally, the third
* element contains a double value encoding the similarity between the first
* and second points.
* <p/>
* Example:<br/>
* 1 1 1
* 1 2 1
* 1 3 5
* 2 1 1
* 2 2 1
* 2 3 3
* 3 1 5
* 3 2 3
* 3 3 1
* <p/>
* Encodes a problem in which data point "1" has similarity 1 with itself,
* 1 with point "2" and 5 with point "3". In a similar manner, points "2",
* and "3" have similarities of [1, 1, 3] and [5, 3, 1] respectively with
* points "1", "2", and "3".
*
* @author Josep Rubió Piqué <josep@datag.es>
*/
public class APEdgeInputFormatter extends TextEdgeInputFormat<APVertexID, DoubleWritable> {

private static final Pattern SEPARATOR = Pattern.compile("[\001\t ]");

@Override
public EdgeReader<APVertexID, DoubleWritable> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException {
return new APEdgeInputReader();
}

public class APEdgeInputReader extends TextEdgeReaderFromEachLineProcessed<String []> {

@Override
protected String[] preprocessLine(Text line) throws IOException {
return SEPARATOR.split(line.toString());
}

@Override
protected APVertexID getTargetVertexId(String[] line) throws IOException {
return new APVertexID(APVertexType.E, Long.valueOf(line[1]));
}

@Override
protected APVertexID getSourceVertexId(String[] line) throws IOException {
return new APVertexID(APVertexType.I, Long.valueOf(line[0]));
}

@Override
protected DoubleWritable getValue(String[] line) throws IOException {
return new DoubleWritable(Double.valueOf(line[2])
+ getConf().getFloat(AffinityPropagation.NOISE, AffinityPropagation.NOISE_DEFAULT));
}
}
}
59 changes: 59 additions & 0 deletions src/main/java/ml/grafos/okapi/clustering/ap/APMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2014 Grafos.ml
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ml.grafos.okapi.clustering.ap;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* This class defines the messages exchanged between Affinity propagation vertices.
*
* @author Josep Rubió Piqué <josep@datag.es>
*/
public class APMessage implements Writable {

public APVertexID from;
public double value;

public APMessage() {
from = new APVertexID();
}

public APMessage(APVertexID from, double value) {
this.from = from;
this.value = value;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
from.write(dataOutput);
dataOutput.writeDouble(value);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
from.readFields(dataInput);
value = dataInput.readDouble();
}

@Override
public String toString() {
return "APMessage{from=" + from + ", value=" + value + '}';
}
}
89 changes: 89 additions & 0 deletions src/main/java/ml/grafos/okapi/clustering/ap/APOutputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright 2014 Grafos.ml
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ml.grafos.okapi.clustering.ap;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
* Output Formatter for Affinity Propagation problems.
* <p/>
* The output format consists of an entry for each of the data points to cluster.
* The first element of the entry is a integer value encoding the data point
* index (id), whereas the second value encodes the exemplar id chosen for
* that point.
* <p/>
* Example:<br/>
* 1 3
* 2 3
* 3 3
* <p/>
* Encodes a solution in which data points "1", "2", and "3" choose point "3"
* as an exemplar.
*
* @author Josep Rubió Piqué <josep@datag.es>
*/
@SuppressWarnings("rawtypes")
public class APOutputFormat
extends IdWithValueTextOutputFormat<APVertexID,APVertexValue, DoubleWritable> {

/**
* Specify the output delimiter
*/
public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
/**
* Default output delimiter
*/
public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";

@Override
public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
return new IdWithValueVertexWriter();
}

protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
/**
* Saved delimiter
*/
private String delimiter;

@Override
public void initialize(TaskAttemptContext context) throws IOException,
InterruptedException {
super.initialize(context);
delimiter = getConf().get(
LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
}

@Override
protected Text convertVertexToLine(Vertex<APVertexID,
APVertexValue, DoubleWritable> vertex)
throws IOException {

if (vertex.getId().type != APVertexType.I) {
return null;
}

return new Text(String.valueOf(vertex.getId().index)
+ delimiter + Long.toString(vertex.getValue().exemplar.get()));
}
}
}
94 changes: 94 additions & 0 deletions src/main/java/ml/grafos/okapi/clustering/ap/APVertexID.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright 2014 Grafos.ml
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ml.grafos.okapi.clustering.ap;

import com.google.common.collect.ComparisonChain;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* Id for Affinity propagation vertices.
*
* @author Josep Rubió Piqué <josep@datag.es>
*/
public class APVertexID implements WritableComparable<APVertexID> {

/**
* Identifies whether the vertex represents a or or a column factor.
*/
public APVertexType type = APVertexType.I;
/**
* Index of the point to be clustered.
*/
public long index = 0;

public APVertexID() {
}

public APVertexID(APVertexID orig) {
this(orig.type, orig.index);
}

public APVertexID(APVertexType type, long index) {
this.type = type;
this.index = index;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(type.ordinal());
dataOutput.writeLong(index);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
final int typeIndex = dataInput.readInt();
type = APVertexType.values()[typeIndex];
this.index = dataInput.readLong();
}

@Override
public int compareTo(APVertexID that) {
return ComparisonChain.start()
.compare(this.type, that.type)
.compare(this.index, that.index)
.result();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

APVertexID that = (APVertexID) o;
return index == that.index && type == that.type;
}

@Override
public int hashCode() {
int result = (int) (index ^ (index >>> 32));
result = 31 * result + type.ordinal();
return result;
}

@Override
public String toString() {
return "(" + type + ", " + index + ")";
}
}
Loading