Watermarks in Apache Spark Structured Streaming

Apache Spark’s Structured Streaming API is a powerful tool for processing real-time data streams. In this context, there are certain use cases where ensuring the accuracy of the processed data is not trivial due to the time dimension that inherently affects a streaming data flow, so it is necessary to have tools that allow us to control and manage time efficiently. 

In this post, we will explore the tools available in the API and the related concepts offered by this API.

Introduction to time management with Structured Streaming

In real-time data processing it is important to differentiate between event time (when the event actually occurred, typically a time stamp on the data itself) and processing time (when the event is processed by the system). In the real world, events may be processed in a delayed or unordered manner due to data center locations or differences in the connections over which the events are published, to give a few examples.

Operating solely on the basis of processing time, these contingencies are likely to distort metrics and aggregations if they are to be obtained at the event time level. A further improvement of the Structured Streaming API over the DStream API is that it allows just that: by pointing to a data field as event time and defining a watermark, calculations such as aggregations by time windows can be accurately performed.

Watermarks

Watermarks are a mechanism that allows Spark to clean up old data stored in an internal state that is no longer considered relevant for aggregations based on temporary windows. This is useful for handling data that has come in late efficiently without accumulating an unnecessarily large state. Looked at from another point of view, due to the delay or clutter with which data enters the system, if data is to be processed based on the time of the event, Spark needs some mechanism to decide when it can terminate an aggregation.

For example, let’s imagine a case where events with a timestamp and a key are coming in. It has been decided (1) to discard all the data entering more than 10 minutes late (or, in other words, to define a 10-minute watermark) and (2) the data has to be grouped according to the key and in fixed 10-minute windows in order to count the number of events in each time window.

By definition, watermarks provide one-way aggregation guarantees. This means that, given a 10-minute watermark, any event with a delay of less than 10 minutes is guaranteed to be processed. However, events arriving more than 10 minutes late are not guaranteed to stop being processed.

Aggregations with watermarks and temporary windows

Temporary windows allow grouping data in time intervals to perform aggregations such as counts, sums, averages, etc.

Source: Structured Streaming in Apache Spark Guide.

There are three types of windows:

  • Tumbling Windows: Fixed, non-overlapping time windows, as in the previous example. These windows are always point fractions of time (00:00-00:10, 00:10-00:20, 00:20-00:30, etc.).
  • Sliding Windows: Time windows like sliding windows, but overlapping.
  • Session Windows: Windows that are based on event activity and can be dynamically adjusted.

With the same streaming readout as in the previous example, the following example creates 10-minute windows that overlap every 5 minutes, which allows capturing events that could be spread across multiple fixed windows.

Source: Structured Streaming in Apache Spark Guide.

Again with the same example for reading, this can be understood as a stream of events representing interactions of a user. Events are grouped into sessions based on their activity with a maximum duration of 10 minutes with a maximum inactivity timeout of 5 minutes. This means that, once a session is started, it will last 10 minutes unless, within those 10 minutes from the start, a period of inactivity of more than 5 minutes occurs.

Source: Structured Streaming in Apache Spark Guide.

Data joins and watermarks

Joining events by means of join operations can be done with limitations. There is no Stream-Static join that allows state handling, so watermarks do not make any sense in this context. However, they do in Stream-Stream joins.

In Stream-Stream junctions, in addition to being able to define a watermark for each data stream, it is interesting to establish a third temporal factor related to the time difference between two matching records. It is in this sense that we speak of temporal restriction or maximum delay.

In these cases, the procedure is as follows:

  • Both streaming reads are defined with their own watermarks. In some cases it may be optional.
  • For the set of events to be matched, it is decided what maximum delay is allowed between the arrival on one side and the other. This is always recommended, if not mandatory.
  • The records are merged, with the following limitations:
    • For Inner mode joints, watermarks are optional and it is recommended to define a maximum delay to clear the state.
    • For all other joints, the watermark is mandatory on the side opposite to the mode definition (left for Right Outer, right for Left Semi, etc.) and optional, but recommended for the other, to completely clear the state. The maximum delay is always mandatory. For Full Outer mode the requirements are identical, with either side being mandatory.

In this example two data streams are joined with a watermark of 10 minutes each, with a maximum delay of 5 minutes between pairings.

Write modes and watermarks

Of the three possible write modes in Spark, watermarks can only be used with two of them: append and update. For the third mode, complete, it does not make sense because it requires saving all aggregations without the possibility of deleting intermediate states.

Using the append mode, each aggregate is written only once. This implies that you are only going to write the aggregations once you have started receiving events with an event timestamp after the window closure plus the watermark time. After writing, the state of that window is cleared. Events pertaining to it that came in later would potentially be ignored.

With update mode, aggregations occur with each data ingest. Although the watermark is not necessary to produce the output, its importance lies in the fact that it clears the states of closed windows, with the same criteria as with append mode.

See the following example to observe the writing in each mode for fixed 10-minute windows where the aggregation is a simple count of events.

Processed timeEvent timeValueWriting (append)Writing (update)
01:0201:01101:00-01:10 -> 1
01:0401:02101:00-01:10 -> 2
01:0901:07101:00-01:10 -> 3
01:1101:09101:00-01:10 -> 4
01:1301:11101:00-01:10 -> 4
01:10-01:20 -> 1
01:1801:09101:00-01:10 -> 5
01:10-01:20 -> 1
01:2101:17101:00-01:10 -> 5
01:10-01:20 -> 2
01:2201:21101:00-01:10 -> 501:00-01:10 -> 5
01:10-01:20 -> 2
01:20-01:30 -> 1
01:2501:081(ignored event)(ignored event)
01:3201:31101:10-01:20 -> 201:10-01:20 -> 2
01:20-01:30 -> 1
01:30-01:40 -> 1
Conclusion

Event timing management in Spark’s Structured Streaming API is essential for maintaining the accuracy and relevance of near real-time events. Using watermarks it is possible to handle delayed events to perform the required aggregations, transformations and computations on data streams, while optimizing in-memory states. 

In addition, in this post we have explored some complementary concepts and particularities of the use of watermarks, such as temporal windows, maximum delays in the union of data streams and supported write modes.

If you found this article interesting, we encourage you to visit the Data Engineering category of our blog to see posts similar to this one and to share it in networks with all your contacts. Don’t forget to mention us to let us know your opinion. See you soon!
Jordi Vanrell
Jordi Vanrell
Articles: 10