Introduction
I was searching for a (comparatively) simple use case for Apache Spark and finally found an interesting scenario. The plan is to analyze tweets related to the 2016 US elections in real time to provide live feedback during the TV debates. For this purpose we are going to set up an Elasticsearch server and feed it with tweets using Apache Spark. In the process we will perform sentiment and location analysis. Finally we will build some dashboards in Kibana to visualize the data. As this covers quite a lot of topics, we are not going to go deep into each of them, but we will have a working sample at the end. The result will look like the following.
As always the whole project can be found on GitHub.
Step 1: Spark & Twitter
Reading tweets with Apache Spark is pretty simple when using the TwitterUtils class from Spark's Twitter streaming module, which is built on Twitter4J. All you need is a Twitter account and a Twitter OAuth access token, which can be requested here.
There are different ways to configure Twitter4J but we place the token in a file called twitter4j.properties and make it available in the classpath.
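For orientation, a minimal twitter4j.properties could look like the sketch below. The four oauth.* keys are the ones Twitter4J reads for OAuth credentials. The values are placeholders, and the src/main/resources location in the comment is an assumption about a Maven-style project layout, not something taken from the sample project.

```properties
# twitter4j.properties (must end up on the classpath, e.g. src/main/resources)
# All values are placeholders; substitute your own credentials.
oauth.consumerKey=YOUR_CONSUMER_KEY
oauth.consumerSecret=YOUR_CONSUMER_SECRET
oauth.accessToken=YOUR_ACCESS_TOKEN
oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
```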
Now it is already possible to read tweets by using the following code snippet.
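// Load the OAuth credentials from twitter4j.properties on the classpath
Configuration twitterConf = ConfigurationContext.getInstance();
Authorization twitterAuth = AuthorizationFactory.getInstance(twitterConf);
// Run Spark locally with two threads and collect the stream in 60-second batches
SparkConf sparkConf = new SparkConf().setAppName("Election 2016").setMaster("local[2]");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(60000));
// Only receive tweets that match the filter keyword
String[] filters = {"#Election2016"};
JavaDStream<Status> tweets = TwitterUtils.createStream(streamingContext, twitterAuth, filters);
// Print the first elements of each batch, then start the stream and block until it is stopped
tweets.print();
streamingContext.start();
streamingContext.awaitTermination();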
To feed the tweets into Elasticsearch later, we extend the reading step by mapping all fields we need for our analysis to a map structure. Furthermore, we keep only tweets whose hashtags contain one of our keywords: hillary, clinton, donald or trump.
JavaDStream<Map<String, Object>> statuses = tweets
    // create tweets objects from incoming tweets
    .map((Function<Status, Map<String, Object>>) status -> {
        Map<String, Object> tweet = new HashMap<>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        tweet.put("user", status.getUser().getName());
        tweet.put("text", status.getText());
        tweet.put("location", status.getUser().getLocation());
        tweet.put("hashtags", Arrays.stream(status.getHashtagEntities())
            .map(hashtagEntity -> hashtagEntity.getText()).collect(Collectors.joining(", ")));
        tweet.put("created_at", sdf.format(status.getCreatedAt()));
        return tweet;
    })
    // filter out tweets which are not related to Hillary Clinton or Donald Trump
    .filter((Function<Map<String, Object>, Boolean>) tweet -> {
        String hashtags = tweet.get("hashtags").toString().toLowerCase();
        return hashtags.contains("hillary") || hashtags.contains("clinton") ||
            hashtags.contains("donald") || hashtags.contains("trump");
    })
...
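To see how this filter behaves without starting a Spark stream, the matching logic can be pulled into plain Java. The sketch below is only an illustration: the class and method names (HashtagFilterSketch, joinHashtags, isRelevant) are hypothetical and not part of the sample project. It mirrors the two steps above, joining the hashtag texts with ", " and doing a case-insensitive substring match.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helper class, used only to illustrate the filter logic.
class HashtagFilterSketch {
    static final List<String> KEYWORDS = Arrays.asList("hillary", "clinton", "donald", "trump");

    // Mirrors the map step: hashtag texts are joined into one string with ", ".
    static String joinHashtags(String... hashtags) {
        return Arrays.stream(hashtags).collect(Collectors.joining(", "));
    }

    // Mirrors the filter step: case-insensitive substring match on the joined string.
    static boolean isRelevant(String joinedHashtags) {
        String lower = joinedHashtags.toLowerCase(Locale.ROOT);
        return KEYWORDS.stream().anyMatch(lower::contains);
    }

    public static void main(String[] args) {
        System.out.println(isRelevant(joinHashtags("Election2016", "NeverTrump"))); // true
        System.out.println(isRelevant(joinHashtags("Election2016", "Debate")));     // false
        System.out.println(isRelevant(joinHashtags("Trumpet")));                    // true
    }
}
```

The last call shows a side effect of substring matching: a hashtag such as "Trumpet" also passes the filter. For a quick sample this is acceptable, but it is worth keeping in mind when reading the dashboards.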
Step 2: Sentiment Analysis
In other Twitter sentiment analyses I found basically three different approaches.
- Simply use a list of positive and negative words and count how many of each occur in the tweet. If positive words outnumber negative words, the tweet is classified as positive; if negative words outnumber positive words, it is classified as negative; otherwise it is neutral. The lists we are going to use were created by Bing Liu and can be downloaded from his page.
- Use a list of weighted positive and negative words. Same principle as above, but the weight of each word is included in the result. The list we are going to use was created by Finn Årup Nielsen and can be downloaded from his page.
- Make use of a natural language processing (NLP) algorithm which tries to understand the meaning of sentences. The library we are going to use was created by the Stanford NLP group and can be downloaded from their GitHub page.
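The first two approaches can be sketched in a few lines of plain Java. This is a minimal illustration under stated assumptions, not the implementation from the sample project: the class name, the method names, the tiny word sets and the weights are all made up for the example and merely stand in for the much larger published lists. The tokenizer is also deliberately naive (lowercase, split on anything that is not a letter).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the two word-list approaches.
class LexiconSentimentSketch {
    // Tiny stand-in word lists (assumption); the real lists are far larger.
    static final Set<String> POSITIVE = new HashSet<>(Arrays.asList("good", "great", "win"));
    static final Set<String> NEGATIVE = new HashSet<>(Arrays.asList("bad", "terrible", "lose"));

    // Stand-in weighted list with illustrative weights (assumption).
    static final Map<String, Integer> WEIGHTS = new HashMap<>();
    static {
        WEIGHTS.put("good", 3);
        WEIGHTS.put("great", 3);
        WEIGHTS.put("bad", -3);
        WEIGHTS.put("terrible", -3);
    }

    // Naive tokenizer: lowercase, then split on every run of non-letters.
    static String[] tokenize(String text) {
        return text.toLowerCase(Locale.ROOT).split("[^a-z]+");
    }

    // Approach 1: compare the number of positive and negative words.
    static String countBased(String text) {
        int positive = 0;
        int negative = 0;
        for (String word : tokenize(text)) {
            if (POSITIVE.contains(word)) positive++;
            if (NEGATIVE.contains(word)) negative++;
        }
        if (positive > negative) return "positive";
        if (negative > positive) return "negative";
        return "neutral";
    }

    // Approach 2: sum the weights of all known words; the sign gives the sentiment.
    static int weightedScore(String text) {
        int score = 0;
        for (String word : tokenize(text)) {
            score += WEIGHTS.getOrDefault(word, 0);
        }
        return score;
    }

    public static void main(String[] args) {
        System.out.println(countBased("A great debate and a good answer")); // positive
        System.out.println(countBased("Terrible answer, bad debate"));      // negative
        System.out.println(weightedScore("Good start, terrible finish"));   // 0
    }
}
```

Both variants ignore negation and context ("not good" still counts as positive), which is exactly the gap the third, NLP-based approach tries to close.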
To add the sentiment to each tweet we add another mapping function.
.map((Function<Map<String, Object>, Map<String, Object>>) tweet -> {
    String text = tweet.get("text").toString();
    tweet.put("sentiment_liub", detectSentimentLiub(text));
    tweet.put("sentiment_afinn", detectSentimentAfinn(text));
    tweet.put("sentiment_corenlp", detectSentimentCoreNlp(text));
    return tweet;
})
Step 3: Location Analysis
Only a few users add location information to their tweets, so in most cases you get a null value when accessing the GeoLocation field. Most users, however, provide a location in their user profile. So we are going to take the location from the user profile and use the Google Geocoding API to get the corresponding coordinates.
Like the Twitter API, the Google Geocoding API requires a key, which can be requested here. The key must then be added to each request.
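To make "added to each request" concrete, the sketch below builds the URL of a geocoding request by hand. The client library used in this post assembles the request itself, so this is purely illustrative; the class and method names are hypothetical, and YOUR_API_KEY is a placeholder.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that shows how the API key travels with every geocoding request.
class GeocodeUrlSketch {
    static final String ENDPOINT = "https://maps.googleapis.com/maps/api/geocode/json";

    // The free-text location and the key are both passed as URL-encoded query parameters.
    static String buildUrl(String address, String apiKey) throws UnsupportedEncodingException {
        return ENDPOINT
                + "?address=" + URLEncoder.encode(address, "UTF-8")
                + "&key=" + URLEncoder.encode(apiKey, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        // prints https://maps.googleapis.com/maps/api/geocode/json?address=Berlin%2C+Germany&key=YOUR_API_KEY
        System.out.println(buildUrl("Berlin, Germany", "YOUR_API_KEY"));
    }
}
```

Since profile locations are free text entered by users, encoding the address is essential; many of them contain commas, spaces or non-ASCII characters.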
To call the Google Geocoding API we add another mapping function and create an Elasticsearch GeoPoint from the result, which we add to the map structure.
.map((Function<Map<String, Object>, Map<String, Object>>) tweet -> {
    tweet.put("geolocation", null);
    Object location = tweet.get("location");
    if (null != location) {
        GeoApiContext context = new GeoApiContext().setApiKey(googleApiProperties.getProperty("key"));
        GeocodingResult[] results = GeocodingApi.geocode(context, location.toString()).await();
        if (0 != results.length) {
            LatLng latLngs = results[0].geometry.location;
            tweet.put("geolocation", new GeoPoint(latLngs.lat, latLngs.lng));
        }
    }
    return tweet;
})
Step 4: Running the sample
If you want to run the whole sample, which I highly recommend, feel free to clone it from GitHub and follow the provided README.
