diff --git a/.gitignore b/.gitignore index ba753fb..a28ec81 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,4 @@ packer_virtualbox-ovf_virtualbox.box **/target/* .credentials +.idea \ No newline at end of file diff --git a/README.md b/README.md index 51c6e3e..4156b2e 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,30 @@ Udacity and Twitter bring you Real-Time Analytics with Apache Storm Join the course for free: www.udacity.com/course/ud381 + + +### Serge's changes + + - updated to point to fixed apache storm repo + - added provision task for vagrant + - run or re-run vagrant provision --provision-with bootstrap - will remove old, reinstall storm + - run or re-run vagrant provision --provision-with jdk8 - this will upgrade to jdk8 and set it to default + + + - Install JDK 8 since default is 6 + - sudo apt-get install software-properties-common python-software-properties + - sudo add-apt-repository ppa:webupd8team/java + - sudo apt-get update + - sudo apt-get install oracle-java8-installer + + + + +### Environmental variables need to be set: + +===== + +export TWITTER_CKEY=... +export TWITTER_SKEY=... +export TWITTER_TOKEN=.. +export TWITTER_SECRET=... diff --git a/Vagrantfile b/Vagrantfile index bf4bca6..a5c0c46 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -7,4 +7,11 @@ VAGRANTFILE_API_VERSION = "2" Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| config.vm.box = "udacity/ud381" config.vm.network :forwarded_port, guest: 5000, host: 5000 + #config.vm.provision :shell, path: "bootstrap.sh" + + + config.vm.provision 'bootstrap', type: :shell, path: "bootstrap.sh" + config.vm.provision 'jdk8', type: :shell, path: "jdk8.sh" + + end diff --git a/provision.sh b/bootstrap.sh similarity index 58% rename from provision.sh rename to bootstrap.sh index c8bdb24..36f1694 100755 --- a/provision.sh +++ b/bootstrap.sh @@ -1,4 +1,4 @@ -#!/bin/bash -i +#!/usr/bin/env bash # The following are documented (and stolen from) here: # http://redsymbol.net/articles/unofficial-bash-strict-mode/ @@ -18,12 +18,20 @@ sudo apt-get -y install default-jdk maven vim zookeeper zookeeperd redis-server sudo pip install flask redis -echo "Storm..." -# TODO maybe make this use the best mirror always? +echo "Storm.." + + +sudo rm -rf /opt/storm 2> /dev/null | echo "Removed old" +sudo rm /usr/bin/storm 2> /dev/null | echo "Removed old" + + + sudo mkdir /opt/storm cd /opt/storm -sudo wget http://mirror.cogentco.com/pub/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz -sudo tar xvzf apache-storm-0.9.2-incubating.tar.gz -sudo rm apache-storm-0.9.2-incubating.tar.gz -sudo chmod +x /opt/storm/apache-storm-0.9.2-incubating/bin/storm -sudo ln -s /opt/storm/apache-storm-0.9.2-incubating/bin/storm /usr/bin/storm + +# -q option will supress progress +sudo wget -q http://apache.mirror.vexxhost.com/storm/apache-storm-0.9.3/apache-storm-0.9.3.tar.gz +sudo tar xvzf apache-storm-0.9.3.tar.gz +sudo rm apache-storm-0.9.3.tar.gz +sudo chmod +x /opt/storm/apache-storm-0.9.3/bin/storm +sudo ln -s /opt/storm/apache-storm-0.9.3/bin/storm /usr/bin/storm diff --git a/jdk8.sh b/jdk8.sh new file mode 100644 index 0000000..6ce9038 --- /dev/null +++ b/jdk8.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + + # - Install JDK 8 since default is 6 + # - sudo apt-get install software-properties-common python-software-properties + # - sudo add-apt-repository ppa:webupd8team/java + # - sudo apt-get --yes update + # - sudo apt-get --yes --force-yes install oracle-java8-installer + # - sudo apt-get --yes --force-yes install oracle-java8-set-default + # - sudo apt-get --yes update + + + + +sudo apt-get install software-properties-common python-software-properties +sudo add-apt-repository ppa:webupd8team/java +sudo apt-get --yes update +sudo apt-get --yes --force-yes install oracle-java8-installer +sudo apt-get --yes --force-yes install oracle-java8-set-default +sudo apt-get --yes upgrade + + +echo "Done " \ No newline at end of file diff --git a/lesson1/stage1/pom.xml b/lesson1/stage1/pom.xml index a4d7881..ac08328 100644 --- a/lesson1/stage1/pom.xml +++ b/lesson1/stage1/pom.xml @@ -27,75 +27,81 @@ - junit - junit - 3.8.1 - test - - - org.testng - testng - 6.8.5 - test - - - org.mockito - mockito-all - 1.9.0 - test - - - org.easytesting - fest-assert-core - 2.0M8 - test - - - org.jmock - jmock - 2.6.0 - test - - - org.apache.storm - storm-core - 0.9.2-incubating - - provided - - - commons-collections - commons-collections - 3.2.1 - - - com.google.guava - guava - 15.0 - - - org.twitter4j - twitter4j-core - [3.0,) - - - org.twitter4j - twitter4j-stream - [3.0,) - + com.lambdaworks + lettuce + 2.3.3 + + + + junit + junit + 3.8.1 + test + + + org.testng + testng + 6.8.5 + test + + + org.mockito + mockito-all + 1.9.0 + test + + + org.easytesting + fest-assert-core + 2.0M8 + test + + + org.jmock + jmock + 2.6.0 + test + + + org.apache.storm + storm-core + 0.9.3 + + provided + + + commons-collections + commons-collections + 3.2.1 + + + com.google.guava + guava + 15.0 + + + org.twitter4j + twitter4j-core + [3.0,) + + + org.twitter4j + twitter4j-stream + [3.0,) + - + --> + - - src/jvm - + + src/jvm + + --> maven-assembly-plugin @@ -172,11 +178,11 @@ maven-compiler-plugin 3.1 - + 1.7 - 1.7 + 1.8 - - + + diff --git a/lesson1/stage1/src/jvm/udacity/storm/ReporterExclamationTopology.java b/lesson1/stage1/src/jvm/udacity/storm/ReporterExclamationTopology.java index 98f4c38..a6d4e95 100644 --- a/lesson1/stage1/src/jvm/udacity/storm/ReporterExclamationTopology.java +++ b/lesson1/stage1/src/jvm/udacity/storm/ReporterExclamationTopology.java @@ -16,16 +16,23 @@ import java.util.Map; + +import com.lambdaworks.redis.RedisClient; + +import com.lambdaworks.redis.RedisConnection; + + + //********* TO DO 1-of-4 imported http://mvnrepository.com/artifact/com.lambdaworks/lettuce/ // COPY AND PASE: following code into pom.xml file (located lesson1/stage1/pom.xml) -// +// // com.lambdaworks // lettuce // 2.3.3 -// -// +// + //********* END 1-of-4 /** @@ -54,7 +61,7 @@ public static class ExclamationBolt extends BaseRichBolt //********* TO DO 2-of-4 // place holder to keep the connection to redis - + RedisConnection redis; //********* END 2-of-4 @Override @@ -68,9 +75,9 @@ public void prepare( //********* TO DO 3-of-4 // instantiate a redis connection - + RedisClient client = new RedisClient("localhost",6379); // initiate the actual connection - + redis = client.connect(); //********* END 3-of-4 } @@ -88,8 +95,8 @@ public void execute(Tuple tuple) _collector.emit(tuple, new Values(exclamatedWord.toString())); //********* TO DO 4-of-4 Uncomment redis reporter - //long count = 30; - //redis.publish("WordCountTopology", exclamatedWord.toString() + "|" + Long.toString(count)); + long count = 30; + redis.publish("WordCountTopology", exclamatedWord.toString() + "|" + Long.toString(count)); //********* END 4-of-4 } diff --git a/lesson1/stage2/pom.xml b/lesson1/stage2/pom.xml index b98789c..c15f459 100644 --- a/lesson1/stage2/pom.xml +++ b/lesson1/stage2/pom.xml @@ -23,9 +23,21 @@ clojars.org http://clojars.org/repo + + multiline-release-repo + https://raw.github.com/benelog/multiline/master/maven-repository + + false + + + + org.adrianwalker + multiline-string + 0.1.1 + junit junit @@ -59,7 +71,7 @@ org.apache.storm storm-core - 0.9.2-incubating + 0.9.3 provided @@ -85,22 +97,22 @@ - - com.lambdaworks - lettuce - 2.3.3 - - + --> + + com.lambdaworks + lettuce + 2.3.3 + + - - src/jvm - + + src/jvm + + --> maven-assembly-plugin @@ -177,11 +189,11 @@ maven-compiler-plugin 3.1 - + 1.7 1.7 - - + + diff --git a/lesson1/stage2/src/jvm/udacity/storm/ReporterExclamationTopology.java b/lesson1/stage2/src/jvm/udacity/storm/ReporterExclamationTopology.java index 5692d6c..9e43355 100644 --- a/lesson1/stage2/src/jvm/udacity/storm/ReporterExclamationTopology.java +++ b/lesson1/stage2/src/jvm/udacity/storm/ReporterExclamationTopology.java @@ -20,17 +20,7 @@ import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisConnection; -// COPY AND PASE: following code into pom.xml file (located lesson1/stage1/pom.xml) -// -// com.lambdaworks -// lettuce -// 2.3.3 -// -// -//********* END 1-of-4 - -//********* BEGIN stage2 exercise part 1-of-2 *********** -//import the random sentence spout from spout/RandomSentenceSpout (remember the semicolon!) +import udacity.storm.spout.RandomSentenceSpout; //********** END stage 2 exercise part 1-of-2 *********** @@ -117,13 +107,15 @@ public static void main(String[] args) throws Exception //********* BEGIN stage2 exercise part 2-of-2 *********** // attach the word spout to the topology - parallelism of 10 - builder.setSpout("word", new TestWordSpout(), 10); + //builder.setSpout("word", new TestWordSpout(), 10); + + builder.setSpout("rand-setence", new RandomSentenceSpout(),10); // attach the exclamation bolt to the topology - parallelism of 3 - builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); + builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("rand-setence"); // attach another exclamation bolt to the topology - parallelism of 2 - builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); + //builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); //********* END stage2 exercise part 2-of-2 *********** diff --git a/lesson1/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java b/lesson1/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java index 17d12bb..6d9881d 100644 --- a/lesson1/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java +++ b/lesson1/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java @@ -10,29 +10,65 @@ import java.util.Map; import java.util.Random; +import org.adrianwalker.multilinestring.Multiline; public class RandomSentenceSpout extends BaseRichSpout { SpoutOutputCollector _collector; Random _rand; + /** +Help, I need somebody +Help, not just anybody +Help, you know I need someone, help + +When I was younger (So much younger than) so much younger than today +(I never needed) I never needed anybody's help in any way +(Now) But now these days are gone (These days are gone), I'm not so self assured +(I know I've found) Now I find I've changed my mind and opened up the doors + +Help me if you can, I'm feeling down +And I do appreciate you being 'round +Help me get my feet back on the ground +Won't you please, please help me + +(Now) And now my life has changed in oh so many ways +(My independence) My independence seems to vanish in the haze +(But) But every now (Every now and then) and then I feel so insecure +(I know that I) I know that I just need you like I've never done before + +Help me if you can, I'm feeling down +And I do appreciate you being 'round +Help me get my feet back on the ground +Won't you please, please help me + +When I was younger so much younger than today +I never needed anybody's help in any way +(But) But now these days are gone (These days are gone), I'm not so self assured +(I know I've found) Now I find I've changed my mind and opened up the doors + +Help me if you can, I'm feeling down +And I do appreciate you being round +Help me, get my feet back on the ground +Won't you please, please help me, help me, help me, ooh + + + */ + @Multiline private static String _beat_help_song; + + private String[] _parsed_song; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); + _parsed_song = _beat_help_song.split("[\\r\\n]+"); } @Override public void nextTuple() { Utils.sleep(100); - String[] sentences = new String[]{ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" - }; - String sentence = sentences[_rand.nextInt(sentences.length)]; + + String sentence = _parsed_song[_rand.nextInt(_parsed_song.length)]; _collector.emit(new Values(sentence)); } diff --git a/lesson2/stage1/pom.xml b/lesson2/stage1/pom.xml index e571673..fe1f1bf 100644 --- a/lesson2/stage1/pom.xml +++ b/lesson2/stage1/pom.xml @@ -59,7 +59,7 @@ org.apache.storm storm-core - 0.9.2-incubating + 0.9.3 provided diff --git a/lesson2/stage1/src/jvm/udacity/storm/WordCountTopology.java b/lesson2/stage1/src/jvm/udacity/storm/WordCountTopology.java index c81bb8c..68e4d2b 100644 --- a/lesson2/stage1/src/jvm/udacity/storm/WordCountTopology.java +++ b/lesson2/stage1/src/jvm/udacity/storm/WordCountTopology.java @@ -124,23 +124,22 @@ public void prepare( @Override public void execute(Tuple tuple) { - //************************************************** - //BEGIN YOUR CODE - Part 1-of-3 - //Check if incoming word is in countMap. If word does not - //exist then add word with count = 1, if word exist then - //increment count. - //Syntax to get the word from the 1st column of incoming tuple - //String word = tuple.getString(0); + String word = tuple.getString(0); + if (!countMap.containsKey(word)){ + //increment and add + countMap.put(word, 1); + }else{ + countMap.put(word, countMap.get(word) +1); + } //After countMap is updated, emit word and count to output collector // Syntax to emit the word and count (uncomment to emit) - //collector.emit(new Values(word, countMap.get(word))); + collector.emit(new Values(word, countMap.get(word))); - //END YOUR CODE Part 1-of-3 - //*************************************************** + } @Override @@ -151,14 +150,9 @@ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) // declare the first column 'word', second colmun 'count' - //**************************************************** - //BEGIN YOUR CODE - Part 2-of-3 - //uncomment line below to declare output + outputFieldsDeclarer.declare(new Fields("word","count")); - //outputFieldsDeclarer.declare(new Fields("word","count")); - //END YOUR CODE - Part 2-of-3 - //**************************************************** } } @@ -213,14 +207,8 @@ public static void main(String[] args) throws Exception // attach the count bolt using fields grouping - parallelism of 15 builder.setBolt("count-bolt", new CountBolt(), 15).fieldsGrouping("word-spout", new Fields("word")); - // attach the report bolt using global grouping - parallelism of 1 - //*************************************************** - // BEGIN YOUR CODE - Part 3-of-3 - - - - // END YOUR CODE - Part 3-of-3 - //*************************************************** + + builder.setBolt("report-bolt", new ReportBolt(), 1).globalGrouping("count-bolt"); // create the default config object Config conf = new Config(); diff --git a/lesson2/stage2/pom.xml b/lesson2/stage2/pom.xml index 264ff31..fa6e26c 100644 --- a/lesson2/stage2/pom.xml +++ b/lesson2/stage2/pom.xml @@ -59,7 +59,7 @@ org.apache.storm storm-core - 0.9.2-incubating + 0.9.3 provided @@ -175,8 +175,8 @@ 3.1 - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/lesson2/stage2/src/jvm/udacity/storm/SentenceCountTopology.java b/lesson2/stage2/src/jvm/udacity/storm/SentenceCountTopology.java new file mode 100644 index 0000000..38b80db --- /dev/null +++ b/lesson2/stage2/src/jvm/udacity/storm/SentenceCountTopology.java @@ -0,0 +1,278 @@ +package udacity.storm; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.task.ShellBolt; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import backtype.storm.utils.Utils; + +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import com.lambdaworks.redis.RedisClient; +import com.lambdaworks.redis.RedisConnection; + +import udacity.storm.spout.RandomSentenceSpout; + +/** + * This topology demonstrates how to count distinct sentences from + * a stream . + * + * This is an example for Udacity Real Time Analytics Course - ud381 + * + */ +public class SentenceCountTopology { + + /** + * Constructor - does nothing + */ + private SentenceCountTopology() { } + + /** + * A spout that emits a random word + */ + static class WordSpout extends BaseRichSpout { + + // Random number generator + private Random rnd; + + // To output tuples from spout to the next stage + private SpoutOutputCollector collector; + + // For storing the list of words to be fed into the topology + private String[] wordList; + + @Override + public void open( + Map map, + TopologyContext topologyContext, + SpoutOutputCollector spoutOutputCollector) + { + + // initialize the random number generator + rnd = new Random(31); + + // save the output collector for emitting tuples + collector = spoutOutputCollector; + + // initialize a set of words + wordList = new String[]{"Jack", "Mary", "Jill", "McDonald"}; + } + + @Override + public void nextTuple() + { + // sleep a second before emitting any word + Utils.sleep(1000); + + // generate a random number based on the wordList length + int nextInt = rnd.nextInt(wordList.length); + + // emit the word chosen by the random number from wordList + collector.emit(new Values(wordList[nextInt])); + } + + @Override + public void declareOutputFields( + OutputFieldsDeclarer outputFieldsDeclarer) + { + // tell storm the schema of the output tuple for this spout + // tuple consists of a single column called 'word' + outputFieldsDeclarer.declare(new Fields("word")); + } + } + + /** + * A bolt that counts the words that it receives + */ + static class CountBolt extends BaseRichBolt { + + // To output tuples from this bolt to the next stage bolts, if any + private OutputCollector collector; + + // Map to store the count of the words + private Map countMap; + + @Override + public void prepare( + Map map, + TopologyContext topologyContext, + OutputCollector outputCollector) + { + + // save the collector for emitting tuples + collector = outputCollector; + + // create and initialize the map + countMap = new HashMap(); + } + + @Override + public void execute(Tuple tuple) + { + //************************************************** + //BEGIN YOUR CODE - Part 1-of-3 + //Check if incoming word is in countMap. If word does not + //exist then add word with count = 1, if word exist then + //increment count. + + //Syntax to get the word from the 1st column of incoming tuple + String sentence = tuple.getString(0); + + // check if the word is present in the map + if (countMap.get(sentence) == null) { + + // not present, add the word with a count of 1 + countMap.put(sentence, 1); + } else { + + // already there, hence get the count + Integer val = countMap.get(sentence); + + // increment the count and save it to the map + countMap.put(sentence, ++val); + } + + //After countMap is updated, emit word and count to output collector + // Syntax to emit the word and count (uncomment to emit) + collector.emit(new Values(sentence, countMap.get(sentence))); + + //END YOUR CODE Part 1-of-3 + //*************************************************** + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + { + // tell storm the schema of the output tuple for this spout + // tuple consists of a two columns called 'word' and 'count' + + // declare the first column 'word', second colmun 'count' + + //**************************************************** + //BEGIN YOUR CODE - Part 2-of-3 + //uncomment line below to declare output + + outputFieldsDeclarer.declare(new Fields("sentence","count")); + + //END YOUR CODE Part 2-of-3 + //**************************************************** + } + } + + /** + * A bolt that prints the word and count to redis + */ + static class ReportBolt extends BaseRichBolt + { + // place holder to keep the connection to redis + transient RedisConnection redis; + + @Override + public void prepare( + Map map, + TopologyContext topologyContext, + OutputCollector outputCollector) + { + // instantiate a redis connection + RedisClient client = new RedisClient("localhost",6379); + + // initiate the actual connection + redis = client.connect(); + } + + @Override + public void execute(Tuple tuple) + { + // access the first column 'word' + String sentence = tuple.getStringByField("sentence"); + + // access the second column 'count' + Integer count = tuple.getIntegerByField("count"); + + // publish the word count to redis using word as the key + redis.publish("WordCountTopology", sentence + "|" + Long.toString(count)); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) + { + // nothing to add - since it is the final bolt + } + } + + public static void main(String[] args) throws Exception + { + // create the topology + TopologyBuilder builder = new TopologyBuilder(); + + // attach the word spout to the topology - parallelism of 5 + builder.setSpout("sentence-spout", new RandomSentenceSpout(), 5); + + // attach the count bolt using fields grouping - parallelism of 15 + builder.setBolt("count-bolt", new CountBolt(), 15).fieldsGrouping("sentence-spout", new Fields("sentence")); + + // attach the report bolt using global grouping - parallelism of 1 + //*************************************************** + // BEGIN YOUR CODE - Part 3-of-3 + + builder.setBolt("report-bolt", new ReportBolt(), 1).globalGrouping("count-bolt"); + + + // END YOUR CODE Part 3-of-3 + //*************************************************** + + // create the default config object + Config conf = new Config(); + + // set the config in debugging mode + conf.setDebug(true); + + if (args != null && args.length > 0) { + + // run it in a live cluster + + // set the number of workers for running all spout and bolt tasks + conf.setNumWorkers(3); + + // create the topology and submit with config + StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); + + } else { + + // run it in a simulated local cluster + + // set the number of threads to run - similar to setting number of workers in live cluster + conf.setMaxTaskParallelism(3); + + // create the local cluster instance + LocalCluster cluster = new LocalCluster(); + + // submit the topology to the local cluster + cluster.submitTopology("word-count", conf, builder.createTopology()); + + //********************************************************************** + // let the topology run for 30 seconds. note topologies never terminate! + Thread.sleep(30000); + //********************************************************************** + + // we are done, so shutdown the local cluster + cluster.shutdown(); + } + } +} diff --git a/lesson2/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java b/lesson2/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java index 08a60f2..c50fdbf 100644 --- a/lesson2/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java +++ b/lesson2/stage2/src/jvm/udacity/storm/spout/RandomSentenceSpout.java @@ -26,13 +26,15 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ - "the cow jumped over the moon", + "»ßä䌜šƒ", "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" }; + "Œœ seven years ago", + "snow white and the seven ßää", + "i am at two with äŒ" }; String sentence = sentences[_rand.nextInt(sentences.length)]; - _collector.emit(new Values(sentence)); + //TODO id like to shuffle the string and break sentence in small chunks + + _collector.emit(new Values(sentence)); } @Override diff --git a/lesson2/stage3/pom.xml b/lesson2/stage3/pom.xml index ec3ee8d..a5859c2 100644 --- a/lesson2/stage3/pom.xml +++ b/lesson2/stage3/pom.xml @@ -175,8 +175,8 @@ 3.1 - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/lesson2/stage3/src/jvm/udacity/storm/SentenceWordCountTopology.java b/lesson2/stage3/src/jvm/udacity/storm/SentenceWordCountTopology.java new file mode 100644 index 0000000..d5fef00 --- /dev/null +++ b/lesson2/stage3/src/jvm/udacity/storm/SentenceWordCountTopology.java @@ -0,0 +1,292 @@ +package udacity.storm; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.task.ShellBolt; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import backtype.storm.utils.Utils; + +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import com.lambdaworks.redis.RedisClient; +import com.lambdaworks.redis.RedisConnection; + +//********* TO DO part 1-of-2 import RandomSentenceSpout similar to lesson 1 +import udacity.storm.spout.RandomSentenceSpout; +//********* END TO DO part 1-of-2 +/** + * This topology demonstrates how to count distinct words from + * a stream of words. + * + * This is an example for Udacity Real Time Analytics Course - ud381 + * + */ +public class SentenceWordCountTopology { + + /** + * Constructor - does nothing + */ + + //Note: Constructor must match class name + private SentenceWordCountTopology() { } + + /** + * A spout that emits a random word + */ + static class WordSpout extends BaseRichSpout { + + // Random number generator + private Random rnd; + + // To output tuples from spout to the next stage + private SpoutOutputCollector collector; + + // For storing the list of words to be fed into the topology + private String[] wordList; + + @Override + public void open( + Map map, + TopologyContext topologyContext, + SpoutOutputCollector spoutOutputCollector) + { + + // initialize the random number generator + rnd = new Random(31); + + // save the output collector for emitting tuples + collector = spoutOutputCollector; + + // initialize a set of words + wordList = new String[]{"Jack", "Mary", "Jill", "McDonald"}; + } + + @Override + public void nextTuple() + { + // sleep a second before emitting any word + Utils.sleep(1000); + + // generate a random number based on the wordList length + int nextInt = rnd.nextInt(wordList.length); + + // emit the word chosen by the random number from wordList + collector.emit(new Values(wordList[nextInt])); + } + + @Override + public void declareOutputFields( + OutputFieldsDeclarer outputFieldsDeclarer) + { + // tell storm the schema of the output tuple for this spout + // tuple consists of a single column called 'word' + outputFieldsDeclarer.declare(new Fields("word")); + } + } + + /** + * A bolt that counts the words that it receives + */ + static class CountBolt extends BaseRichBolt { + + // To output tuples from this bolt to the next stage bolts, if any + private OutputCollector collector; + + // Map to store the count of the words + private Map countMap; + + @Override + public void prepare( + Map map, + TopologyContext topologyContext, + OutputCollector outputCollector) + { + + // save the collector for emitting tuples + collector = outputCollector; + + // create and initialize the map + countMap = new HashMap(); + } + + @Override + public void execute(Tuple tuple) + { + //************************************************** + //BEGIN YOUR CODE - Part 1a + //Check if incoming word is in countMap. If word does not + //exist then add word with count = 1, if word exist then + //increment count. + + //Syntax to get the word from the 1st column of incoming tuple + String word = tuple.getString(0); + + // check if the word is present in the map + if (countMap.get(word) == null) { + + // not present, add the word with a count of 1 + countMap.put(word, 1); + } else { + + // already there, hence get the count + Integer val = countMap.get(word); + + // increment the count and save it to the map + countMap.put(word, ++val); + } + + //After countMap is updated, emit word and count to output collector + // Syntax to emit the word and count (uncomment to emit) + collector.emit(new Values(word, countMap.get(word))); + + //END YOUR CODE + //*************************************************** + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + { + // tell storm the schema of the output tuple for this spout + // tuple consists of a two columns called 'word' and 'count' + + // declare the first column 'word', second colmun 'count' + + //**************************************************** + //BEGIN YOUR CODE - part 1b + //uncomment line below to declare output + + outputFieldsDeclarer.declare(new Fields("word","count")); + + //END YOUR CODE + //**************************************************** + } + } + + /** + * A bolt that prints the word and count to redis + */ + static class ReportBolt extends BaseRichBolt + { + // place holder to keep the connection to redis + transient RedisConnection redis; + + @Override + public void prepare( + Map map, + TopologyContext topologyContext, + OutputCollector outputCollector) + { + // instantiate a redis connection + RedisClient client = new RedisClient("localhost",6379); + + // initiate the actual connection + redis = client.connect(); + } + + @Override + public void execute(Tuple tuple) + { + // access the first column 'word' + String word = tuple.getStringByField("word"); + + // access the second column 'count' + Integer count = tuple.getIntegerByField("count"); + + // publish the word count to redis using word as the key + redis.publish("WordCountTopology", word + "|" + Long.toString(count)); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) + { + // nothing to add - since it is the final bolt + } + } + + public static void main(String[] args) throws Exception + { + // create the topology + TopologyBuilder builder = new TopologyBuilder(); + + //***** TO DO part 2-of-2 remove WordSpout and change to RandomSentenceSpout + + // attach the word spout to the topology - parallelism of 5 + //builder.setSpout("word-spout", new WordSpout(), 5); + + // attach sentence spout to the topology - parallelism of 1 + builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1); + + + builder.setBolt("split-sentence-bolt", new SplitSentenceBolt(), 5).shuffleGrouping("sentence-spout"); + + // attach the count bolt using fields grouping - parallelism of 15 + builder.setBolt("count-bolt", new CountBolt(), 15).fieldsGrouping("split-sentence-bolt", new Fields("words")); + + //***** END part 2-of-2 remove************************************* + + // attach the report bolt using global grouping - parallelism of 1 + //*************************************************** + // BEGIN YOUR CODE - part 2 + + builder.setBolt("report-bolt", new ReportBolt(), 1).globalGrouping("count-bolt"); + + + // END YOUR CODE + //*************************************************** + + // create the default config object + Config conf = new Config(); + + // set the config in debugging mode + conf.setDebug(true); + + if (args != null && args.length > 0) { + + // run it in a live cluster + + // set the number of workers for running all spout and bolt tasks + conf.setNumWorkers(3); + + // create the topology and submit with config + StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); + + } else { + + // run it in a simulated local cluster + + // set the number of threads to run - similar to setting number of workers in live cluster + conf.setMaxTaskParallelism(3); + + // create the local cluster instance + LocalCluster cluster = new LocalCluster(); + + // submit the topology to the local cluster + // name topology + cluster.submitTopology("sentence-count", conf, builder.createTopology()); + + //********************************************************************** + // let the topology run for 30 seconds. note topologies never terminate! + Thread.sleep(30000); + //********************************************************************** + + // we are done, so shutdown the local cluster + cluster.shutdown(); + } + } +} diff --git a/lesson2/stage3/src/jvm/udacity/storm/SplitSentenceBolt.java b/lesson2/stage3/src/jvm/udacity/storm/SplitSentenceBolt.java new file mode 100644 index 0000000..a9ca789 --- /dev/null +++ b/lesson2/stage3/src/jvm/udacity/storm/SplitSentenceBolt.java @@ -0,0 +1,64 @@ + +package udacity.storm; + +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.task.OutputCollector; +import java.util.Map; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.task.TopologyContext; +import java.util.Arrays; +public class SplitSentenceBolt extends BaseRichBolt { + +// To output tuples from this bolt to the next stage bolts, if any + private OutputCollector collector; + + public void SplitSentenceBolt(){ + + } + + @Override + public void prepare( + Map map, + TopologyContext topologyContext, + OutputCollector outputCollector) + { + + // save the collector for emitting tuples + collector = outputCollector; + + + } + + @Override + public void execute(Tuple tuple) + { + //************************************************** + + + //Syntax to get the word from the 1st column of incoming tuple + String sentence = tuple.getString(0); + + //split sentence into words + if (sentence != null){ + String delims = "[ .,?!]+"; + Arrays.asList(sentence.split(delims)).parallelStream().forEach(word -> collector.emit(new Values(word))); + } + + + + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + { + + + outputFieldsDeclarer.declare(new Fields("words")); + + } + +} \ No newline at end of file diff --git a/lesson2/stage3/src/jvm/udacity/storm/spout/RandomSentenceSpout.java b/lesson2/stage3/src/jvm/udacity/storm/spout/RandomSentenceSpout.java index 17d12bb..bbc5fb9 100644 --- a/lesson2/stage3/src/jvm/udacity/storm/spout/RandomSentenceSpout.java +++ b/lesson2/stage3/src/jvm/udacity/storm/spout/RandomSentenceSpout.java @@ -26,12 +26,10 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" - }; + "do not go gentle into that good night", + "old age should burn and rave at close of day", + "rage, rage against the dying of the light" + }; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence)); } diff --git a/lesson2/stage4/src/jvm/udacity/storm/spout/RandomSentenceSpout.java b/lesson2/stage4/src/jvm/udacity/storm/spout/RandomSentenceSpout.java index 17d12bb..6dda296 100644 --- a/lesson2/stage4/src/jvm/udacity/storm/spout/RandomSentenceSpout.java +++ b/lesson2/stage4/src/jvm/udacity/storm/spout/RandomSentenceSpout.java @@ -26,11 +26,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" + "do not go gentle into that good night", + "old age should burn and rave at close of day", + "rage, rage against the dying of the light" }; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence)); diff --git a/lesson2/stage5/pom.xml b/lesson2/stage5/pom.xml index 5f11849..1785c8d 100644 --- a/lesson2/stage5/pom.xml +++ b/lesson2/stage5/pom.xml @@ -175,8 +175,8 @@ 3.1 - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/lesson2/stage5/src/jvm/udacity/storm/TweetTopology.java b/lesson2/stage5/src/jvm/udacity/storm/TweetTopology.java index 1278cfd..1511d5a 100644 --- a/lesson2/stage5/src/jvm/udacity/storm/TweetTopology.java +++ b/lesson2/stage5/src/jvm/udacity/storm/TweetTopology.java @@ -353,10 +353,10 @@ public static void main(String[] args) throws Exception // now create the tweet spout with the credentials TweetSpout tweetSpout = new TweetSpout( - //"[Your customer key]", - //"[Your secret key]", - //"[Your access token]", - //"[Your access secret]" + System.getenv().get("TWITTER_CKEY"), + System.getenv().get("TWITTER_SKEY"), + System.getenv().get("TWITTER_TOKEN"), + System.getenv().get("TWITTER_SECRET") ); // attach the tweet spout to the topology - parallelism of 1 @@ -369,6 +369,13 @@ public static void main(String[] args) throws Exception // Part 3: attach the report bolt, parallelism of 1 (what grouping is needed?) // Submit and run the topology. + builder.setBolt("parse-tweet", new ParseTweetBolt(), 10).shuffleGrouping("tweet-spout"); + builder.setBolt("count-bolt", new CountBolt(), 15).fieldsGrouping("parse-tweet", new Fields("tweet-word")); + builder.setBolt("report-bolt", new ReportBolt(), 1).globalGrouping("count-bolt"); + + + + //********************************************************************* diff --git a/lesson2/stage6/src/jvm/udacity/storm/TweetTopology.java b/lesson2/stage6/src/jvm/udacity/storm/TweetTopology.java index 7e26b8a..e7bf71c 100644 --- a/lesson2/stage6/src/jvm/udacity/storm/TweetTopology.java +++ b/lesson2/stage6/src/jvm/udacity/storm/TweetTopology.java @@ -32,12 +32,13 @@ public static void main(String[] args) throws Exception * */ + // now create the tweet spout with the credentials TweetSpout tweetSpout = new TweetSpout( - "[Your customer key]", - "[Your secret key]", - "[Your access token]", - "[Your access secret]" + System.getenv().get("TWITTER_CKEY"), + System.getenv().get("TWITTER_SKEY"), + System.getenv().get("TWITTER_TOKEN"), + System.getenv().get("TWITTER_SECRET") ); //********************************************************************* @@ -49,6 +50,20 @@ public static void main(String[] args) throws Exception // Submit and run the topology. + + // attach the tweet spout to the topology - parallelism of 1 + builder.setSpout("tweet-spout", tweetSpout, 1); + + // attach the parse tweet bolt using shuffle grouping + builder.setBolt("parse-tweet-bolt", new ParseTweetBolt(), 10).shuffleGrouping("tweet-spout"); + + // attach the count bolt using fields grouping - parallelism of 15 + builder.setBolt("count-bolt", new CountBolt(), 15).fieldsGrouping("parse-tweet-bolt", new Fields("tweet-word")); + + // attach the report bolt using global grouping - parallelism of 1 + builder.setBolt("report-bolt", new ReportBolt(), 1).globalGrouping("count-bolt"); + + //********************************************************************* // create the default config object diff --git a/viz/rt-provision-32.sh b/viz/rt-provision-32.sh index 3b09fbd..d8af5a9 100755 --- a/viz/rt-provision-32.sh +++ b/viz/rt-provision-32.sh @@ -5,14 +5,6 @@ echo "Real-Time Provisioning...." echo "Java JDK..." sudo apt-get install default-jdk -y -echo "Storm..." -#sudo wget http://apache.spinellicreations.com/incubator/storm/apache-storm-0.9.1-incubating/apache-storm-0.9.1-incubating.zip -#sudo unzip -o /media/sf_VirtualBoxUbuntuShared/apache-storm-0.9.1-incubating.zip -sudo wget http://www.trieuvan.com/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.zip -sudo unzip -o $(pwd)/apache-storm-0.9.2-incubating.zip -# use storm.0.9.2 for now...confirming with Twitter -sudo ln -s $(pwd)/apache-storm-0.9.2-incubating/ /usr/share/storm -sudo ln -s /usr/share/storm/bin/storm /usr/bin/storm echo "Lein..." sudo wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein diff --git a/viz/static/MyPicture.jpg b/viz/static/MyPicture.jpg new file mode 100644 index 0000000..4050107 Binary files /dev/null and b/viz/static/MyPicture.jpg differ diff --git a/viz/templates/cloud.html b/viz/templates/cloud.html index 3c69d58..91f49bc 100644 --- a/viz/templates/cloud.html +++ b/viz/templates/cloud.html @@ -40,7 +40,9 @@

Udacity and Twitter bring you Real-Time Analytics with

 Smiley face  Twitter logo -   Word Cloud!!!

+  Serge Profile +
+    Serge's Word Cloud!!!