Confluent & Twitter4j Tutorial

Reading a Real-Time stream of Tweets into Kafka

Kafka is an amazing tool for processing large volumes of messages; a well-tuned cluster can handle millions of them per second. It can make ingesting real-time data an efficient and easy task, with just a few lines of Java to handle it.

In this tutorial, we'll be ingesting Tweets using Twitter4j and then publishing those to a Kafka broker.

Setting up your Environment

The Code

You can see the code in its entirety on GitHub, and clone it to your favorite MacBook or Linux distribution. I suppose you could also run it on a PC, but with Raspberry Pi 4 so close to release, I don't see why you would.

Please note: the first time you run the program (found in the com.ippon.main package), it'll create two files: userAuthInfo and badwords.txt.

In the first file, you'll need to provide Twitter API keys / secrets in the following order, one per line:

  1. consumerKey
  2. consumerSecret
  3. accessToken
  4. accessSecret
  5. userId (the number, not your login)
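
As a rough sketch of how those five lines could be read and validated before being handed to Twitter4j: the TwitterCredentials class and its fromLines method below are hypothetical names made up for illustration, and the project's own ResourceLoader may well handle this differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for the five credential lines; not part of the original project.
final class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String accessToken;
    final String accessSecret;
    final long userId;

    private TwitterCredentials(String consumerKey, String consumerSecret,
                               String accessToken, String accessSecret, long userId) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.accessToken = accessToken;
        this.accessSecret = accessSecret;
        this.userId = userId;
    }

    // Expects the values in the order listed above; blank lines are skipped.
    static TwitterCredentials fromLines(List<String> lines) {
        List<String> values = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                values.add(trimmed);
            }
        }
        if (values.size() < 5) {
            throw new IllegalArgumentException(
                    "Expected 5 non-blank lines but found " + values.size());
        }
        long userId;
        try {
            userId = Long.parseLong(values.get(4));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "userId must be the numeric ID, not the login name", e);
        }
        return new TwitterCredentials(values.get(0), values.get(1),
                values.get(2), values.get(3), userId);
    }

    public static void main(String[] args) {
        // Placeholder values only; real keys come from your Twitter developer account.
        TwitterCredentials c = fromLines(
                Arrays.asList("key", "secret", "token", "tokenSecret", "12345"));
        System.out.println("Loaded credentials for user " + c.userId);
    }
}
```

In practice the list would come from something like Files.readAllLines(Paths.get("userAuthInfo")). Failing early with a clear message is friendlier than a NumberFormatException surfacing halfway through the OAuth setup.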

The second file, badwords.txt, is used to filter tweets: each line is a case-insensitive regex pattern, and any tweet that matches one of them is dropped. This is because Twitter contains a lot of things that would make Captain America blush.

[Image: Captain America rightfully calling out Twitter.]
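
To make the badwords.txt behavior concrete, here is a minimal sketch of a filter that compiles each line as a case-insensitive pattern and checks a tweet's text against all of them. BadWordFilter is a hypothetical class written for this example; the project's own containsBadWords method may be implemented differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the idea; not the project's actual code.
final class BadWordFilter {
    private final List<Pattern> patterns = new ArrayList<>();

    // Each non-blank line becomes one case-insensitive regex pattern.
    BadWordFilter(List<String> lines) {
        for (String line : lines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed, Pattern.CASE_INSENSITIVE));
            }
        }
    }

    // Returns true if any pattern is found anywhere in the text.
    boolean matches(String text) {
        for (Pattern p : patterns) {
            if (p.matcher(text).find()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        BadWordFilter filter = new BadWordFilter(Arrays.asList("darn", "h[e3]ck"));
        System.out.println(filter.matches("Well, HECK!"));  // prints true
        System.out.println(filter.matches("hello world"));  // prints false
    }
}
```

Because find() matches substrings, a pattern like darn would also flag "darning". Wrapping a pattern in \b word boundaries inside badwords.txt is one way to tighten that, if it matters for your keywords.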

Setting up Kafka

Instructions for installing Confluent depend heavily on what you're installing it on, so I'm just going to drop a link to Confluent's download page here. However...

How I set it up on my Mac:

  1. Download the Community version from the link above.
  2. Extract the tar file into a directory that you prefer (I put it in ~/confluent so it would be easier to access).
  3. Add the following aliases to your bash profile:
alias zoo="$HOME/confluent/bin/zookeeper-server-start $HOME/confluent/etc/kafka/zookeeper.properties"
alias kaf="$HOME/confluent/bin/kafka-server-start $HOME/confluent/etc/kafka/server.properties"
alias kafcon="$HOME/confluent/bin/kafka-console-consumer"

That makes running the Kafka brokers on your MacBook a lot easier in the future.

  1. Open a terminal and run zoo if you used the above aliases, or run the full command from the alias, to start ZooKeeper. ZooKeeper is the service Kafka uses to keep track of which brokers are "leaders" and which are followers. It must be running before you start any Kafka brokers.
  2. Type kaf to start a Kafka broker. It will begin accepting new topics and messages immediately.
  3. If you want to check out what's in your broker, you can type: kafcon --from-beginning --topic YourTopicHere --bootstrap-server localhost:9092 to see those messages in the console.

An Overview of the Code

com.ippon.main.TwitterMiner: Contains the main method that we run in order to start consuming tweets. It'll ask you for the name of the topic to store the tweets under, and which keywords to search Twitter for. Then it loads your authentication details from the userAuthInfo file and begins to listen for all tweets on that subject, as they happen in real time.

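// Imports used by this excerpt (ResourceLoader and TwitterListener are the project's own classes)
import java.util.List;
import java.util.Scanner;
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;

public static void main(String[] args) {
    // Ask for the Kafka topic and the keywords to track
    Scanner s = new Scanner(System.in);
    System.out.print("What is the name of the topic? ");
    String topic = s.nextLine();
    System.out.print("What are the keywords to search for? ");
    String keywords = s.nextLine();
    s.close();

    // Authenticate using the five lines stored in the userAuthInfo file
    TwitterStream t = TwitterStreamFactory.getSingleton();
    List<String> userAuthInfo = ResourceLoader.loadWords("userAuthInfo");
    t.setOAuthConsumer(userAuthInfo.get(0), userAuthInfo.get(1));
    t.setOAuthAccessToken(new AccessToken(userAuthInfo.get(2),
            userAuthInfo.get(3), Long.parseLong(userAuthInfo.get(4))));

    // Every tweet received on the stream is handed to our listener
    t.addListener(new TwitterListener(topic));

    FilterQuery fq = new FilterQuery();
    //fq.language("en");
    fq.filterLevel("none");

    // Filter on keywords: each space-separated word becomes a tracked term
    System.out.println("Listening for: " + keywords);
    fq.track(keywords.split(" "));
    t.filter(fq);
}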
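
One small thing to watch in the keyword handling above: splitting on a single space yields empty strings when the input has leading, trailing, or doubled spaces. A minimal sketch of a more forgiving parser follows; KeywordParser is a hypothetical helper, not something in the project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the original code simply calls keywords.split(" ").
final class KeywordParser {
    // Splits on any run of whitespace and drops empty tokens.
    static String[] parse(String input) {
        List<String> tokens = new ArrayList<>();
        for (String token : input.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Extra spaces are ignored, leaving just the two keywords.
        for (String keyword : parse("  aws   kafka ")) {
            System.out.println(keyword);
        }
    }
}
```

The result could be passed straight to fq.track(...) in place of keywords.split(" ").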

com.ippon.twitter.TwitterListener: This class handles what we do with tweets when we receive one, though there's only one method we really care about: onStatus(Status stat). When we receive a tweet, it will:

  • Convert it to a TweetPojo (our easy-to-JSON Tweet Class)
  • Check that the tweet isn't obscene (either marked as "may contain sensitive material" by Twitter itself, or contains one of our blocked words)
  • If the tweet passes that check, call our KafkaConnector.publishTweet(TweetPojo) method to pass that tweet to Kafka

public void onStatus(Status stat) {
    TweetPojo tweet = new TweetPojo(stat);
    if (tweet.mayBeNSFW() || containsBadWords(tweet)) {
        // Rejected tweets are only logged to the console, never published
        System.out.print("Tweet Contained bad words: ");
        try {
            String json = om.writeValueAsString(tweet);
            System.out.println(json);
        } catch (Exception e) {
            e.printStackTrace();
        }
    } else {
        // Clean tweets go on to Kafka
        kc.publishTweet(tweet);
    }
}

com.ippon.kafka.KafkaConnector: Perhaps the most interesting part of the tutorial, this is also the smallest class. It has a Properties object that defines the connection details for our Kafka node. The main method is used strictly for testing the connector without connecting to Twitter first. There's a constructor, and then our magic method:

public void publishTweet(TweetPojo tweet) {
    try {
        String json = new ObjectMapper().writeValueAsString(tweet);
        System.out.println("Publishing: " + json);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic, tweet.getId() + "", json);
        producer.send(record);
        producer.close();
    } catch (JsonProcessingException jpe) {
        jpe.printStackTrace();
    }
}

This simply takes our tweet, converts it to JSON, creates a new producer record containing the ID of that tweet (as a String instead of the usual Long) and the JSON for that tweet, then sends it to Kafka and closes the connection. That's just 5 lines of code!
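
The contents of the Properties object aren't shown above, so here is a minimal sketch of what a String-to-String producer configuration typically needs. It assumes a single local broker at localhost:9092 (the address used with the console consumer in this tutorial); ProducerConfigSketch and producerProps are hypothetical names, and the project's real settings may differ.

```java
import java.util.Properties;

// Hypothetical sketch of the producer settings; the project's actual values may differ.
final class ProducerConfigSketch {
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Where the producer first connects to discover the cluster
        props.put("bootstrap.servers", bootstrapServers);
        // Keys and values are both sent as Strings (the tweet ID and its JSON)
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed local broker address
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

An object like this is what gets passed to new KafkaProducer<>(props). As a design note, a KafkaProducer is thread safe and is usually created once and reused rather than once per message, so for higher tweet volumes it may be worth holding one producer as a field and closing it on shutdown.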

Output

While it's running, the TwitterMiner we've created will take tweets matching your keywords and put them into your Kafka instance. You can check this with the Kafka console consumer, using the kafcon alias we created earlier, like so:

kafcon --bootstrap-server localhost:9092 --from-beginning --topic YourTopicHere

The end result should display the tweets you've published! For example, below are four AWS-related tweets that mine captured:

{"createdAt":1559583889000,"id":1135603256745451522,"user":{"id":3989283274,"screenName":"Chaiberia","email":null,"name":"Chaiberia","description":" we offer a free job search portal for available positions around the world we do not require a profile or registration  ","lang":null,"location":"Las Vegas, NV"},"inReplyToUserId":-1,"mentionedUsers":[],"language":"en","lang":"en","text":"  jobnearme aws big data architect engineer remote aws partner ai bigdata chicago developer il robotics bigdata view apply  "}
{"createdAt":1559583895000,"id":1135603281298935809,"user":{"id":786784514596937728,"screenName":"RealGophersShip","email":null,"name":"Built with Go","description":" stay on top of latest releases of go libraries and projects push a semantic version tag to be featured  ","lang":null,"location":"Goland"},"inReplyToUserId":-1,"mentionedUsers":[],"language":"en","lang":"en","text":" ivx yet another cloudwatch exporter 0 13 4 aws cloudwatch to prometheus exporter discovers services through aws tags gets cloudwatch data and provides them as  "}
{"createdAt":1559583914000,"id":1135603363310186496,"user":{"id":3160056626,"screenName":"appseccouk","email":null,"name":"Appsecco","description":" the application security company ","lang":null,"location":"London"},"inReplyToUserId":-1,"mentionedUsers":[14862447],"language":"en","lang":"en","text":" monitoring for attacks defending against them in real time is crucial join our updated training blackhatevents on automated defence using cloud services for aws azure and gcp with elk serverless register now bhusa security devsecops  "}
{"createdAt":1559583917000,"id":1135603373150003201,"user":{"id":907424821440643074,"screenName":"AfonzaCorp","email":null,"name":"afonza","description":" amazon web services managed hosting and consulting ","lang":null,"location":"Atlanta, GA"},"inReplyToUserId":-1,"mentionedUsers":[],"language":"en","lang":"en","text":"  aws news learn about aws services solutions june aws online tech talks  "}
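
The createdAt values in the output above look like Unix timestamps in milliseconds. That is an assumption based on their magnitude and on how Java dates are commonly serialized to JSON, not something the project documents. Under that assumption, a short sketch of turning one back into a readable instant (CreatedAtSketch is a hypothetical name):

```java
import java.time.Instant;

// Hypothetical helper; assumes createdAt is milliseconds since the Unix epoch.
final class CreatedAtSketch {
    static Instant toInstant(long createdAtMillis) {
        return Instant.ofEpochMilli(createdAtMillis);
    }

    public static void main(String[] args) {
        // createdAt of the first tweet shown above
        System.out.println(toInstant(1559583889000L)); // prints 2019-06-03T17:44:49Z
    }
}
```

This is handy once you start doing time-based analysis, such as bucketing tweets per minute or per hour.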

Signing Off

I hope this quick-and-dirty tutorial helps you get your hands on Kafka faster and, more importantly, gives you access to real data that you can begin performing transformations and analysis on. Maybe you want to see which sports are most popular in different regions of the world by searching for them as keywords and cross-referencing their geo-coordinates. Maybe you want to measure how controversial a topic is by running sentiment analysis on it and seeing how split the opinion is. In any case, hopefully my code can help you get started on the right foot!
