Streaming Data With Apache Flink

The objective of this article is to give you a working understanding of stream processing and the technologies that can be used for it.

Streaming data can be used for event processing, storage, and building meaningful dashboards that deliver real-time insights. For this article, let's focus on a topic that is being widely discussed all over the world, so that we can relate to the importance of real-time analytics and stream processing: COVID-19. Let us go ahead and create a pipeline that streams tweets and, through sentiment analysis, tells us what people are saying about it.

I will use Apache Flink, Apache Kafka, Elasticsearch and Kibana for this tutorial. The use of Kafka here can be seen as optional, but it becomes important if other services also need the processed tweets from Flink. For example, if you want the tweets written to a database of your choice in addition to Elasticsearch, Kafka lets you decouple that service from the pipeline. Elasticsearch is used as the data storage medium, and Kibana, from the same ELK stack, is used to visualize the data.

Stream Data Pipeline

Let me give you a very high-level understanding of these technologies before we get started. Apache Flink is a stream-processing framework that can be used for almost any streaming use case, from event-driven applications and stream/batch analytics to data pipelines and ETL. The Apache Flink documentation explains its purpose and use cases in detail. Apache Kafka, on the other hand, is a publish-subscribe messaging system used mainly to build real-time data pipelines and streaming applications; it has many advantages over traditional pub-sub messaging systems, which you can discover by reading its documentation. Elasticsearch and Kibana are the E and K of the ELK stack. In a nutshell, Elasticsearch is a search and analytics engine, while Kibana lets users visualize the data in Elasticsearch with charts and graphs. Also note that Elasticsearch is backed by a document-oriented NoSQL data store. You can find more details of the ELK stack on the Elastic website.

Getting Started

Since we will be using Twitter as the streaming source, let us start by creating a Twitter developer application. Accessing the Twitter APIs requires a set of credentials, namely an API key, API secret, access token and access token secret, and creating a developer application allows us to obtain these. You will have to go through an approval process, which usually takes at most 24 hours.

Provided you have a registered Twitter account, go to https://developer.twitter.com/en/apps and create an application by clicking the "Create an app" button. You will have to answer a few questions to satisfy the Twitter formalities. Once submitted for verification, the Twitter team will get back to you via email with a confirmation. Once your application is verified, it will be listed when you visit the link above. Click "Details" to view your app, and under the "Keys and tokens" section you will find the credentials required to access the Twitter streaming API. You will have to generate your access token and secret by clicking the "Generate" button. Note these keys down on a protected password sheet, because you will need them later.

The next step is to configure Apache Flink, preferably on a Linux environment. This is quite an easy process; follow the steps in the Apache Flink documentation. By default the web front-end runs on port 8081, but you can always change this in the flink-conf.yaml file. For example, to move it to a different port (8085 here is just an arbitrary choice), set:
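
rest.port: 8085

After verifying that your service is up and running, you can start with the code. In this tutorial we will create a Maven project for our Flink and Kafka agents, and set up the remaining services as we proceed. As the first step, let us include the Maven dependencies. Add the below to your pom.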

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
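
Note that the Twitter connector alone will not compile the snippets that follow. Assuming the same Flink 1.10.0 release, you will most likely also need Flink's streaming Java API, plus the JSON libraries used later for parsing (Jackson for reading tweets and json-simple for building the output); treat the exact artifact list as an assumption to verify against your own setup. For instance:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.0</version>
</dependency>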


The Twitter connector is what allows us to consume the Twitter streaming API from Flink. Next we need the Twitter credentials we obtained when creating the developer application. I suggest you add a twitter.properties file to the classpath, or anywhere of your choice, and lay the properties out in the following fashion, replacing the x's with the actual values.

twitter-source.consumerKey=xxx
twitter-source.consumerSecret=xxx
twitter-source.token=xxx
twitter-source.tokenSecret=xxx

Include the following snippet inside your main or test method; here it is shown as a complete main.

public static void main(String[] args) throws Exception {
    // Load the Twitter credentials from the properties file.
    Properties twitterProperties = new Properties();
    twitterProperties.load(new FileInputStream("twitter.properties"));

    // Create the Twitter source and plug in our custom filter endpoint.
    TwitterSource twitterSource = new TwitterSource(twitterProperties);
    twitterSource.setCustomEndpointInitializer(new TweetFilter());

    // Build the streaming job: source -> flat map -> print.
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> streamSource = environment.addSource(twitterSource).flatMap(new TweetFlatMapper());
    streamSource.print(); // Print the stream.
    environment.execute("Twitter-Streaming-Job");
}
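One design note: Flink serializes user functions and ships them to the task managers, which is why the TweetFilter endpoint initializer below implements Serializable, and why TweetFlatMapper must be serializable as well.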

Next, implement the EndpointInitializer interface from Flink's Twitter streaming connector. Since we only want tweets related to COVID-19, we initialize a status filter endpoint with the terms to track.

private static class TweetFilter implements EndpointInitializer, Serializable {

    @Override
    public StreamingEndpoint createEndpoint() {
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        // Pass each term separately; a single comma-separated string
        // would be matched as one literal phrase.
        endpoint.trackTerms(Arrays.asList("COVID-19", "Corona"));
        return endpoint;
    }
}
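As a side note, the underlying hosebird client's StatusesFilterEndpoint can also pre-filter by language on Twitter's side, which reduces how much the flat mapper below has to discard. A one-line sketch, assuming your connector version exposes this method:

// Assumption: StatusesFilterEndpoint#languages is available in the bundled hbc client.
endpoint.languages(Arrays.asList("en"));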

Next, implement the FlatMapFunction; this is where you perform any complex operations or transformations. In our context, this is where we read through the big JSON document given to us by the Twitter streaming API and fetch only the required fields. Let us write some code that filters for English tweets and returns only the tweet text, the language, the creation time, the origin of the tweet, and the tweet's sentiment value. The flat map function is overridden to output a JSON string with only what is required.

private static class TweetFlatMapper implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String tweet, Collector<String> out) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode jsonNode = mapper.readValue(tweet, JsonNode.class);

            boolean isEnglish = jsonNode.has("lang") && jsonNode.get("lang").asText().equals("en");
            boolean containsExtendedTweet = jsonNode.has("extended_tweet");

            if (isEnglish) {
                JSONObject jsonObject = new JSONObject();
                // Tweets over 140 characters carry their full text under "extended_tweet".
                String tweetValue = containsExtendedTweet
                        ? jsonNode.get("extended_tweet").get("full_text").textValue()
                        : jsonNode.get("text").textValue();
                jsonObject.put("tweet", tweetValue);
                jsonObject.put("language", jsonNode.get("lang").textValue());
                jsonObject.put("created", jsonNode.get("created_at").textValue());
                jsonObject.put("sentiment", SentimentAnalyzer.predictSentiment(tweetValue));

                // "place" is null for most tweets, so fall back to "N/A".
                String location = "N/A";
                if (jsonNode.hasNonNull("place")) {
                    location = jsonNode.get("place").get("country").textValue();
                }
                jsonObject.put("location", location);

                out.collect(jsonObject.toJSONString());
            }
        } catch (Exception ex) {
            LOG.error("Failed to process tweet", ex); // LOG is a logger on the enclosing class.
        }
    }
}
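The SentimentAnalyzer referenced above is not part of Flink and its implementation is not shown in this article, so here is a deliberately naive, keyword-based stand-in; the word lists and the POSITIVE/NEGATIVE/NEUTRAL labels are my own placeholders, and in practice you would swap in a proper NLP library.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// A minimal sketch of a sentiment scorer, NOT a real model: it simply counts
// occurrences of a few hand-picked positive and negative words.
public class SentimentAnalyzer {

    private static final Set<String> POSITIVE = new HashSet<>(
            Arrays.asList("good", "great", "hope", "recovered", "safe"));
    private static final Set<String> NEGATIVE = new HashSet<>(
            Arrays.asList("bad", "death", "fear", "sick", "worse"));

    public static String predictSentiment(String text) {
        int score = 0;
        for (String word : text.toLowerCase().split("\\W+")) {
            if (POSITIVE.contains(word)) score++;
            if (NEGATIVE.contains(word)) score--;
        }
        return score > 0 ? "POSITIVE" : score < 0 ? "NEGATIVE" : "NEUTRAL";
    }
}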

Now if you run the above, you will see the tweets streamed and printed on your IDE console as JSON strings.
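With the placeholder sentiment analyzer above, each printed record is a flat JSON string containing just the fields we collected, shaped roughly like this (the values are illustrative, not real output):

{"tweet":"Stay safe everyone ...","language":"en","created":"Mon Apr 06 10:15:32 +0000 2020","sentiment":"NEUTRAL","location":"N/A"}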

The use of Kafka, Elasticsearch and Kibana will be discussed in the next tutorial.
