Apache Flink is a framework for implementing stateful stream processing applications and running them at scale on a compute cluster. In a previous article, we examined what stateful stream processing is, which use cases it addresses, and why you should implement and run your streaming applications with Apache Flink.

In this article, I will present examples for two common use cases of stateful stream processing and discuss how they can be implemented with Flink. The first use case is event-driven applications, i.e., applications that ingest continuous streams of events and apply some business logic to these events. The second is the streaming analytics use case, where I will present two analytical queries implemented with Flink’s SQL API, which aggregate streaming data in real time. We at Data Artisans provide the source code of all of our examples in a GitHub repository.

Before we dive into the details of the examples, I will introduce the event stream that is ingested by the example applications and explain how you can run the code that we provide.

A stream of taxi ride events

Our example applications are based on a data set about taxi rides that happened in New York City in 2013. The original data set was rearranged and converted into a single CSV file, from which we read nine fields.


All example applications sequentially read the CSV file and ingest it as a stream of taxi ride events. From there on, the applications process the events just like any other stream, i.e., like a stream that is ingested from a log-based publish-subscribe system, such as Apache Kafka or Kinesis. In fact, reading a file (or any other type of persisted data) and treating it as a stream is a cornerstone of Flink’s approach to unifying batch and stream processing.
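To make the idea of treating a file as a stream more concrete, here is a minimal plain-Java sketch that reads lines lazily and maps each line to an event. It does not use Flink's API. The class name FileAsStreamSketch, the RideEvent type, and the column positions are hypothetical and chosen only for illustration; the real file has a different layout.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.stream.Stream;

class FileAsStreamSketch {

  // Hypothetical event type holding only two fields, for illustration.
  static final class RideEvent {
    final String licenseId;
    final long pickUpTime;

    RideEvent(String licenseId, long pickUpTime) {
      this.licenseId = licenseId;
      this.pickUpTime = pickUpTime;
    }
  }

  // Parses one CSV line; the column order is an assumption of this sketch.
  static RideEvent parse(String line) {
    String[] cols = line.split(",");
    return new RideEvent(cols[0].trim(), Long.parseLong(cols[1].trim()));
  }

  // Lazily turns a reader into a stream of events, one line at a time.
  static Stream<RideEvent> events(BufferedReader reader) {
    return reader.lines()
        .filter(l -> !l.trim().isEmpty())   // skip blank lines
        .map(FileAsStreamSketch::parse);
  }

  public static void main(String[] args) {
    // A StringReader stands in for the CSV file on disk.
    BufferedReader in = new BufferedReader(new StringReader("A,1000\nB,2000\n"));
    events(in).forEach(e -> System.out.println(e.licenseId + " @ " + e.pickUpTime));
  }
}
```

Because the events are produced one at a time, downstream logic cannot tell whether they come from a file or from a message log, which is the point of the unified view.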

Running the Flink examples

As mentioned earlier, we published the source code of our example applications in a GitHub repository. We encourage you to fork and clone the repository. The examples can be easily executed from within your IDE of choice; you don’t need to set up and configure a Flink cluster to run them. First, import the source code of the examples as a Maven project. Then, execute the main class of an application and provide the storage location of the data file (see above for the link to download the data) as a program parameter.

Once you have launched an application, it will start a local, embedded Flink instance inside the application’s JVM process and submit the application to execute it. You will see a bunch of log statements while Flink is starting and the job’s tasks are being scheduled. Once the application is running, its output will be written to the standard output.

The working time regulation that our application monitors limits taxi drivers to 12-hour shifts and requires a break of at least eight hours before the next shift may be started. A shift starts with the beginning of the first ride. From then on, a driver may start new rides within a window of 12 hours. Our application tracks the rides of drivers, marks the end time of their 12-hour window (i.e., the time when they may start the last ride), and flags rides that violated the regulation. You can find the source code of the application in our GitHub repository.

Our application is implemented with Flink’s DataStream API and a KeyedProcessFunction. The DataStream API is a functional API based on the concept of typed data streams. A DataStream<T> is the logical representation of a stream of events of type T. A stream is processed by applying a function to it that produces another data stream, possibly of a different type. Flink processes streams in parallel by distributing events to stream partitions and applying different instances of functions to each partition.

The high-level flow of our monitoring application is as follows. First, the application ingests the stream of TaxiRide events. The next step is to key the TaxiRide events by the licenseId of the driver. The keyBy operation partitions the stream on the declared field, such that all events with the same key are processed by the same parallel instance of the following function. In our case, we partition on the licenseId field because we want to monitor the working time of each individual driver.
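The guarantee that keying gives us can be illustrated with a small plain-Java model. This is a simplified sketch, not Flink's implementation: Flink assigns keys to key groups and distributes those over the parallel instances, whereas the sketch below simply takes the hash code modulo the parallelism. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeyPartitioningSketch {

  // Simplified routing rule: the same key always maps to the same instance index.
  static int instanceFor(String key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  // Distributes license IDs over one bucket per parallel instance.
  static List<List<String>> partition(List<String> licenseIds, int parallelism) {
    List<List<String>> buckets = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      buckets.add(new ArrayList<>());
    }
    for (String id : licenseIds) {
      buckets.get(instanceFor(id, parallelism)).add(id);
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<String> ids = Arrays.asList("driver-1", "driver-2", "driver-1", "driver-3");
    // Both "driver-1" events end up in the same bucket.
    System.out.println(partition(ids, 2));
  }
}
```

Whatever the concrete routing rule is, the property that matters for our application is the same: every ride of a given driver reaches the same function instance, so that instance sees the driver's complete history.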

Next, we apply the MonitorWorkTime function on the partitioned TaxiRide events. The function tracks the rides per driver and monitors their shifts and break times. It emits events of type Tuple2<String, String>, where each tuple represents a notification consisting of the license ID of the driver and a message. Finally, our application emits the messages by printing them to the standard output. A real-world application would write the notifications to an external message or storage system, like Apache Kafka, HDFS, or a database system, or would trigger an external call to immediately push them out.

Now that we’ve discussed the overall flow of the application, let’s have a look at the MonitorWorkTime function, which contains most of the application’s actual business logic. The MonitorWorkTime function is a stateful KeyedProcessFunction that ingests TaxiRide events and emits Tuple2<String, String> records. The KeyedProcessFunction class features two methods to process data: processElement() and onTimer(). The processElement() method is called for each arriving event. The onTimer() method is called when a previously registered timer fires. The following snippet shows the skeleton of the MonitorWorkTime function and everything that is declared outside of the processing methods.

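// imports required at the top of the enclosing source file
// (the formatter calls match the Joda-Time API, which is assumed here)
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// key: license ID (assumed to be a String), input: TaxiRide, output: (license ID, message)
public static class MonitorWorkTime
    extends KeyedProcessFunction<String, TaxiRide, Tuple2<String, String>> {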

  // time constants in milliseconds
  private static final long ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000L; // 12 hours
  private static final long REQ_BREAK_TIME = 8 * 60 * 60 * 1000L;     // 8 hours
  private static final long CLEAN_UP_INTERVAL = 24 * 60 * 60 * 1000L; // 24 hours

  private transient DateTimeFormatter formatter;

  // state handle to store the starting time of a shift
  private transient ValueState<Long> shiftStart;

  @Override
  public void open(Configuration conf) {
    // register state handle
    shiftStart = getRuntimeContext().getState(
      new ValueStateDescriptor<>("shiftStart", Types.LONG));
    // initialize time formatter
    this.formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
  }

  // processElement() and onTimer() are discussed in detail below.
}

The function declares a few constants for time intervals in milliseconds, a time formatter, and a state handle for keyed state that is managed by Flink. Managed state is periodically checkpointed and automatically restored in case of a failure. Keyed state is organized per key, which means that a function will maintain one value per handle and key. In our case, the MonitorWorkTime function maintains a Long value for each key, i.e., for each licenseId. The shiftStart state stores the starting time of a driver’s shift. The state handle is initialized in the open() method, which is called once before the first event is processed.
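The "one value per handle and key" behavior can be pictured with a small plain-Java model. This is only a sketch of the concept under simplifying assumptions: the class KeyedValueStateSketch and its setCurrentKey() method are hypothetical, and the model ignores checkpointing and recovery entirely. In Flink, the runtime sets the key context before it invokes the function.

```java
import java.util.HashMap;
import java.util.Map;

class KeyedValueStateSketch<K, V> {

  // One stored value per key; a missing entry means "no state for this key".
  private final Map<K, V> values = new HashMap<>();
  private K currentKey;

  // Stand-in for the runtime switching the key context before a function call.
  void setCurrentKey(K key) {
    this.currentKey = key;
  }

  // Returns the value of the current key, or null if nothing is stored.
  V value() {
    return values.get(currentKey);
  }

  // Stores a value for the current key only.
  void update(V value) {
    values.put(currentKey, value);
  }

  // Removes the value of the current key only.
  void clear() {
    values.remove(currentKey);
  }

  public static void main(String[] args) {
    KeyedValueStateSketch<String, Long> shiftStart = new KeyedValueStateSketch<>();
    shiftStart.setCurrentKey("driver-1");
    shiftStart.update(1000L);
    shiftStart.setCurrentKey("driver-2");
    // driver-2 has no shift start yet, although driver-1 does.
    System.out.println(shiftStart.value() == null);
  }
}
```

The function code itself never mentions the key when it reads or writes state, which is why shiftStart.value() in MonitorWorkTime always refers to the driver of the event being processed.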

Now, let’s have a look at the processElement() method.

@Override
public void processElement(
    TaxiRide ride,
    Context ctx,
    Collector<Tuple2<String, String>> out) throws Exception {

  // look up start time of the last shift
  Long startTs = shiftStart.value();

  if (startTs == null ||
    startTs < ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // this is the first ride of a new shift.
    startTs = ride.pickUpTime;
    shiftStart.update(startTs);
    long endTs = startTs + ALLOWED_WORK_TIME;
    out.collect(Tuple2.of(ride.licenseId,
      "You are allowed to accept new passengers until " + formatter.print(endTs)));

    // register timer to clean up the state in 24h
    ctx.timerService().registerEventTimeTimer(startTs + CLEAN_UP_INTERVAL);
  } else if (startTs < ride.pickUpTime - ALLOWED_WORK_TIME) {
    // this ride started after the allowed work time ended.
    // it is a violation of the regulations!
    out.collect(Tuple2.of(ride.licenseId,
      "This ride violated the working time regulations."));
  }
}

The processElement() method is called for each TaxiRide event. First, the method fetches the start time of the driver’s shift from the state handle. If the state does not contain a start time (startTs == null) or if the last shift started more than 20 hours (ALLOWED_WORK_TIME + REQ_BREAK_TIME) earlier than the current ride, the current ride is the first ride of a new shift. In either case, the function starts a new shift by updating the start time of the shift to the start time of the current ride, emits a message to the driver with the end time of the new shift, and registers a timer to clean up the state in 24 hours.

If the current ride is not the first ride of a new shift, the function checks if it violates the working time regulation, i.e., whether it started more than 12 hours later than the start of the driver’s current shift. If that is the case, the function emits a message to inform the driver about the violation.
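The two comparisons are easy to get wrong at the boundaries, so it can help to look at them in isolation. The following plain-Java sketch extracts the decision into a pure function and applies it to a few pick-up times. The class ShiftRulesSketch, the Decision enum, and the classify() method are hypothetical names introduced for this illustration; the conditions mirror those of processElement().

```java
class ShiftRulesSketch {

  static final long HOUR = 60 * 60 * 1000L;
  static final long ALLOWED_WORK_TIME = 12 * HOUR;
  static final long REQ_BREAK_TIME = 8 * HOUR;

  enum Decision { NEW_SHIFT, VIOLATION, WITHIN_SHIFT }

  // Classifies a ride given the stored shift start (null if there is none).
  static Decision classify(Long shiftStart, long pickUpTime) {
    if (shiftStart == null
        || shiftStart < pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {
      // no shift yet, or the last shift started more than 20 hours ago
      return Decision.NEW_SHIFT;
    } else if (shiftStart < pickUpTime - ALLOWED_WORK_TIME) {
      // the ride started more than 12 hours after the shift start
      return Decision.VIOLATION;
    }
    return Decision.WITHIN_SHIFT;
  }

  public static void main(String[] args) {
    long start = 0L; // shift start, in milliseconds
    for (long h : new long[] {5, 13, 21}) {
      System.out.println(h + "h after shift start: " + classify(start, h * HOUR));
    }
  }
}
```

With a shift that starts at time 0, a ride at 5 hours is within the shift, a ride at 13 hours is a violation, and a ride at 21 hours opens a new shift. Note that both comparisons are strict, so a ride exactly 12 hours after the shift start is still allowed, and a ride exactly 20 hours after it is still flagged.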

The processElement() method of the MonitorWorkTime function registers a timer to clean up the state 24 hours after the start of a shift. Removing state that is no longer needed is important to prevent growing state sizes due to leaking state. A timer fires when the time of the application passes the timer’s timestamp. At that point, the onTimer() method is called. Similar to state, timers are maintained per key, and the function is put into the context of the associated key before the onTimer() method is called. Hence, all state access is directed to the key that was active when the timer was registered.

Let’s have a look at the onTimer() method of MonitorWorkTime.

@Override
public void onTimer(
    long timerTs,
    OnTimerContext ctx,
    Collector<Tuple2<String, String>> out) throws Exception {

  // remove the shift state if no new shift was started in the meantime.
  Long startTs = shiftStart.value();
  // the null check avoids unboxing a missing value
  if (startTs != null && startTs == timerTs - CLEAN_UP_INTERVAL) {
    shiftStart.clear();
  }
}

The processElement() method registers timers for 24 hours after a shift started to clean up state that is no longer needed. Cleaning up the state is the only logic that the onTimer() method implements. When a timer fires, we check if the driver started a new shift in the meantime, i.e., whether the shift starting time changed. If that is not the case, we clear the shift state for the driver.
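To see why the check in onTimer() is needed, consider a driver who starts a second shift before the clean-up timer of the first shift fires. The following plain-Java sketch simulates that case. It is a toy model and not Flink's timer service: the class TimerCleanupSketch, its Timer type, and the advanceTimeTo() method are hypothetical, and time is advanced by hand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

class TimerCleanupSketch {

  static final long HOUR = 60 * 60 * 1000L;
  static final long CLEAN_UP_INTERVAL = 24 * HOUR;

  // A registered timer: fires for `key` once time reaches `timestamp`.
  static final class Timer {
    final long timestamp;
    final String key;

    Timer(long timestamp, String key) {
      this.timestamp = timestamp;
      this.key = key;
    }
  }

  // Per-key shift start, standing in for the keyed state.
  final Map<String, Long> shiftStart = new HashMap<>();
  // Pending timers, ordered by firing time.
  final PriorityQueue<Timer> timers =
      new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

  // Mirrors the "new shift" branch: update the state and register a clean-up timer.
  void startShift(String key, long startTs) {
    shiftStart.put(key, startTs);
    timers.add(new Timer(startTs + CLEAN_UP_INTERVAL, key));
  }

  // Fires all timers whose timestamp has been reached.
  void advanceTimeTo(long now) {
    while (!timers.isEmpty() && timers.peek().timestamp <= now) {
      Timer t = timers.poll();
      onTimer(t.key, t.timestamp);
    }
  }

  // Clears the state only if it still belongs to the shift that registered the timer.
  void onTimer(String key, long timerTs) {
    Long startTs = shiftStart.get(key);
    if (startTs != null && startTs == timerTs - CLEAN_UP_INTERVAL) {
      shiftStart.remove(key);
    }
  }

  public static void main(String[] args) {
    TimerCleanupSketch sim = new TimerCleanupSketch();
    sim.startShift("driver-1", 0L);
    sim.startShift("driver-1", 21 * HOUR); // second shift before the first timer fires
    sim.advanceTimeTo(24 * HOUR);          // first timer fires, state must survive
    System.out.println(sim.shiftStart);
    sim.advanceTimeTo(45 * HOUR);          // second timer fires, state is removed
    System.out.println(sim.shiftStart);
  }
}
```

Without the comparison against timerTs - CLEAN_UP_INTERVAL, the first timer would delete the start time of the second shift, and the next ride of that driver would wrongly be treated as the start of yet another shift.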