diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationMasterCompute.java new file mode 100644 index 000000000..aea061100 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/DiffusionMigrationMasterCompute.java @@ -0,0 +1,201 @@ +package org.apache.giraph.examples; + + +import org.apache.giraph.aggregators.BooleanOrAggregator; +import org.apache.giraph.aggregators.LongMaxAggregator; +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.log4j.Logger; + + +public class DiffusionMigrationMasterCompute extends MigrationFullMasterCompute { + + protected Logger LOG = Logger.getLogger(DiffusionMigrationMasterCompute.class); + + public static final String convincedVerticesAggregator = "CONV_AGG_DIFF"; + public static final String usingVerticesAggregator = "AGG_DIFF"; + public static final String deadVerticesAggregator = "AGG_DIFF_DEAD"; + public static final String latestActivationsAggregator = "AGG_ACTIVATED_LAST"; + public static final String activatedVerticesCounterGroup = "Diffusion Counters"; + public static final String convincedVerticesCounter = "Convinced_Vertices "; + public static final String usingVerticesCounter = "Using_Vertices "; + public static final String deadVerticesCounter = "Dead_Vertices "; + public static final String diffusionDeltaOption = "diffusion.delta"; + public static final double diffusionDeltaOptionDefault = 0.005; + public static final String diffusionListenOption = "diffusion.listenWhileUnactive"; + public static final String hesitantVerticesAggregator="hesitantVerticesAggregator "; + + + public static final String byLabelOption="by_label"; + + //for KCORE (or general label) based algorithm + public static final String invitedVerticesAggregator="Invited_Vertices "; + public static final String almostConvincedVerticesAggregator="AlmostConvinced_Vertices "; + public static final String currentLabel="Label_active "; //label da analizzare nello specifico superstep + public static final String nextLabel="Next_label "; //ogni volta riceve tutte le label ancora da eseguire + public static final String timeToSwitch="is_time_to_switch"; + public double KSwitchTreshold; + + //for MIN_NUMBER based algorithm + public boolean byLabel; + public int minNumber; + public static final String potentialVerticesAggregator="Potential_invited_vertices"; + public static final String oldInvitedVerticesAggregator="Old_invited_vertices"; + public static final String oldConvincedVerticesAggregator="Old_convinced_vertices"; + public static final String oldDeadVerticesAggregator="Old_dead_vertices"; + public static final String oldUsingVerticesAggregator="Old_using_vertices"; + public static final String oldHesitantVerticesAggregator="Old_hesitant_vertices"; + public static final String oldAlmostConvincedVerticesAggregator="Old_almostConvinced_vertices"; + public static final String justChangedTimeToSwitch="Just_changed_timeToSwitch_value"; + + + @Override + public void compute() { + //super.compute(); + long convincedVertices = ((LongWritable)getAggregatedValue(convincedVerticesAggregator)).get(); + long usingVertices = ((LongWritable)getAggregatedValue(usingVerticesAggregator)).get(); + long deadVertices = ((LongWritable)getAggregatedValue(deadVerticesAggregator)).get(); + long invitedVertices=((LongWritable)getAggregatedValue(invitedVerticesAggregator)).get(); + long almostConvincedVertices=((LongWritable)getAggregatedValue(almostConvincedVerticesAggregator)).get(); + long activeLabel=(int)((LongWritable)getAggregatedValue(currentLabel)).get(); + long hesitantVerticesAggregatorVal=((LongWritable)getAggregatedValue(hesitantVerticesAggregator)).get(); + + //This avoid having counters' value "0" when it's timeToSwitch and so the computation based on MIN_NUMBER is "paused" + if(!byLabel && ((BooleanWritable)getAggregatedValue(timeToSwitch)).get()) { + almostConvincedVertices=((LongWritable)getAggregatedValue(oldAlmostConvincedVerticesAggregator)).get(); + invitedVertices=((LongWritable)getAggregatedValue(oldInvitedVerticesAggregator)).get(); + usingVertices=((LongWritable)getAggregatedValue(oldUsingVerticesAggregator)).get(); + deadVertices=((LongWritable)getAggregatedValue(oldDeadVerticesAggregator)).get(); + convincedVertices=((LongWritable)getAggregatedValue(oldConvincedVerticesAggregator)).get(); + hesitantVerticesAggregatorVal=((LongWritable)getAggregatedValue(oldHesitantVerticesAggregator)).get(); + } + + getContext().getCounter(activatedVerticesCounterGroup,hesitantVerticesAggregator+superstep()).setValue(hesitantVerticesAggregatorVal); + getContext().getCounter(activatedVerticesCounterGroup, usingVerticesCounter + superstep()).setValue(usingVertices); + getContext().getCounter(activatedVerticesCounterGroup, convincedVerticesCounter + superstep()).setValue(convincedVertices); + getContext().getCounter(activatedVerticesCounterGroup, deadVerticesCounter + superstep()).setValue(deadVertices); + getContext().getCounter(activatedVerticesCounterGroup,invitedVerticesAggregator + superstep()).setValue(invitedVertices); + getContext().getCounter(activatedVerticesCounterGroup,almostConvincedVerticesAggregator + superstep()).setValue(almostConvincedVertices); + getContext().getCounter(activatedVerticesCounterGroup,currentLabel + superstep()).setValue(activeLabel); + + + + //test purpose + if(superstep()==0) { + System.out.println("InitProb,"+getConf().getStrings("InitialProbability","0.02")[0]); + System.out.println("Delta,"+getConf().getStrings("Delta","0.005")[0]); + }else { + System.out.println("InvitedVertices,"+(getSuperstep()-1)+","+invitedVertices); + System.out.println("ConvincedVertices,"+(getSuperstep()-1)+","+convincedVertices); + System.out.println("DeadVertices,"+(getSuperstep()-1)+","+deadVertices); + System.out.println("AlmostConvincedVertices,"+(getSuperstep()-1)+","+almostConvincedVertices); + System.out.println("UsingVertices,"+(getSuperstep()-1)+","+usingVertices); + System.out.println("LabelReached,"+(getSuperstep()-1)+","+activeLabel); + System.out.println("HesitantVertices,"+(getSuperstep()-1)+","+hesitantVerticesAggregatorVal); + } + + + if(getSuperstep() > 0 && getTotalNumVertices()==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal )) + haltComputation(); + + if(byLabel) {//Kcore or similar + if(getSuperstep()>0) { + if ( ((BooleanWritable)getAggregatedValue(timeToSwitch)).get() ) + setAggregatedValue(timeToSwitch, new BooleanWritable(false)); + if ( superstep()==1) { + setAggregatedValue(currentLabel, (LongWritable)getAggregatedValue(nextLabel)); + setAggregatedValue(nextLabel, new LongWritable(-1)); + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + }else if (activeLabel!=1){//if we haven't reached the lowest coreness + long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .almostConvincedVerticesAggregator)).get(); + long invitedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .invitedVerticesAggregator)).get(); + //if the threshold is reached or all the invited vertices are dead, convinced or hesitant + if(((double)almostConvicedVal)/invitedVal>KSwitchTreshold || invitedVertices==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal) ) { + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + setAggregatedValue(currentLabel, (LongWritable)getAggregatedValue(nextLabel)); + setAggregatedValue(nextLabel, new LongWritable(-1)); + } + } + } + }else {//degree, pagerank or other similar where the label does not represent a group of vertices + if ( superstep()==0) { + setAggregatedValue(currentLabel, new LongWritable(Long.MAX_VALUE)); + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + setAggregatedValue(oldInvitedVerticesAggregator, new LongWritable(0)); + } + if(superstep()>0) { + + if ( ! ((BooleanWritable)getAggregatedValue(timeToSwitch)).get() ) { + if(activeLabel>0) { + long almostConvicedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .almostConvincedVerticesAggregator)).get(); + long invitedVal=((LongWritable) getAggregatedValue(DiffusionMigrationMasterCompute .invitedVerticesAggregator)).get(); + //if the threshold is reached or all the invited vertices are dead, convinced or hesitant + if(((double)almostConvicedVal)/invitedVal>KSwitchTreshold || invitedVertices==(deadVertices+convincedVertices+hesitantVerticesAggregatorVal)) { + setAggregatedValue(timeToSwitch, new BooleanWritable(true)); + setAggregatedValue(oldInvitedVerticesAggregator, (LongWritable)getAggregatedValue(invitedVerticesAggregator)); + setAggregatedValue(oldConvincedVerticesAggregator, (LongWritable)getAggregatedValue(convincedVerticesAggregator)); + setAggregatedValue(oldAlmostConvincedVerticesAggregator, (LongWritable)getAggregatedValue(almostConvincedVerticesAggregator)); + setAggregatedValue(oldDeadVerticesAggregator, (LongWritable)getAggregatedValue(deadVerticesAggregator)); + setAggregatedValue(oldHesitantVerticesAggregator, (LongWritable)getAggregatedValue(hesitantVerticesAggregator)); + setAggregatedValue(oldUsingVerticesAggregator, (LongWritable)getAggregatedValue(usingVerticesAggregator)); + } + } + }else { //it's time to switch: let's scan some label until we find at least vertices more than now + long old = ((LongWritable)getAggregatedValue(oldInvitedVerticesAggregator)).get(); + long actual = ((LongWritable)getAggregatedValue(potentialVerticesAggregator)).get(); + if (actual-old>minNumber) {//reached a label which increment the invited vertices by MIN_NUMBER at least + setAggregatedValue(timeToSwitch, new BooleanWritable(false)); + setAggregatedValue(justChangedTimeToSwitch, new BooleanWritable(true)); + }else if( ((LongWritable)getAggregatedValue(nextLabel)).get()<0 && superstep()>10 ){//reached the lowest label without finding at least MIN_NUMBER vertices + setAggregatedValue(timeToSwitch, new BooleanWritable(false)); + setAggregatedValue(justChangedTimeToSwitch, new BooleanWritable(true)); + setAggregatedValue(currentLabel, new LongWritable(0)); + }else {//continue to scan + setAggregatedValue(currentLabel, (LongWritable)getAggregatedValue(nextLabel)); + setAggregatedValue(nextLabel, new LongWritable(-1)); + } + } + } + } + + + } + + @Override + public void initialize() throws InstantiationException, IllegalAccessException { + super.initialize(); + KSwitchTreshold = Double.parseDouble(getConf().getStrings("KSwitchThreshold", "0.7")[0]); + registerAggregator(convincedVerticesAggregator, LongSumAggregator.class); + registerAggregator(usingVerticesAggregator, LongSumAggregator.class); + registerAggregator(deadVerticesAggregator, LongSumAggregator.class); + registerAggregator(invitedVerticesAggregator, LongSumAggregator.class); + registerAggregator(almostConvincedVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(nextLabel, LongMaxAggregator.class); + registerPersistentAggregator(currentLabel, LongMaxAggregator.class); + registerPersistentAggregator(timeToSwitch, BooleanOrAggregator.class); + registerAggregator(hesitantVerticesAggregator, LongSumAggregator.class); + + byLabel=Boolean.parseBoolean(getConf().getStrings("ByLabel", "true")[0]); + registerPersistentAggregator(byLabelOption, BooleanOrAggregator.class); + setAggregatedValue(byLabelOption, new BooleanWritable(byLabel)); + if(!byLabel) { + minNumber = Integer.parseInt(getConf().getStrings("minNumber", "200")[0]); + registerAggregator(potentialVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldInvitedVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldConvincedVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldDeadVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldUsingVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldHesitantVerticesAggregator, LongSumAggregator.class); + registerPersistentAggregator(oldAlmostConvincedVerticesAggregator, LongSumAggregator.class); + registerAggregator(justChangedTimeToSwitch, BooleanOrAggregator.class); + + } + } + + protected long superstep() { + return getSuperstep(); + } + +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java new file mode 100644 index 000000000..6a4c1baa2 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/DiffusionVertexValue.java @@ -0,0 +1,134 @@ +package org.apache.giraph.examples.feature_diffusion.datastructures; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; + +public class DiffusionVertexValue implements Writable { + + protected int vertexThreshold; + protected int label; + protected double currentActivationProbability; + protected double delta; + protected double almostConvincedTreshold; + + protected int activeNeighbors = 0; + + public DiffusionVertexValue() { + this.vertexThreshold = 1; + this.label = 1; + } + + public DiffusionVertexValue(int label) { + this.vertexThreshold = 1; + this.label = label; + } + + public DiffusionVertexValue(int vertexThreshold, int label) { + this.vertexThreshold = vertexThreshold; + this.label = label; + } + + public void readFields(DataInput in) throws IOException { + vertexThreshold = in.readInt(); + label = in.readInt(); + currentActivationProbability = in.readDouble(); + activeNeighbors = in.readInt(); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(vertexThreshold); + out.writeInt(label); + out.writeDouble(currentActivationProbability); + out.writeInt(activeNeighbors); + } + + public double getCurrentActivationProbability() { + return currentActivationProbability; + } + + public void modifyCurrentActivationProbability(int sign) { + BigDecimal tmpcurrentActivationProbability = + new BigDecimal(currentActivationProbability) + .add(new BigDecimal(sign * delta)) + .setScale(5, RoundingMode.HALF_UP); + if (tmpcurrentActivationProbability.doubleValue() > 1) currentActivationProbability = 1; + else currentActivationProbability = tmpcurrentActivationProbability.doubleValue(); + if (tmpcurrentActivationProbability.doubleValue() <= 0) currentActivationProbability = 0; + } + + public boolean isVertexInvited(long currentLabel) { + return this.label >= currentLabel; + } + + public boolean isVertexDead() { + return new BigDecimal(currentActivationProbability) + .setScale(2, RoundingMode.HALF_DOWN) + .floatValue() + == 0; + } + + public boolean isVertexConvinced() { + return new BigDecimal(currentActivationProbability) + .setScale(2, RoundingMode.HALF_DOWN) + .floatValue() + == 1; + } + + public void setVertexThreshold(int threshold) { + this.vertexThreshold = threshold; + } + + public int getVertexThreshold() { + return vertexThreshold; + } + + public long getLabel() { + return this.label; + } + + public boolean rollActivationDice() { + return Math.random() <= currentActivationProbability; + } + + public void setlabel(int coreness) { + this.label = coreness; + } + + public boolean isAlmostConvinced() { + return currentActivationProbability > almostConvincedTreshold; + } + + // used at ss=0 in case of differences from default 0.2 + public void setInitialActivationProbability(double initialActivationProbability) { + this.currentActivationProbability = initialActivationProbability; + } + + public void setAlmostConvincedTreshold(double almostConvincedTreshold) { + this.almostConvincedTreshold = almostConvincedTreshold; + } + + public void setDelta(double delta) { + this.delta = delta; + } + + public int getActiveNeighbors() { + return activeNeighbors; + } + + public void setActiveNeighbors(int activeNeighbors) { + this.activeNeighbors = activeNeighbors; + } + + public void reset() { // Method to reset temporary data structures + activeNeighbors = 0; + } + + public String toString() { + return "" + label + "," + currentActivationProbability; + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java new file mode 100644 index 000000000..376a21175 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/datastructures/LabelingVertexValue.java @@ -0,0 +1,77 @@ +package org.apache.giraph.examples.feature_diffusion.datastructures; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; + +public class LabelingVertexValue implements Writable { + + protected long label; + protected int threshold; + protected boolean labelJustChanged = false; + protected double temporaryValue; + protected HashMap neighborsLabels = new HashMap(); + + public LabelingVertexValue() { + this.threshold = 1; + } + + public LabelingVertexValue(int threshold) { + this.threshold = threshold; + } + + public void readFields(DataInput in) throws IOException { + label = in.readLong(); + threshold = in.readInt(); + labelJustChanged = in.readBoolean(); + temporaryValue = in.readDouble(); + } + + public void write(DataOutput out) throws IOException { + out.writeLong(label); + out.writeInt(threshold); + out.writeBoolean(labelJustChanged); + out.writeDouble(temporaryValue); + } + + public long getLabel() { + return label; + } + + public HashMap getNeighborsLabel() { + return neighborsLabels; + } + + public boolean isChanged() { + return labelJustChanged; + } + + public void setLabel(long label) { + this.label = label; + this.labelJustChanged = true; + } + + public void setChanged(boolean newChanged) { + this.labelJustChanged = newChanged; + } + + public void updateNeighboorLabel(long id, long label) { + if (!neighborsLabels.containsKey(id)) neighborsLabels.put(id, label); + else if (neighborsLabels.get(id) > label) neighborsLabels.put(id, label); + } + + public double getTemp() { + return temporaryValue; + } + + public void setTemp(double temp) { + this.temporaryValue = temp; + } + + public String toString() { + return "" + label + "," + threshold; + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java new file mode 100644 index 000000000..b53575454 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/DiffusionConstants.java @@ -0,0 +1,40 @@ +package org.apache.giraph.examples.feature_diffusion.diffusion; + +public class DiffusionConstants { + + // BROADCAST STRINGS + public static final String superstepBroadcast = "CURR_SST"; + public static final String convincedVerticesAggregator = "CONV_AGG_DIFF"; + public static final String usingVerticesAggregator = "AGG_DIFF"; + public static final String deadVerticesAggregator = "AGG_DIFF_DEAD"; + public static final String latestActivationsAggregator = "AGG_ACTIVATED_LAST"; + public static final String potentialVerticesAggregator = "Potential_invited_vertices"; + public static final String oldInvitedVerticesAggregator = "Old_invited_vertices"; + public static final String oldConvincedVerticesAggregator = "Old_convinced_vertices"; + public static final String oldDeadVerticesAggregator = "Old_dead_vertices"; + public static final String oldUsingVerticesAggregator = "Old_using_vertices"; + public static final String oldHesitantVerticesAggregator = "Old_hesitant_vertices"; + public static final String oldAlmostConvincedVerticesAggregator = "Old_almostConvinced_vertices"; + public static final String justChangedTimeToSwitch = "Just_changed_timeToSwitch_value"; + public static final String timeToSwitchBroadcast = "is_time_to_switch"; + + // GROUPS + + public static final String activatedVerticesCounterGroup = "Diffusion Counters"; + public static final String convincedVerticesCounter = "Convinced_Vertices "; + public static final String usingVerticesCounter = "Using_Vertices "; + public static final String deadVerticesCounter = "Dead_Vertices "; + public static final String hesitantVerticesAggregator = "hesitantVerticesAggregator "; + public static final String invitedVerticesAggregator = "Invited_Vertices "; + public static final String almostConvincedVerticesAggregator = "AlmostConvinced_Vertices "; + public static final String currentLabel = + "Label_active "; // label da analizzare nello specifico superstep + + // OPTIONS + public static final String diffusionDeltaOption = "diffusion.delta"; + public static final double diffusionDeltaOptionDefault = 0.005; + public static final String diffusionListenOption = "diffusion.listenWhileUnactive"; + public static final String byLabelOption = "by_label"; + + public double KSwitchTreshold; +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java new file mode 100644 index 000000000..8866a42d7 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/factory/DiffusionBlockFactory.java @@ -0,0 +1,98 @@ +package org.apache.giraph.examples.feature_diffusion.diffusion.block_app.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.reducers.impl.MaxReduce; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.DiffusionVertexValue; +import org.apache.giraph.examples.feature_diffusion.diffusion.block_app.piece.DiffusionComputationPiece; + +public class DiffusionBlockFactory extends AbstractBlockFactory + implements TypesHolder { + + ObjectTransfer firstMaxLabel; + ObjectTransfer stoppingCondition; + + double delta; + String thresholdType; + double initialActivationProbability; + double almostConvincedThreshold; + + public DiffusionBlockFactory() { + firstMaxLabel = new ObjectTransfer(); + stoppingCondition = new ObjectTransfer(); + } + + @Override + public Block createBlock(GiraphConfiguration conf) { + delta = (double) conf.getFloat("Delta", (float) 0.005); + thresholdType = conf.getStrings("ThresholdType", "")[0]; + + initialActivationProbability = (double) conf.getFloat("InitialProbability", (float) 0.02); + almostConvincedThreshold = (double) conf.getFloat("AlmostConvincedTreshold", (float) 0.7); + + DiffusionComputationPiece dcp = new DiffusionComputationPiece(firstMaxLabel, stoppingCondition); + + return new SequenceBlock( + Pieces.reduce( + "Setup block", + new MaxReduce(LongTypeOps.INSTANCE), + (vertex) -> { + return new LongWritable(setup(vertex.getValue())); + }, + (value) -> { + firstMaxLabel.apply(value); + }), + new RepeatUntilBlock( + GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.get(conf), dcp, stoppingCondition)); + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return DiffusionVertexValue.class; + } + + /** + * Set the initial values for some vertex parameters + * + * @param value the vertex value on which operate + * @return the vertex label + */ + private long setup(DiffusionVertexValue value) { + value.setDelta(delta); + value.setInitialActivationProbability(initialActivationProbability); + value.setAlmostConvincedTreshold(almostConvincedThreshold); + if (thresholdType.compareTo("1") == 0) value.setVertexThreshold(1); + else if (thresholdType.compareTo("Prop") == 0) { + value.setVertexThreshold((int) value.getLabel() / 20); + } + return value.getLabel(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java new file mode 100644 index 000000000..3386227eb --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/diffusion/block_app/piece/DiffusionComputationPiece.java @@ -0,0 +1,387 @@ +package org.apache.giraph.examples.feature_diffusion.diffusion.block_app.piece; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.reducers.impl.MaxReduce; +import org.apache.giraph.reducers.impl.OrReduce; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.ops.IntTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.DiffusionVertexValue; +import org.apache.giraph.examples.feature_diffusion.diffusion.DiffusionConstants; + +public class DiffusionComputationPiece + extends Piece { + + // ADDED CONSTANTS + ReducerHandle superstepReducerHandle; + + // EXISTING CONSTANTS + + ReducerHandle convincedVerticesHandle; + ReducerHandle usingVerticesHandle; + ReducerHandle deadVerticesHandle; + ReducerHandle latestActivationsHandle; + ReducerHandle hesitantVerticesHandle; + + // for KCORE (or general label) based algorithm + ReducerHandle invitedVerticesHandle; + ReducerHandle almostConvincedVerticesHandle; + ReducerHandle + currentLabelHandle; // label da analizzare nello specifico superstep + ReducerHandle + nextLabelHandle; // ogni volta riceve tutte le label ancora da eseguire + ReducerHandle timeToSwitchHandle; + + // for MIN_NUMBER based algorithm + ReducerHandle potentialVerticesHandle; + ReducerHandle oldInvitedVerticesHandle; + ReducerHandle oldConvincedVerticesHandle; + ReducerHandle oldDeadVerticesHandle; + ReducerHandle oldUsingVerticesHandle; + ReducerHandle oldHesitantVerticesHandle; + ReducerHandle oldAlmostConvincedVerticesHandle; + ReducerHandle justChangedTimeToSwitchHandle; + + // GLOBAL VARIABLES + long superstep = 0; + + long currentLabel; + boolean byLabel; + boolean timeToSwitch; + long nextLabel; + + int minNumber; + double kSwitchThreshold; + + long convincedVertices; + long usingVertices; + long deadVertices; + long invitedVertices; + long almostConvincedVertices; + long hesitantVertices; + long oldInvitedVertices; + private long oldConvincedVertices; + private long oldDeadVertices; + private long oldHesitantVertices; + private long oldUsingVertices; + private long oldAlmostConvincedVertices; + + ObjectTransfer setupMaxLabel; + ObjectTransfer stoppingCondition; + + boolean oldMaxLabelRetrieved = false; + + private boolean justChangedTimeToSwitch; + + public DiffusionComputationPiece( + ObjectTransfer oldMaxLabel, ObjectTransfer stoppingCondition) { + this.setupMaxLabel = oldMaxLabel; + this.stoppingCondition = stoppingCondition; + } + + @Override + public void registerReducers(CreateReducersApi reduceApi, Object executionStage) { + super.registerReducers(reduceApi, executionStage); + + byLabel = reduceApi.getConf().getBoolean("ByLabel", true); + kSwitchThreshold = (double) reduceApi.getConf().getFloat("KSwitchThreshold", (float) 0.7); + + convincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + usingVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + deadVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + invitedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + almostConvincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + nextLabelHandle = + reduceApi.createLocalReducer(new MaxReduce(LongTypeOps.INSTANCE)); + currentLabelHandle = + reduceApi.createLocalReducer(new MaxReduce(LongTypeOps.INSTANCE)); + timeToSwitchHandle = reduceApi.createLocalReducer(new OrReduce()); + hesitantVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + + if (!byLabel) { + minNumber = reduceApi.getConf().getInt("minNumber", 200); + potentialVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldInvitedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldConvincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldDeadVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldUsingVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldHesitantVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + oldAlmostConvincedVerticesHandle = + reduceApi.createLocalReducer(new SumReduce(LongTypeOps.INSTANCE)); + justChangedTimeToSwitchHandle = reduceApi.createLocalReducer(new OrReduce()); + } + } + + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + super.masterCompute(master, executionStage); + + convincedVertices = convincedVerticesHandle.getReducedValue(master).get(); + usingVertices = usingVerticesHandle.getReducedValue(master).get(); + deadVertices = deadVerticesHandle.getReducedValue(master).get(); + invitedVertices = invitedVerticesHandle.getReducedValue(master).get(); + almostConvincedVertices = almostConvincedVerticesHandle.getReducedValue(master).get(); + hesitantVertices = hesitantVerticesHandle.getReducedValue(master).get(); + + if (oldMaxLabelRetrieved) { // First time the new label is transferred from another piece + if (timeToSwitch) nextLabel = nextLabelHandle.getReducedValue(master).get(); + } else { + nextLabel = setupMaxLabel.get().get(); + oldMaxLabelRetrieved = true; + } + + // This avoid having counters' value "0" when it's timeToSwitch and so the computation based on + // MIN_NUMBER is "paused" + if (!byLabel && timeToSwitch) { + almostConvincedVertices = oldAlmostConvincedVerticesHandle.getReducedValue(master).get(); + invitedVertices = oldInvitedVerticesHandle.getReducedValue(master).get(); + usingVertices = oldUsingVerticesHandle.getReducedValue(master).get(); + deadVertices = oldDeadVerticesHandle.getReducedValue(master).get(); + convincedVertices = oldConvincedVerticesHandle.getReducedValue(master).get(); + hesitantVertices = oldHesitantVerticesHandle.getReducedValue(master).get(); + } + + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.hesitantVerticesAggregator + superstep) + .setValue(hesitantVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.usingVerticesCounter + superstep) + .setValue(usingVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.convincedVerticesCounter + superstep) + .setValue(convincedVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.deadVerticesCounter + superstep) + .setValue(deadVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.invitedVerticesAggregator + superstep) + .setValue(invitedVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.almostConvincedVerticesAggregator + superstep) + .setValue(almostConvincedVertices); + master + .getCounter( + DiffusionConstants.activatedVerticesCounterGroup, + DiffusionConstants.currentLabel + superstep) + .setValue(currentLabel); + + if (byLabel) { // Kcore or similar + if (superstep >= 0) { + if (timeToSwitch) timeToSwitch = false; + if (superstep == 0) { + currentLabel = nextLabel; + nextLabel = -1; + timeToSwitch = true; + } else if (currentLabel != 1) { // if we haven't reached the lowest coreness + long almostConvicedVal = almostConvincedVerticesHandle.getReducedValue(master).get(); + long invitedVal = invitedVerticesHandle.getReducedValue(master).get(); + // if the threshold is reached or all the invited vertices are dead, convinced or hesitant + if (((double) almostConvicedVal) / invitedVal > kSwitchThreshold + || invitedVertices == (deadVertices + convincedVertices + hesitantVertices)) { + timeToSwitch = true; + currentLabel = nextLabel; + nextLabel = -1; + } + } + } + } else { // degree, pagerank or other similar where the label does not represent a group of + // vertices + justChangedTimeToSwitch = false; + if (superstep == 0) { + currentLabel = nextLabel; + timeToSwitch = true; + oldInvitedVertices = oldInvitedVerticesHandle.getReducedValue(master).get(); + } + if (superstep > 0) { + + if (!timeToSwitch) { + if (currentLabel > 0) { + // if the threshold is reached or all the invited vertices are dead, convinced or + // hesitant + if (((double) convincedVertices) / invitedVertices > kSwitchThreshold + || invitedVertices == (deadVertices + convincedVertices + hesitantVertices)) { + timeToSwitch = true; + oldInvitedVertices = invitedVertices; + oldConvincedVertices = convincedVertices; + oldAlmostConvincedVertices = almostConvincedVertices; + oldDeadVertices = deadVertices; + oldHesitantVertices = hesitantVertices; + oldUsingVertices = usingVertices; + } + } + } else { // it's time to switch: let's scan some label until we find at least + // vertices more than now + long old = oldInvitedVertices; + long actual = potentialVerticesHandle.getReducedValue(master).get(); + if (actual - old > minNumber) { + timeToSwitch = false; + justChangedTimeToSwitch = true; + } else if (nextLabelHandle.getReducedValue(master).get() < 0 + && superstep + > 10) { // reached the lowest label without finding at least MIN_NUMBER vertices + timeToSwitch = false; + justChangedTimeToSwitch = true; + currentLabel = 0; + } else { // continue to scan + currentLabel = nextLabel; + nextLabel = -1; + } + } + } + } + + superstep++; + + stoppingCondition.apply( + superstep > 0 + && master.getTotalNumVertices() + == (deadVertices + convincedVertices + hesitantVertices)); + } + + @Override + public VertexSender getVertexSender( + BlockWorkerSendApi workerApi, + Object executionStage) { + return (vertex) -> { + DiffusionVertexValue value = vertex.getValue(); + if (timeToSwitch && superstep > 0) + if (value.getLabel() < currentLabel) + nextLabelHandle.reduce(new LongWritable(value.getLabel())); + if (value.isVertexInvited(currentLabel) && superstep > 1) { + int activeNeighbors = value.getActiveNeighbors(); + if (byLabel) { + if (!value.isVertexDead() + && superstep != 1 + && activeNeighbors == value.getVertexThreshold()) + hesitantVerticesHandle.reduce(new LongWritable(activeNeighbors)); + aggregateVerticesBasedOnProbability(value); + } else { + if (!timeToSwitch) { + if (!value.isVertexDead() + && !justChangedTimeToSwitch + && activeNeighbors == value.getVertexThreshold()) + hesitantVerticesHandle.reduce(new LongWritable(1)); + aggregateVerticesBasedOnProbability(value); + } else potentialVerticesHandle.reduce(new LongWritable(1)); + } + } + value.reset(); + + if (value.isVertexInvited(currentLabel) && value.rollActivationDice() && superstep > 0) + if (byLabel || (!byLabel && !timeToSwitch)) { + usingVerticesHandle.reduce(new LongWritable(1)); + workerApi.sendMessageToAllEdges(vertex, new IntWritable(1)); + } + }; + } + + @Override + public VertexReceiver + getVertexReceiver(BlockWorkerReceiveApi workerApi, Object executionStage) { + return (vertex, messages) -> { + DiffusionVertexValue value = vertex.getValue(); + + if (value.isVertexInvited(currentLabel)) { + if (byLabel) { + if (!value.isVertexDead() + && superstep != 1) { // Update the using probability, if not dead + value.setActiveNeighbors(checkMsgsAndUpdateProbability(messages, value)); + } + } else { + if (!timeToSwitch) { + // Update the using probability if not dead and the computation has not just became + // active + // (because we don't have old messages sent so it would wrongly decrease the + // probability) + if (!value.isVertexDead() && !justChangedTimeToSwitch) { + value.setActiveNeighbors(checkMsgsAndUpdateProbability(messages, value)); + } + } + } + } + }; + } + + /** + * Check all the messages received by the vertex and update its probability with respect to the + * its threshold + * + * @param msgs the list of messages received by the vertex + * @param value the vertex value + * @return the number of neighbors using the feature (invited and active) + */ + private int checkMsgsAndUpdateProbability( + Iterable msgs, DiffusionVertexValue value) { + Iterator it = msgs.iterator(); + int activeNeighbors = 0; + while (it.hasNext()) activeNeighbors += it.next().get(); + if (activeNeighbors > value.getVertexThreshold()) value.modifyCurrentActivationProbability(1); + else if (activeNeighbors < value.getVertexThreshold()) + value.modifyCurrentActivationProbability(-1); + return activeNeighbors; + } + + /** + * Basing on the vertex current activation probability, update the relative reducers + * + * @param value the vertex value, containing the current activation probability + */ + private void aggregateVerticesBasedOnProbability(DiffusionVertexValue value) { + if (value.isVertexConvinced()) convincedVerticesHandle.reduce(new LongWritable(1)); + if (value.isVertexDead()) { // Dead aggregator update + deadVerticesHandle.reduce(new LongWritable(1)); + } + if (value.isAlmostConvinced()) { + almostConvincedVerticesHandle.reduce(new LongWritable(1)); + } + invitedVerticesHandle.reduce(new LongWritable(1)); + } + + @Override + public MessageCombiner getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return new SumMessageCombiner(IntTypeOps.INSTANCE); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java new file mode 100644 index 000000000..23e5ad311 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/AdjacencyListNoEdgeValueTextVertexOutputFormat.java @@ -0,0 +1,64 @@ +package org.apache.giraph.examples.feature_diffusion.io; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * OutputFormat to write out the graph nodes as text, value-separated (by tabs, by default). With + * the default delimiter, a vertex is written out as: + * + *

[]+ + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ +@SuppressWarnings("rawtypes") +public class AdjacencyListNoEdgeValueTextVertexOutputFormat< + I extends WritableComparable, V extends Writable, E extends Writable> + extends TextVertexOutputFormat { + + /** Split delimiter */ + public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default split delimiter */ + public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; + + @Override + public AdjacencyListTextVertexWriter createVertexWriter(TaskAttemptContext context) { + return new AdjacencyListTextVertexWriter(); + } + + /** Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}. */ + protected class AdjacencyListTextVertexWriter extends TextVertexWriterToEachLine { + /** Cached split delimeter */ + private String delimiter; + + @Override + public void initialize(TaskAttemptContext context) throws IOException, InterruptedException { + super.initialize(context); + delimiter = getConf().get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + } + + @Override + public Text convertVertexToLine(Vertex vertex) throws IOException { + StringBuffer sb = new StringBuffer(vertex.getId().toString()); + sb.append(delimiter); + sb.append(vertex.getValue()); + sb.append(delimiter); + + for (Edge edge : vertex.getEdges()) { + sb.append(edge.getTargetVertexId()).append(","); + // sb.append(delimiter).append(edge.getValue()); + } + sb.setLength(sb.length() - 1); + return new Text(sb.toString()); + } + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java new file mode 100644 index 000000000..1e82ba4dc --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/DiffusionVertexInputFormat.java @@ -0,0 +1,64 @@ +package org.apache.giraph.examples.feature_diffusion.io; + +import com.google.common.collect.Lists; + +import org.apache.giraph.examples.feature_diffusion.datastructures.DiffusionVertexValue; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public class DiffusionVertexInputFormat + extends TextVertexInputFormat { + + @Override + public TextVertexReader createVertexReader(InputSplit arg0, TaskAttemptContext arg1) + throws IOException { + return new DiffusionVertexReader(); + } + + protected class DiffusionVertexReader extends TextVertexReaderFromEachLine { + + @Override + protected Iterable> getEdges(Text line) throws IOException { + String[] fA = line.toString().split("\t"); + String[] edgeArray = fA[fA.length - 1].split(","); + List> edges = Lists.newArrayList(); + int i; + for (i = 0; i < edgeArray.length; ++i) { + long neighborId = Long.parseLong(edgeArray[i]); + edges.add(EdgeFactory.create(new LongWritable(neighborId), NullWritable.get())); + } + return edges; + } + + @Override + protected LongWritable getId(Text line) throws IOException { + return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); + } + + @Override + protected DiffusionVertexValue getValue(Text line) throws IOException { + String[] split = line.toString().split("\t"); + String value = split[1]; + String[] reSplit = value.split(","); + if (reSplit.length == 2) { + int treshold = Integer.parseInt(reSplit[1]); + int label = Integer.parseInt(reSplit[0]); + return new DiffusionVertexValue(treshold, label); + + } else { + int label = Integer.parseInt(reSplit[0]); + return new DiffusionVertexValue(label); + } + } + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java new file mode 100644 index 000000000..46e8f317f --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/io/LabelingInputFormat.java @@ -0,0 +1,59 @@ +package org.apache.giraph.examples.feature_diffusion.io; + +import com.google.common.collect.Lists; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public class LabelingInputFormat + extends TextVertexInputFormat { + + @Override + public TextVertexReader createVertexReader(InputSplit arg0, TaskAttemptContext arg1) + throws IOException { + return new DiffusionVertexReader(); + } + + protected class DiffusionVertexReader extends TextVertexReaderFromEachLine { + + @Override + protected Iterable> getEdges(Text line) throws IOException { + String[] fA = line.toString().split("\t"); + String[] edgeArray = fA[fA.length - 1].split(","); + List> edges = Lists.newArrayList(); + int i; + for (i = 0; i < edgeArray.length; ++i) { + long neighborId = Long.parseLong(edgeArray[i]); + edges.add(EdgeFactory.create(new LongWritable(neighborId), NullWritable.get())); + } + return edges; + } + + @Override + protected LongWritable getId(Text line) throws IOException { + return new LongWritable(Long.parseLong(line.toString().split("\t")[0])); + } + + @Override + protected LabelingVertexValue getValue(Text line) throws IOException { + String[] split = line.toString().split("\t"); + if (split.length == 2) { + return new LabelingVertexValue(); + } else { + String treshold = split[split.length - 2]; + return new LabelingVertexValue(Integer.parseInt(treshold)); + } + } + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java new file mode 100644 index 000000000..d313e356a --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/DegreeLabelingBlockFactory.java @@ -0,0 +1,48 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.TypesHolder; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +public class DegreeLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, NullWritable, NullWritable> { + + long superstep = 0; + + @Override + public Block createBlock(GiraphConfiguration arg0) { + return Pieces.forAllVertices( + "Degree Labeling", + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(vertex.getNumEdges()); + }); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java new file mode 100644 index 000000000..2b38e0118 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/FlatTimeLabelingBlockFactory.java @@ -0,0 +1,49 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.writable.tuple.LongLongWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +public class FlatTimeLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, LongLongWritable, LongLongWritable> { + + long superstep = 0; + + @Override + public Block createBlock(GiraphConfiguration arg0) { + return Pieces.forAllVertices( + "Degree Labeling", + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(1); + }); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java new file mode 100644 index 000000000..ca3cc750e --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/KCoreLabelingBlockFactory.java @@ -0,0 +1,130 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.reducers.impl.AndReduce; +import org.apache.giraph.writable.tuple.LongLongWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +@SuppressWarnings("unused") +public class KCoreLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, LongLongWritable, LongLongWritable> { + + ObjectTransfer stoppingCondition; + + public static ConsumerWithVertex< + LongWritable, LabelingVertexValue, NullWritable, Iterable> + kCoreConsumer = + (vertex, messages) -> { + LabelingVertexValue value = vertex.getValue(); + for (LongLongWritable msg : messages) { + long id = msg.getLeft().get(); + int coreness = (int) msg.getRight().get(); + value.updateNeighboorLabel(id, coreness); + } + }; + + @Override + public Block createBlock(GiraphConfiguration conf) { + stoppingCondition = new ObjectTransfer(false); + return new SequenceBlock( + Pieces + . + sendMessageToNeighbors( + "KCore Labeling First run", + LongLongWritable.class, + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + value.setLabel(Integer.max(vertex.getNumEdges(), 1)); + value.setChanged(false); + return new LongLongWritable(vertex.getId().get(), value.getLabel()); + }, + kCoreConsumer), + new RepeatUntilBlock( + GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.get(conf), + new SequenceBlock( + Pieces + . + sendMessageToNeighbors( + "KCore Labeling", + LongLongWritable.class, + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + int tempLabel = + computeIndex(value.getNeighborsLabel(), value.getLabel()); + if (tempLabel < value.getLabel()) value.setLabel(tempLabel); + if (value.isChanged()) { + // value.setChanged(false); + return new LongLongWritable(vertex.getId().get(), value.getLabel()); + } + return null; + }, + kCoreConsumer), + Pieces + . + reduce( + "Reducing stopping Condition", + AndReduce.INSTANCE, + (vertex) -> { + boolean isLabelChanged = vertex.getValue().isChanged(); + vertex.getValue().setChanged(false); + return new BooleanWritable(!isLabelChanged); + }, + (value) -> { + stoppingCondition.apply(value.get()); + })), + stoppingCondition)); + } + + private int computeIndex(HashMap neighborsLabel, long coreness) { + int[] corenessCount = new int[(int) coreness]; + for (int i = 0; i < coreness; i++) corenessCount[i] = 0; + for (Entry pair : neighborsLabel.entrySet()) { + long corenessCandidate = Long.min(pair.getValue(), coreness); + corenessCount[(int) corenessCandidate - 1]++; + } + for (int i = (int) (coreness - 1); i > 0; i--) corenessCount[i - 1] += corenessCount[i]; + int i = (int) coreness; + while (i > 1 && corenessCount[i - 1] < i) { + i--; + } + return i; + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +} diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java new file mode 100644 index 000000000..2b03c1efd --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/feature_diffusion/labeling/factory/PageRankLabelingBlockFactory.java @@ -0,0 +1,84 @@ +package org.apache.giraph.examples.feature_diffusion.labeling.factory; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.TypesHolder; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import org.apache.giraph.examples.feature_diffusion.datastructures.LabelingVertexValue; + +public class PageRankLabelingBlockFactory extends AbstractBlockFactory + implements TypesHolder< + LongWritable, LabelingVertexValue, NullWritable, DoubleWritable, DoubleWritable> { + + long numVerticesTotal; + + private ConsumerWithVertex< + LongWritable, LabelingVertexValue, NullWritable, Iterable> + pageRankConsumer = + (vertex, messages) -> { + LabelingVertexValue value = vertex.getValue(); + double sum = 0; + for (DoubleWritable message : messages) { + if (message.get() >= 0) sum += message.get(); + } + double pr = ((0.15f / numVerticesTotal) + 0.85f * sum); + value.setTemp(pr); // to change, removing + }; + + @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock( + Pieces.masterCompute( + "Master preprocess", + (master) -> { + numVerticesTotal = master.getTotalNumVertices(); + }), + new RepeatBlock( + conf.getInt("labeling.pagerank.iterations", 49), + Pieces + . + sendMessageToNeighbors( + "PageRank labeling", + DoubleWritable.class, + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + return new DoubleWritable(value.getTemp() / vertex.getNumEdges()); + }, + pageRankConsumer)), + Pieces.forAllVertices( + "Closing PageRank", + (vertex) -> { + LabelingVertexValue value = vertex.getValue(); + int cif = (int) (Math.log10(numVerticesTotal) + 2); + value.setLabel((long) (value.getTemp() * Math.pow(10, cif))); + })); + } + + @Override + protected Class getEdgeValueClass(GiraphConfiguration arg0) { + return NullWritable.class; + } + + @Override + protected Class getVertexIDClass(GiraphConfiguration arg0) { + return LongWritable.class; + } + + @Override + protected Class getVertexValueClass(GiraphConfiguration arg0) { + return LabelingVertexValue.class; + } + + @Override + public Object createExecutionStage(GiraphConfiguration arg0) { + return new Object(); + } +}