Real-time streaming data ingest is a common requirement for many big data use cases. In fields like IoT, e-commerce, security, communications, entertainment, finance, and retail, where so much depends on timely and accurate data-driven decision making, real-time data collection and analysis are in fact core to the business.

However, collecting, storing and processing streaming data in large volumes and at high velocity presents architectural challenges. An important first step in delivering real-time data analysis is ensuring that adequate network, compute, storage, and memory resources are available to capture fast data streams. But a company’s software stack must match the performance of its physical infrastructure. Otherwise, businesses will face a massive backlog of data, or worse, missing or incomplete data.

Redis has become a popular choice for such fast data ingest scenarios. A lightweight in-memory database platform, Redis achieves throughput in the millions of operations per second with sub-millisecond latencies, while drawing on minimal resources. Its built-in data structures and commands also keep implementations simple.

In this article, I will show how Redis Enterprise can solve common challenges associated with the ingestion and processing of large volumes of high velocity data. We’ll walk through three different approaches (including code) to processing a Twitter feed in real time, using Redis Pub/Sub, Redis Lists, and Redis Sorted Sets, respectively. As we’ll see, all three methods have a role to play in fast data ingestion, depending on the use case.

Redis can sustain more than a million read/write operations per second with sub-millisecond latency on a modestly sized commodity cloud instance, making it extremely resource-efficient for large volumes of data. Redis also supports messaging services and has client libraries in all of the popular programming languages, making it well-suited for combining high-speed data ingest and real-time analytics. Redis Pub/Sub commands allow it to play the role of a message broker between publishers and subscribers, a feature often used to send notifications or messages between distributed data ingest nodes.

Redis Enterprise enhances Redis with seamless scaling, always-on availability, automated deployment, and the ability to use flash memory as a RAM extender, so that large datasets can be processed cost-effectively.

The Subscriber class is the core class of this design. Every Subscriber object maintains its own connection to Redis.

class Subscriber extends JedisPubSub implements Runnable{
       private String name ="Subscriber";
       private RedisConnection conn = null;
       private Jedis jedis = null;

       private String subscriberChannel ="defaultchannel";

       public Subscriber(String subscriberName, String channelName) throws Exception{
              name = subscriberName;
              subscriberChannel = channelName;
              Thread t = new Thread(this);
              t.start();
       }

       @Override
       public void run(){
              try{
                      conn = RedisConnection.getRedisConnection();
                      jedis = conn.getJedis();
                      while(true){
                             jedis.subscribe(this, this.subscriberChannel);
                      }
              }catch(Exception e){
                      e.printStackTrace();
              }
       }

       @Override
       public void onMessage(String channel, String message){
              super.onMessage(channel, message);
       }
}

The Publisher class maintains a separate connection to Redis for publishing messages to a channel.

public class Publisher{

       RedisConnection conn = null;
       Jedis jedis = null;

       private String channel ="defaultchannel";

       public Publisher(String channelName) throws Exception{
              channel = channelName;
              conn = RedisConnection.getRedisConnection();
              jedis = conn.getJedis();
       }

       public void publish(String msg) throws Exception{
              jedis.publish(channel, msg);
       }
}

The EnglishTweetFilter, InfluencerTweetFilter, HashTagCollector, and InfluencerCollector filters extend Subscriber, which enables them to listen to the inbound channels. Since you need separate Redis connections for subscribe and publish, each filter class has its own RedisConnection object. Filters listen to the new messages in their channels in a loop. Here is the sample code of the EnglishTweetFilter class:

public class EnglishTweetFilter extends Subscriber
{

       private RedisConnection conn = null;
       private Jedis jedis = null; 
       private String publisherChannel = null;

       public EnglishTweetFilter(String name, String subscriberChannel, String publisherChannel) throws Exception{
              super(name, subscriberChannel);
              this.publisherChannel = publisherChannel;
              conn = RedisConnection.getRedisConnection();
              jedis = conn.getJedis();           
       }

       @Override
       public void onMessage(String subscriberChannel, String message){
              JsonParser jsonParser = new JsonParser();
              JsonElement jsonElement = jsonParser.parse(message);
              JsonObject jsonObject = jsonElement.getAsJsonObject();

              // filter messages: publish only English tweets
              if(jsonObject.get("lang") != null &&
                     jsonObject.get("lang").getAsString().equals("en")){
                      jedis.publish(publisherChannel, message);
              }
       }
}

The Publisher class has a publish method that publishes messages to the required channel.

public class Publisher{
.
.     
       public void publish(String msg) throws Exception{
              jedis.publish(channel, msg);
       }
.
}

The main class reads data from the ingest stream and posts it to the AllData channel. The main method of this class starts all of the filter objects.

public class IngestPubSub
{
.
       public void start() throws Exception{
       .
       .
              publisher = new Publisher("AllData");

              englishFilter = new EnglishTweetFilter("English Filter", "AllData",
                                           "EnglishTweets");
              influencerFilter = new InfluencerTweetFilter("Influencer Filter",
                                           "AllData", "InfluencerTweets");
              hashtagCollector = new HashTagCollector("Hashtag Collector",
                                           "EnglishTweets");
              influencerCollector = new InfluencerCollector("Influencer Collector",
                                           "InfluencerTweets");
       .
       .
}
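To make the channel topology easier to follow, here is a minimal in-memory sketch of the same flow: raw messages go to AllData, a filter stage republishes the English ones to EnglishTweets, and a collector listens there. It does not talk to Redis. The ChannelPipelineSketch class, its methods, and the "en:" prefix that stands in for the tweet's lang field are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory stand-in for a Pub/Sub broker (illustration only, not a Redis client).
class ChannelPipelineSketch {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a handler for a channel.
    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, c -> new ArrayList<>()).add(handler);
    }

    // Deliver a message to the current subscribers of a channel and return how
    // many received it. A message sent to a channel without subscribers is dropped.
    int publish(String channel, String msg) {
        List<Consumer<String>> handlers = subscribers.get(channel);
        if (handlers == null) {
            return 0;
        }
        for (Consumer<String> handler : handlers) {
            handler.accept(msg);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        ChannelPipelineSketch broker = new ChannelPipelineSketch();
        List<String> collected = new ArrayList<>();

        // Filter stage: listens on AllData, republishes English messages only.
        broker.subscribe("AllData", msg -> {
            if (msg.startsWith("en:")) { // hypothetical stand-in for the "lang" check
                broker.publish("EnglishTweets", msg);
            }
        });
        // Collector stage: listens on EnglishTweets.
        broker.subscribe("EnglishTweets", collected::add);

        broker.publish("AllData", "en:hello");
        broker.publish("AllData", "fr:bonjour");
        System.out.println(collected);
    }
}
```

Only the first message reaches the collector, because the filter stage never republishes the second one.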

Ingest with Redis Lists

The List data structure in Redis makes implementing a queueing solution easy and straightforward. In this solution, the producer pushes every message to the back of the queue, and the subscriber polls the queue and pulls new messages from the other end.
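The queue behavior described above can be tried without a Redis server. The following is a minimal in-memory sketch, assuming a LinkedBlockingDeque in place of a Redis List: addFirst plays the role of LPUSH, and pollLast with a timeout plays the role of BRPOP. The ListQueueSketch class and its method names are hypothetical and are not part of the solution's source code.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a Redis List used as a queue (illustration only).
class ListQueueSketch {
    private final BlockingDeque<String> deque = new LinkedBlockingDeque<>();

    // Analogous to LPUSH: the producer adds at the left end of the list.
    void push(String msg) {
        deque.addFirst(msg);
    }

    // Analogous to BRPOP with a timeout: the consumer removes from the right end,
    // waiting up to timeoutMillis if the list is empty. Returns null on timeout.
    String pop(long timeoutMillis) {
        try {
            return deque.pollLast(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        ListQueueSketch queue = new ListQueueSketch();
        queue.push("tweet-1");
        queue.push("tweet-2");
        queue.push("tweet-3");

        // Messages come out in the order they were pushed (first in, first out).
        System.out.println(queue.pop(100));
        System.out.println(queue.pop(100));
        System.out.println(queue.pop(100));

        // The list is now empty, so this call waits 100 ms and returns null.
        System.out.println(queue.pop(100));
    }
}
```

Pushing on one end and popping from the other is what gives the list its first-in, first-out ordering.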

Figure 4. Fast data ingest with Redis Lists

Pros

  • This method is reliable in cases of connection loss. Once data is pushed into the lists, it is preserved there until the subscribers read it. This is true even if the subscribers are stopped or lose their connection with the Redis server.
  • Producers and consumers require no connection between them.

Cons

  • Once data is pulled from the list, it is removed and cannot be retrieved again. Unless the consumers persist the data, it is lost as soon as it is consumed.
  • Every consumer requires a separate queue, which requires storing multiple copies of the data.
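The two drawbacks listed above can be seen in a small in-memory sketch. It assumes one ArrayDeque per consumer as a stand-in for one Redis List per consumer. The FanOutSketch class and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory illustration of per-consumer queues with destructive reads.
class FanOutSketch {
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    // Each consumer gets its own queue.
    void addConsumer(String consumerName) {
        queues.putIfAbsent(consumerName, new ArrayDeque<>());
    }

    // The producer has to store one copy of the message per consumer.
    void publish(String msg) {
        for (Deque<String> queue : queues.values()) {
            queue.addFirst(msg);
        }
    }

    // Destructive read: the message is removed from this consumer's queue.
    // Returns null if the queue is empty or the consumer is unknown.
    String consume(String consumerName) {
        Deque<String> queue = queues.get(consumerName);
        return queue == null ? null : queue.pollLast();
    }

    // Total number of message copies currently held across all queues.
    int storedCopies() {
        int total = 0;
        for (Deque<String> queue : queues.values()) {
            total += queue.size();
        }
        return total;
    }

    public static void main(String[] args) {
        FanOutSketch fanOut = new FanOutSketch();
        fanOut.addConsumer("english");
        fanOut.addConsumer("influencers");

        fanOut.publish("tweet-1");
        System.out.println(fanOut.storedCopies());         // one copy per consumer

        System.out.println(fanOut.consume("english"));     // first read returns the message
        System.out.println(fanOut.consume("english"));     // second read finds nothing left
        System.out.println(fanOut.consume("influencers")); // the other copy is unaffected
    }
}
```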

Code design for the Redis Lists solution

Figure 5. Class diagram of the fast data ingest solution with Redis Lists

You can download the source code for the Redis Lists solution here: . This solution’s main classes are explained below.

MessageList embeds the Redis List data structure. The push() method pushes the new message to the left of the queue, and pop() waits for a new message from the right if the queue is empty.

public class MessageList{

       protected String name = "MyList"; // Name
.
.     
       public void push(String msg) throws Exception{
              jedis.lpush(name, msg); // Left Push
       }

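       public String pop() throws Exception{
              // BRPOP with timeout 0 blocks until a message arrives. The reply is a
              // two-element list [listName, message], returned here as its string form.
              return jedis.brpop(0, name).toString();
       }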
.
.
}

MessageListener is an abstract class that implements listener and publisher logic. A MessageListener object listens to only one list, but can publish to multiple channels (MessageFilter objects). This solution requires a separate MessageFilter object for each subscriber down the pipe.

class MessageListener implements Runnable{
       private String name = null;
       private MessageList inboundList = null;
       // outbound filters, keyed by filter name
       Map<String, MessageFilter> outBoundMsgFilters = new HashMap<String, MessageFilter>();
.
.     
       public void registerOutBoundMessageList(MessageFilter msgFilter){
              if(msgFilter != null){
                      if(outBoundMsgFilters.get(msgFilter.name) == null){
                             outBoundMsgFilters.put(msgFilter.name, msgFilter);
                      }
              }
       }

.
.
       @Override
       public void run(){
.
                      while(true){
                             // blocks until the inbound list has a message
                             String msg = inboundList.pop();
                             processMessage(msg);
                      }
.
       }

.
       protected void pushMessage(String msg) throws Exception{
              Set<String> outBoundMsgNames = outBoundMsgFilters.keySet();
              for(String name : outBoundMsgNames ){
                      MessageFilter msgList = outBoundMsgFilters.get(name);
                      msgList.filterAndPush(msg);
              }
       }
}

MessageFilter is a parent class facilitating the filterAndPush() method. As data flows through the ingest system, it is often filtered or transformed before being sent to the next stage. Classes that extend the MessageFilter class override the filterAndPush() method, and implement their own logic to push the filtered message to the next list.

public class MessageFilter{

       MessageList messageList = null;
.
.
       public void filterAndPush(String msg) throws Exception{
              messageList.push(msg);
       }
.
.     
}
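The override pattern can be sketched in isolation. The example below is a simplified, self-contained illustration rather than the solution's actual classes: PassThroughFilter stands in for MessageFilter and writes to an in-memory list instead of a Redis List, and HashtagOnlyFilter is a hypothetical subclass that forwards only messages containing a hashtag.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the filterAndPush() override pattern.
class FilterSketch {

    // Stand-in for MessageFilter: forwards every message unchanged.
    static class PassThroughFilter {
        private final List<String> nextList = new ArrayList<>();

        void filterAndPush(String msg) {
            nextList.add(msg);
        }

        List<String> pushed() {
            return nextList;
        }
    }

    // Hypothetical subclass: drops messages without a hashtag and trims the rest
    // before handing them to the parent's push logic.
    static class HashtagOnlyFilter extends PassThroughFilter {
        @Override
        void filterAndPush(String msg) {
            if (msg != null && msg.contains("#")) {
                super.filterAndPush(msg.trim());
            }
        }
    }

    public static void main(String[] args) {
        PassThroughFilter filter = new HashtagOnlyFilter();
        filter.filterAndPush("  loving #redis today ");
        filter.filterAndPush("no tags here");
        System.out.println(filter.pushed());
    }
}
```

Because the listener only ever calls filterAndPush(), a new stage can be added by writing one subclass, without changing the listener.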

AllTweetsListener is a sample implementation of a MessageListener class. This listens to all tweets on the AllData channel, and publishes the data to EnglishTweetsFilter and InfluencerFilter.

public class AllTweetsListener extends MessageListener{
.
.     
       public static void main(String[] args) throws Exception{
              MessageListener allTweetsProcessor = AllTweetsListener.getInstance();

              allTweetsProcessor.registerOutBoundMessageList(new
                             EnglishTweetsFilter("EnglishTweetsFilter", "EnglishTweets"));
              allTweetsProcessor.registerOutBoundMessageList(new
                             InfluencerFilter("InfluencerFilter", "Influencers"));

              allTweetsProcessor.start();
       }
.
.
}

EnglishTweetsFilter extends MessageFilter. This class implements logic to select only those tweets that are marked as English tweets. The filter discards non-English tweets and pushes English tweets to the next list.

public class EnglishTweetsFilter extends MessageFilter{

       public EnglishTweetsFilter(String name, String listName) throws Exception{
              super(name, listName);
       }

       @Override
       public void filterAndPush(String message) throws Exception{
              JsonParser jsonParser = new JsonParser();

              JsonElement jsonElement = jsonParser.parse(message);
              // The message arrives as the string form of the BRPOP reply,
              // [listName, tweet], so the tweet is the second array element.
              JsonArray jsonArray = jsonElement.getAsJsonArray();
              JsonObject jsonObject = jsonArray.get(1).getAsJsonObject();
              if(jsonObject.get("lang") != null &&
                     jsonObject.get("lang").getAsString().equals("en")){
                             Jedis jedis = super.getJedisInstance();
                             if(jedis != null){
                                    jedis.lpush(super.name, jsonObject.toString());
                             }
              }
       }
}