Reactive Programming with Java 9's Flow

In this guide, you’ll learn, through an example, how the Flow API in Java 9 can help you build Reactive patterns with the new Publisher and Subscriber interfaces. After reading it, you should understand this new programming style and its pros and cons. All the code is on GitHub so you can also run it yourself or try some modifications.

Introduction to Java 9’s Flow API

Java 9 has introduced some new interesting features in this old, yet very popular, programming language. This guide focuses on the new Flow API, which enables us to adopt Reactive Programming using just the JDK, not needing additional libraries such as RxJava or Project Reactor, amongst others.

However, you’ll realize pretty soon after having a look at the API that it’s basically what it promises: an API. It’s composed of a few interfaces and only one implementation:

  • The interface Flow.Publisher<T> defines methods to produce items and control signals.
  • The interface Flow.Subscriber<T> defines methods to receive those messages and signals.
  • The interface Flow.Subscription defines the methods to link both the Publisher and the Subscriber.
  • The interface Flow.Processor<T,R> defines methods to do some advanced operations like chaining transformations of items from publishers to subscribers.
  • Finally, the class SubmissionPublisher<T> implements Flow.Publisher<T> and it's a flexible producer of items, compliant with the Reactive Streams initiative.
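
To make the list above concrete, here is a minimal sketch of how these pieces fit together: a SubmissionPublisher sends a few strings to a subscriber that requests them one by one. The names FlowHelloWorld, CollectingSubscriber and publishAndCollect are made up for this illustration and are not part of the guide's repository.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, only for illustration.
class FlowHelloWorld {

  // Stores every item it receives and releases a latch when the stream ends.
  static class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // ask for the first item
    }

    @Override public void onNext(String item) {
      received.add(item);
      subscription.request(1); // ask for the next one
    }

    @Override public void onError(Throwable throwable) { done.countDown(); }

    @Override public void onComplete() { done.countDown(); }
  }

  static List<String> publishAndCollect(List<String> items) {
    CollectingSubscriber subscriber = new CollectingSubscriber();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      items.forEach(publisher::submit);
    } // close() signals onComplete after the buffered items are delivered
    try {
      subscriber.done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return subscriber.received;
  }

  public static void main(String[] args) {
    System.out.println(publishAndCollect(List.of("a", "b", "c")));
  }
}
```

The Flow.Subscription never appears in the calling code: the publisher creates it and hands it to the subscriber through onSubscribe.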

Even though there are not many classes to play with, including this API in Java 9 is a major change: vendors and third parties can provide Reactive support for their libraries relying on those interfaces, from a JDBC driver to a RabbitMQ reactive implementation, for example.

From Pull to Push to Pull-Push

Reactive programming, if I try to shrink it to a paragraph, is a way of programming in which the consumers are in control of the data flow. It gives special importance to the fact that there might be slow consumers that need the publisher to slow down so they can read all the items in the data stream (the back-pressure concept). It’s not a disruptive technique; you might have used this pattern already. It’s becoming popular now because of its integration in major frameworks and library distributions (e.g. Java 9 or Spring 5), and because of the rise of distributed systems that need to exchange huge amounts of data.

Looking at the past helps us understand its rise as well. Some years ago, the most popular technique for consumers to get data was a pull-based mechanism. The client polls for data periodically and, if any is available, reads it. The advantage is that clients can control the flow of data when they have fewer resources (they stop polling); the main disadvantage is the waste of processing time and/or network resources when polling for data and there is nothing to consume.

The trend changed over time and it became popular to push data from producers and let consumers take care of it. The problem is that consumers may have more limited resources than producers, so slow consumers end up with full input buffers and therefore lose data. This may be fine if it happens only to a low percentage of our subscribers, but what if it’s happening to most of them? We could do better by slowing down our publisher.

The hybrid pull-push approach that comes with Reactive Programming tries to bring the best of both worlds: it gives consumers the responsibility of requesting data and controlling the flow from the Publisher, which can also decide between blocking or dropping data when it lacks resources. We’ll see a good practical example below.
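
The pull half of that hybrid can be sketched in a few lines. In this illustration (DemandDemo and deliveredFor are hypothetical names), the publisher pushes items into its buffer, but the subscriber only receives as many as it asked for with request(n). The sketch assumes requested is at least 1 and not greater than published.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only for illustration.
class DemandDemo {

  // Publishes `published` items to a subscriber that asks for `requested`
  // items once, and returns how many items were actually delivered.
  static int deliveredFor(int published, int requested) {
    AtomicInteger received = new AtomicInteger();
    CountDownLatch demandMet = new CountDownLatch(requested);
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    publisher.subscribe(new Flow.Subscriber<Integer>() {
      @Override public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(requested); // the only demand ever signalled
      }
      @Override public void onNext(Integer item) {
        received.incrementAndGet();
        demandMet.countDown();
      }
      @Override public void onError(Throwable throwable) { }
      @Override public void onComplete() { }
    });

    for (int i = 1; i <= published; i++) {
      publisher.submit(i); // stays in the publisher's buffer until requested
    }
    try {
      demandMet.await(5, TimeUnit.SECONDS);
      Thread.sleep(200); // leave time for any unrequested delivery to show up
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    int result = received.get();
    publisher.close();
    return result;
  }

  public static void main(String[] args) {
    System.out.println("Delivered: " + deliveredFor(5, 2));
  }
}
```

The remaining items are not lost: they wait in the publisher-side buffer until the subscriber signals more demand.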

Differences between a Flow and a Stream

Reactive Programming is not the new hype to replace Functional Programming. Both are compatible and work perfectly together. While the Streams API introduced in Java 8 is perfect for processing data streams (map, reduce and all the variants), the Flow API shines on the communication side (request, slow down, drop, block, etc.). You can use Streams as data sources for a Publisher, blocking them or dropping items as needed. You can also use them on your Subscriber’s side, for example, to perform aggregations after receiving some items. And then there is all the rest of the programming logic in which reactive streams don’t fit, but which can still be written in a functional style and be far more readable and easier to maintain than its imperative equivalent.

But there is a part that tends to puzzle us: if you need to exchange data between two systems and apply transformations between them (not before the items are published), how do Streams and Flows work together? In that case, we could use a Java 8 Function to map source to target (transforming it), but we can’t use a Stream in between the Publisher and Subscriber, can we? Being the geniuses we are, we might come up with the idea of creating a subscriber in between, which takes items from the original publisher, transforms them and then acts as a publisher as well, being the one to which our original subscriber subscribes. Good news: that’s the pattern coming with Java 9’s Flow.Processor<T, R>, so we just need to implement that interface and write the functions there to transform data.
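
As a rough sketch of that idea, a processor can extend SubmissionPublisher to get the publishing side for free and implement the subscriber methods itself. This follows the same approach as the transform example in the SubmissionPublisher Javadoc; the names MappingProcessor, ProcessorDemo and transform are hypothetical and only used for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Subscribes upstream, applies a function, and republishes downstream.
class MappingProcessor<T, R> extends SubmissionPublisher<R>
    implements Flow.Processor<T, R> {

  private final Function<T, R> mapper;
  private Flow.Subscription upstream;

  MappingProcessor(Function<T, R> mapper) {
    this.mapper = mapper;
  }

  @Override public void onSubscribe(Flow.Subscription subscription) {
    this.upstream = subscription;
    subscription.request(1);
  }

  @Override public void onNext(T item) {
    submit(mapper.apply(item)); // publish the transformed item
    upstream.request(1);
  }

  @Override public void onError(Throwable throwable) {
    closeExceptionally(throwable);
  }

  @Override public void onComplete() {
    close();
  }
}

// Hypothetical demo class wiring source -> processor -> final subscriber.
class ProcessorDemo {

  static List<String> transform(List<Integer> numbers) {
    List<String> output = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
    MappingProcessor<Integer, String> processor =
      new MappingProcessor<Integer, String>(n -> "Magazine #" + n);

    // Attach the final subscriber first so no transformed item is missed
    processor.subscribe(new Flow.Subscriber<String>() {
      private Flow.Subscription subscription;
      @Override public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
      }
      @Override public void onNext(String item) {
        output.add(item);
        subscription.request(1);
      }
      @Override public void onError(Throwable throwable) { done.countDown(); }
      @Override public void onComplete() { done.countDown(); }
    });
    source.subscribe(processor);

    numbers.forEach(source::submit);
    source.close(); // completion travels through the processor
    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return output;
  }
}
```

Because the processor requests one item at a time from the source and uses submit towards its own subscribers, a slow final subscriber ends up slowing down the whole chain.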

Personally, I’m not a fan of going Over-reactive or being a Reactive Evangelist (I can’t decide on a specific term for this). Try not to get crazy about it and avoid replacing Streams with Flows where it doesn’t make sense. It’s technically possible to write, within a class method, some lines of code that take an array of ints, create a Publisher and, next to it, a Processor (or Transformer) which maps every int to a String, and finally create a Subscriber to collect them into a single long string message. But why would you do that? You don’t need to control the Flow between two parts of your system or two threads, so why make your code more complicated? In those cases, better use a Stream.
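
For comparison, this is roughly what that int-array example looks like with a plain Stream. The class and method names are made up for the illustration, and the separator is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, only for illustration.
class IntsToMessage {

  // Maps every int to a String and joins them into a single message.
  static String join(int[] numbers) {
    return Arrays.stream(numbers)
      .mapToObj(String::valueOf)
      .collect(Collectors.joining(", "));
  }

  public static void main(String[] args) {
    System.out.println(join(new int[] {1, 2, 3})); // prints "1, 2, 3"
  }
}
```

No Publisher, Processor or Subscriber is needed: everything happens in one thread and there is no flow to control.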

The Magazine Publisher

The sample code included in this guide models a Magazine Publisher use case. The Publisher only has two Subscribers (but hey, it’s a good start).

The publisher will produce a series of 20 magazines per subscriber. They know that their readers are not usually at home at delivery time, and they want to avoid the postman returning the magazines or throwing them away. That might happen because the publisher knows that the subscribers’ mailboxes are normally too small to hold more mail (the subscriber’s buffer).

Instead, they implement a very innovative delivery system: the subscriber calls them when they’re at home and they deliver one magazine (the next one for that subscriber) almost immediately. The publisher plans to keep a small box per subscriber at the office in case some of them don’t call to get the magazine as soon as it’s published. The manager thinks that keeping space for up to 8 magazines per subscriber at the publisher’s office is more than enough (note how the buffer is now on the publisher’s side).

Then, one of the workers goes to the Manager’s office to warn them about different situations:

  1. If subscribers are fast enough to request a delivery, as fast as the publisher is printing new magazines, there will be no problems of space.
  2. If subscribers are not calling at the same pace as magazines are printed, the boxes might become full. The worker asks the manager how they should react to that case:
    1. Increment the size of the boxes to 20 per subscriber, which would fix the problem (i.e. more resources on the publisher side).
    2. Stop the printing until the situation gets fixed (the subscriber requests at least one) and then slow down, to the detriment of some subscribers that might be fast enough to keep their boxes empty.
    3. Throw to the recycle bin (drop) any magazine that doesn't fit in the box of subscribers immediately after its production.
    4. An in-between solution: if any of the boxes are full, wait before printing the next number for a maximum amount of time. If after that time there is no space yet, then they'll recycle (drop) the new number.

The manager explains that they can’t afford the first solution: spending that amount of resources just to cope with slow subscribers would be a waste. They decide to choose the courtesy wait (the last option), which may harm some subscribers but just for a limited time. The marketing team decides to call this approach Reactive Magazine Publishing because it “adapts to their readers”. The worker who raised the analysis above became the employee of the month.

Note that I chose to go for that solution (as a greedy Magazine Publisher) because, with a virtually infinite buffer, it’s difficult to understand all the concepts of the Reactive approach: the code would look plain and simple, and it would be hard to compare it with alternative solutions. Let’s see the code.

Java 9 Flow sample code

All the code in this post is available on GitHub: Java 9 Flow - Reactive. If you find it useful, please give it a star!

A simple Subscriber in full-control of the Flow

Let’s start with the Subscriber. The MagazineSubscriber class implements Flow.Subscriber<Integer> (they will receive just a number, but let’s keep imagining that it’s a really nice magazine about your preferred topic).

package com.thepracticaldeveloper;

import java.util.concurrent.Flow;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MagazineSubscriber implements Flow.Subscriber<Integer> {

  public static final String JACK = "Jack";
  public static final String PETE = "Pete";

  private static final Logger log = LoggerFactory.
    getLogger(MagazineSubscriber.class);

  private final long sleepTime;
  private final String subscriberName;
  private Flow.Subscription subscription;
  private int nextMagazineExpected;
  private int totalRead;

  MagazineSubscriber(final long sleepTime, final String subscriberName) {
    this.sleepTime = sleepTime;
    this.subscriberName = subscriberName;
    this.nextMagazineExpected = 1;
    this.totalRead = 0;
  }

  @Override
  public void onSubscribe(final Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(final Integer magazineNumber) {
    if (magazineNumber != nextMagazineExpected) {
      IntStream.range(nextMagazineExpected, magazineNumber).forEach(
        (msgNumber) ->
          log("Oh no! I missed the magazine " + msgNumber)
      );
      // Catch up with the number to keep tracking missing ones
      nextMagazineExpected = magazineNumber;
    }
    log("Great! I got a new magazine: " + magazineNumber);
    takeSomeRest();
    nextMagazineExpected++;
    totalRead++;

    log("I'll get another magazine now, next one should be: " +
      nextMagazineExpected);
    subscription.request(1);
  }

  @Override
  public void onError(final Throwable throwable) {
    log("Oops I got an error from the Publisher: " + throwable.getMessage());
  }

  @Override
  public void onComplete() {
    log("Finally! I completed the subscription, I got in total " +
      totalRead + " magazines.");
  }

  private void log(final String logMessage) {
    log.info("<=========== [" + subscriberName + "] : " + logMessage);
  }

  public String getSubscriberName() {
    return subscriberName;
  }

  private void takeSomeRest() {
    try {
      Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}

This class implements the required interface methods:

  • onSubscribe(subscription). The Publisher will invoke this method when getting a new Subscriber. Normally you want to save the subscription since it will be used later to send signals to the Publisher: request more items, or cancel the subscription. It's also common to use it right away to request the first item, as we do here.
  • onNext(magazineNumber). This method will be invoked whenever a new item is received. In our case, we'll also follow a typical scenario and, besides processing that item, we'll request a new one. However, in between those, we include a sleep time that is configurable when creating the Subscriber. This way we can try different scenarios and see what happens when subscribers don't behave properly. The extra logic is just to log the missing magazines in case of drops: we know the sequence in advance so the subscriber can detect when that happens.
  • onError(throwable). This is called by the Publisher to tell the Subscriber that something went wrong. In our implementation, we just log the message since that will happen when the Publisher drops an item.
  • onComplete(). No surprises here: this one is invoked when the Publisher does not have more items to send, so the Subscription is completed.

Going Reactive with Java 9 SubmissionPublisher

To build the publisher we’ll use Java 9’s SubmissionPublisher class. As stated in the Javadoc, it implements the principles of the Reactive Streams initiative for a Publisher that may block when subscribers are slow, or may also drop items if needed. Let’s first see the code and then dive into details:

package com.thepracticaldeveloper;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReactiveFlowApp {

  private static final int NUMBER_OF_MAGAZINES = 20;
  private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;
  private static final Logger log =
    LoggerFactory.getLogger(ReactiveFlowApp.class);

  public static void main(String[] args) throws Exception {
    final ReactiveFlowApp app = new ReactiveFlowApp();

    log.info("\n\n### CASE 1: Subscribers are fast, buffer size is not so " +
      "important in this case.");
    app.magazineDeliveryExample(100L, 100L, 8);

    log.info("\n\n### CASE 2: A slow subscriber, but a good enough buffer " +
      "size on the publisher's side to keep all items until they're picked up");
    app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);

    log.info("\n\n### CASE 3: A slow subscriber, and a very limited buffer " +
      "size on the publisher's side so it's important to keep the slow " +
      "subscriber under control");
    app.magazineDeliveryExample(1000L, 3000L, 8);

  }

  void magazineDeliveryExample(final long sleepTimeJack,
                               final long sleepTimePete,
                               final int maxStorageInPO) throws Exception {
    final SubmissionPublisher<Integer> publisher =
      new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPO);

    final MagazineSubscriber jack = new MagazineSubscriber(
      sleepTimeJack,
      MagazineSubscriber.JACK
    );
    final MagazineSubscriber pete = new MagazineSubscriber(
      sleepTimePete,
      MagazineSubscriber.PETE
    );

    publisher.subscribe(jack);
    publisher.subscribe(pete);

    log.info("Printing " + NUMBER_OF_MAGAZINES +
      " magazines per subscriber, with room in publisher for "
      + maxStorageInPO + ". They have " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE +
      " seconds to consume each magazine.");
    IntStream.rangeClosed(1, NUMBER_OF_MAGAZINES).forEach((number) -> {
      log.info("Offering magazine " + number + " to consumers");
      final int lag = publisher.offer(
        number,
        MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
        TimeUnit.SECONDS,
        (subscriber, msg) -> {
          subscriber.onError(
            new RuntimeException("Hey " + ((MagazineSubscriber) subscriber)
              .getSubscriberName() + "! You are too slow getting magazines" +
              " and we don't have more space for them! " +
              "I'll drop your magazine: " + msg));
          return false; // don't retry, we don't believe in second opportunities
        });
      if (lag < 0) {
        log("Dropping " + -lag + " magazines");
      } else {
        log("The slowest consumer has " + lag +
          " magazines in total to be picked up");
      }
    });

    // Blocks until all subscribers are done (this part could be improved
    // with latches, but this way we keep it simple)
    while (publisher.estimateMaximumLag() > 0) {
      Thread.sleep(500L);
    }

    // Closes the publisher, calling the onComplete() method on every subscriber
    publisher.close();
    // give some time to the slowest consumer to wake up and notice
    // that it's completed
    Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
  }

  private static void log(final String message) {
    log.info("===========> " + message);
  }

}

As you see, this is also our main class. This is to keep the project as simple as possible for this guide. The main logic is in the method magazineDeliveryExample, which allows us to work with two different sleep times for our two different subscribers, and also set the buffer size on the publisher’s side (maxStorageInPO).

Then we follow these steps:

  1. Create a SubmissionPublisher that uses the common ForkJoinPool to deliver items to subscribers asynchronously, with the selected buffer size for each subscriber (the enforced capacity may be rounded up to a power of 2).
  2. Create two subscribers with different sleep times as passed as arguments, and different names to easily recognize them in logs.
  3. Using as a data source a stream of 20 numbers to model our Magazine Printer, we call the method offer with several parameters:
    1. The item to make available to subscribers.
    2. The maximum amount of time to wait for each subscriber to pick that item (arguments two and three).
    3. A handler for us to control what happens if a given subscriber doesn't get the item. In our case, we send an error to that subscriber and, by returning false, we indicate we don't want to retry and wait again.
  4. When a drop happens, the offer method returns a negative number. Otherwise, it returns the estimated maximum number of items pending to be collected by the slowest subscriber (lag). We just log that number to the console.
  5. The last part of the application cycle just prevents the main thread from terminating too early. In a real application, we'd better control that with latches, but here I used the use-case logic itself: first wait until the publisher has nothing left in its buffers, and then wait for the slowest subscriber to receive the onComplete signal, which is sent implicitly by the close() method.
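
The latch-based alternative mentioned in the last step could look like the sketch below. It is not the code from the repository: LatchedCompletion, LatchedSubscriber and deliverAndWait are hypothetical names, the subscribers don't sleep, and the publisher uses submit with the default buffer to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only for illustration.
class LatchedCompletion {

  // A subscriber that releases its own latch when the publisher completes.
  static class LatchedSubscriber implements Flow.Subscriber<Integer> {
    final AtomicInteger totalRead = new AtomicInteger();
    final CountDownLatch completed = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1);
    }

    @Override public void onNext(Integer magazineNumber) {
      totalRead.incrementAndGet();
      subscription.request(1);
    }

    @Override public void onError(Throwable throwable) {
      // Assumption: the latch stays closed here because the guide's drop
      // handler calls onError without ending the subscription.
    }

    @Override public void onComplete() {
      completed.countDown();
    }
  }

  // Returns the number of magazines read by each of the two subscribers.
  static int[] deliverAndWait(int magazines) {
    LatchedSubscriber jack = new LatchedSubscriber();
    LatchedSubscriber pete = new LatchedSubscriber();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(jack);
      publisher.subscribe(pete);
      for (int i = 1; i <= magazines; i++) {
        publisher.submit(i);
      }
    } // close() signals onComplete after the buffered items are delivered
    try {
      // The timeouts are a safety net so the caller never waits forever
      jack.completed.await(10, TimeUnit.SECONDS);
      pete.completed.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new int[] {jack.totalRead.get(), pete.totalRead.get()};
  }
}
```

With this approach the main thread needs neither the polling loop on estimateMaximumLag() nor the final sleep: it continues as soon as every subscriber has seen onComplete.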

The main() method calls that logic with three different scenarios that model the real-case situation explained above:

  1. The subscribers are so fast that there are no problems related to buffering.
  2. One of the subscribers is very slow so that buffer starts getting full. However, the buffer is big enough to hold all the items so the subscriber doesn't experience drops.
  3. One of the subscribers is very slow and the buffer is not big enough to hold all the items. In this case, the handler gets invoked several times and the subscriber doesn't receive all the items.

Note that you have other combinations to explore here. You can try, for example, to make the offer method behave similarly to submit (see section below) if you set the constant MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE to a very high number. Or you can see what happens when both subscribers are slow (spoiler alert: more drops).

Running the app

If you now run the application code from your IDE, or package it and run it from the command line, you’ll see a colored console log indicating what’s going on in the three scenarios. I didn’t tell you, but there is an additional class in the GitHub repository that does the coloring part (ColorConsoleAppender). Your eyes will appreciate it.

You can read the logs and see how everything works as expected:

  • The first iteration goes well: there are no drops and both subscribers are fast, so it also completes quickly.
  • The second iteration takes longer to complete because of the slow subscriber, but no magazines are lost.
  • The third iteration doesn't go so well: the timeout expires on several occasions, so the slow subscriber ends up receiving some errors from the publisher and the corresponding drops. That reader will sadly miss some great articles from the Publisher.

Dropping or Blocking: choose wisely

Dropping Items

We covered the most flexible case when using a SubmissionPublisher: we offered the items to our subscribers with a timeout and a handler for the drops. We could have also gone for a simpler method of that class: submit, which accepts a single argument, the item value. However, if we do that, the Publisher is not deciding what to do. If any of the buffers of its subscribers is full, the Publisher will block until there is space, impacting all the other subscribers.
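
To see the dropping side in isolation, the sketch below uses the two-argument variant offer(item, onDrop), which doesn't wait at all when a buffer is full. The subscriber is deliberately broken: it never requests anything, so the small buffer fills up and the handler starts receiving the drops. DropDemo and offerToIdleSubscriber are hypothetical names for this illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only for illustration.
class DropDemo {

  // Offers `items` numbers to a subscriber that never requests any of them.
  // Returns {accepted, dropped}.
  static int[] offerToIdleSubscriber(int items, int bufferSize) {
    AtomicInteger dropped = new AtomicInteger();
    int accepted = 0;
    try (SubmissionPublisher<Integer> publisher =
           new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferSize)) {
      publisher.subscribe(new Flow.Subscriber<Integer>() {
        @Override public void onSubscribe(Flow.Subscription subscription) {
          // Never calls request(), so nothing leaves the publisher's buffer
        }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
      });

      for (int i = 1; i <= items; i++) {
        int lag = publisher.offer(i, (subscriber, item) -> {
          dropped.incrementAndGet();
          return false; // don't retry, just drop
        });
        if (lag >= 0) {
          accepted++; // the item made it into the buffer
        }
      }
    }
    return new int[] {accepted, dropped.get()};
  }

  public static void main(String[] args) {
    int[] result = offerToIdleSubscriber(5, 2);
    System.out.println("Accepted: " + result[0] + ", dropped: " + result[1]);
  }
}
```

Replacing offer with submit in this sketch would make the loop block forever once the buffer is full, which is exactly the behavior described above.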

As always in Software Development, there is no black and white approach about how to use the Flow API in general and SubmissionPublisher in particular. But here is some advice:

  • If you know in advance an estimate of the items you'll publish and the number of subscribers you may have, you can analyze the possibility of sizing your buffers to be larger than the maximum number of items.
  • Alternatively, you can create a wrapper class on top of SubmissionPublisher and implement a rule in which only one Subscriber is allowed per Publisher, thereby avoiding interference between subscribers. In some situations, though, this might not make sense if the Publisher and the data source are just one thing, or limited in number (think of a connection pool to a database, for instance).
  • In case you're in control of the Subscribers, you can make them smarter and more supportive. They could decide to cancel the subscription if they detect errors or a high lag.
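
The last piece of advice can be sketched with a subscriber that gives up on its own. A subscriber can't measure the publisher's lag directly, so this illustration uses stand-in criteria: it cancels on the first error, or after a fixed number of items. ImpatientSubscriber, its limit parameter and receivedBeforeCancel are hypothetical names, and the sketch assumes limit is not greater than published.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only for illustration.
class CancellingDemo {

  // Cancels its subscription on the first error or after `limit` items.
  static class ImpatientSubscriber implements Flow.Subscriber<Integer> {
    final AtomicInteger received = new AtomicInteger();
    final CountDownLatch cancelled = new CountDownLatch(1);
    private final int limit;
    private Flow.Subscription subscription;

    ImpatientSubscriber(int limit) {
      this.limit = limit;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1);
    }

    @Override public void onNext(Integer item) {
      if (received.incrementAndGet() >= limit) {
        giveUp();
      } else {
        subscription.request(1);
      }
    }

    @Override public void onError(Throwable throwable) {
      giveUp(); // e.g. the publisher told us it dropped an item
    }

    @Override public void onComplete() {
      cancelled.countDown();
    }

    private void giveUp() {
      if (subscription != null) {
        subscription.cancel(); // tell the publisher to stop sending items
      }
      cancelled.countDown();
    }
  }

  static int receivedBeforeCancel(int published, int limit) {
    ImpatientSubscriber subscriber = new ImpatientSubscriber(limit);
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= published; i++) {
        publisher.submit(i);
      }
      try {
        subscriber.cancelled.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return subscriber.received.get();
  }
}
```

Once the subscription is cancelled, the publisher no longer has to keep a buffer for that subscriber, so the others are not affected by it.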

Those grey areas may vary a lot, and that’s why having a flexible solution like reactive APIs helps in these cases. You should list your requirements and decide whether to go for a plain solution, smarter publishers, smarter subscribers or a combination of both.

Conclusions

Within this guide, we saw some basic principles of reactive programming and dived into a sample application with a use case that I hope helps you grasp the concepts better than generic and meaningless examples. The concept of a hybrid pull/push mechanism should be clearer now. I also hope you take away the meaning of the word Flow and the importance of the Subscriber being in control of it.

You shouldn’t be in a rush to apply this pattern in a real project, but it’s great to know it: in the future you might find a problem for which Reactive Programming fits perfectly, and then you’ll gain a lot. Code projects in which Functional and Reactive programming are nicely combined are pleasant to read (thus to understand and maintain) and to work with. They can also use resources more efficiently, as you learned when we saw how the hybrid pull/push technique avoids both wasted polling and overflowing buffers.

I recommend that you clone the GitHub repository or try your own project from scratch. It’s when you code it and play with the possibilities that you learn the most.


About Moisés Macero

Software Developer, Architect, and Author.

Málaga, Spain https://thepracticaldeveloper.com