Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest code #5

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="lib" path="target/storm-hackathon-1.0-SNAPSHOT-jar-with-dependencies.jar"/>
<classpathentry kind="lib" path="target/storm-hackathon-1.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/asm-4.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/carbonite-1.5.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/clj-time-0.4.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/clojure-1.4.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/clout-1.0.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/commons-codec-1.4.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/commons-exec-1.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/commons-fileupload-1.2.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/commons-io-1.4.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/commons-lang-2.5.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/commons-logging-1.1.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/compojure-1.1.3.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/core.incubator-0.1.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/curator-client-1.0.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/curator-framework-1.0.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/disruptor-2.10.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/guava-13.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/hiccup-0.3.6.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/httpclient-4.1.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/httpcore-4.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/jetty-6.1.26.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/jetty-util-6.1.26.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/jgrapht-0.8.3.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/jline-0.9.94.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/joda-time-2.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/json-simple-1.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/junit-3.8.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/jzmq-2.1.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/kryo-2.17.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/libthrift7-0.7.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/log4j-over-slf4j-1.6.6.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/logback-classic-1.0.6.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/logback-core-1.0.6.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/math.numeric-tower-0.0.1.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/minlog-1.2.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/objenesis-1.2.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/reflectasm-1.07-shaded.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/ring-core-1.1.5.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/ring-jetty-adapter-0.3.11.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/ring-servlet-0.3.11.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/servlet-api-2.5-20081211.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/servlet-api-2.5.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/slf4j-api-1.6.5.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/snakeyaml-1.9.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/tools.cli-0.2.2.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/tools.logging-0.2.3.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/tools.macro-0.1.0.jar"/>
<classpathentry kind="lib" path="tools/storm/lib/zookeeper-3.3.3.jar"/>
<classpathentry kind="lib" path="tools/storm/storm-0.9.0-wip16.jar"/>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
23 changes: 23 additions & 0 deletions .project
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>storm-hackathon</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
4 changes: 4 additions & 0 deletions .settings/org.eclipse.core.resources.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding/<project>=UTF-8
5 changes: 5 additions & 0 deletions .settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.6
4 changes: 4 additions & 0 deletions .settings/org.eclipse.m2e.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
24 changes: 20 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,26 @@
</dependency>

<dependency>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>2.3</version>
</dependency>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>2.3</version>
</dependency>

<dependency>
<groupId>oauth.signpost</groupId>
<artifactId>signpost-core</artifactId>
<version>1.2.1.2</version>
</dependency>
<dependency>
<groupId>oauth.signpost</groupId>
<artifactId>signpost-commonshttp4</artifactId>
<version>1.2.1.2</version>
</dependency>
<dependency>
<groupId>com.github.pmerienne</groupId>
<artifactId>trident-ml</artifactId>
<version>0.0.2</version>
</dependency>

<dependency>
<groupId>storm</groupId>
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/hackreduce/storm/CsvSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void nextTuple() {

@Override
public void ack(Object id) {
//This is a test
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.hackreduce.storm.vmc.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import org.hackreduce.storm.vmc.common.BaseComponent;


import java.util.Map;

/**
* Created with IntelliJ IDEA.
* User: Richard Gu
* Date: 7/19/13
* Time: 2:26 PM
*/
public class TwitterContentFilter extends BaseComponent implements IRichBolt {

private OutputCollector collector;

private TwitterSentimentAnalyzer classifier;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.conf = conf;
this.classifier = new TwitterSentimentAnalyzer();
}

@Override
public void execute(Tuple input) {

String line = input.getValueByField("line").toString();
boolean happy = classifier.classify(line);
System.out.println(happy + " *** " + line);

}

@Override
public void cleanup() {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line", "content"));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.hackreduce.storm.vmc.bolts;

import com.github.pmerienne.trident.ml.nlp.TwitterSentimentClassifier;

/**
* Created with IntelliJ IDEA.
* User: Richard Gu
* Date: 7/19/13
* Time: 3:56 PM
*/


public class TwitterSentimentAnalyzer extends TwitterSentimentClassifier {

public TwitterSentimentAnalyzer() {
super();
}

public Boolean classify(String text) {
return super.classify(text);
}

}
110 changes: 110 additions & 0 deletions src/main/java/org/hackreduce/storm/vmc/spouts/TwitterStreamSpout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.hackreduce.storm.vmc.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;


import oauth.signpost.OAuthConsumer;
import oauth.signpost.commonshttp.CommonsHttpOAuthConsumer;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;

/**
* Created with IntelliJ IDEA.
* User: Richard Gu
* Date: 7/19/13
* Time: 2:05 PM
*/


public class TwitterStreamSpout extends BaseSpout {

private static final String consumerKey = "bW7WjFbtGBmTri2NmzQ";
private static final String consumerSecret = "N1bz4Sncho5X8Fipa85GBFveKecrxRINQh3iFqWaEg";

private static final String accessToken = "15270429-EyvSABg3Pv3ooNZORAQmafZuMo0myLMTM6viWoSVj";
private static final String accessSecret = "V9DJBuU3S1S04Ne9sJPFEKzQRwKWXGuuc5OAk3RQZk";

private static final String feedUrl = "https://stream.twitter.com/1.1/statuses/sample.json";


private OAuthConsumer consumer;

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
super.open(conf, context, collector);
this.setup();
}

@Override
public void close() {
super.close();
}

private void setup() {
consumer = new CommonsHttpOAuthConsumer(consumerKey, consumerSecret);
consumer.setTokenWithSecret(accessToken, accessSecret);
}


@Override
public void nextTuple() {

try {

HttpGet request = new HttpGet(feedUrl);
consumer.sign(request);

DefaultHttpClient client = new DefaultHttpClient();
HttpResponse response = client.execute(request);
BufferedReader reader = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));

while (true) {
String line = reader.readLine();
if (line == null)
break;
else {
this.collector.emit(new Values(line), line);
}
}

} catch (Exception ex) {
ex.printStackTrace();
}

System.out.println("Sleeping before reconnect...");

try {
Thread.sleep(15000);
} catch (Exception e) {
}


}

@Override
public void ack(Object obj) {
super.ack(obj);
}

@Override
public void fail(Object obj) {
super.fail(obj);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.hackreduce.storm.vmc.topologies;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import org.hackreduce.storm.vmc.bolts.TwitterContentFilter;
import org.hackreduce.storm.vmc.spouts.TwitterStreamSpout;


/**
* Created with IntelliJ IDEA.
* User: Richard Gu
* Date: 7/19/13
* Time: 2:04 PM
*/

public class TwitterFeedTopology {

public static void main(String[] args) throws InterruptedException {

TopologyBuilder tb = new TopologyBuilder();
tb.setSpout("twitterSpout", new TwitterStreamSpout(), 2).setNumTasks(3);
//route all input to one bolt so they can be counted
tb.setBolt("twitterFilterBolt", new TwitterContentFilter(), 1).globalGrouping("twitterSpout");

Config cfg = new Config();
cfg.setDebug(false);

LocalCluster lc = new LocalCluster();
lc.submitTopology("Twitter Topology", cfg, tb.createTopology());
Thread.sleep(60000);
lc.shutdown();

}


}

Large diffs are not rendered by default.

Large diffs are not rendered by default.