
Getting Your Application Started With WebFlux


This is the second article in our series about Spring WebFlux. In the first part, we discussed reactive and functional programming. In this article, we'll focus on the Spring WebFlux framework and the layers of your application. We assume you understand how functional programming works, so if you missed the previous article, it's worth a review.

Getting Started with WebFlux

WebFlux is Spring's implementation of reactive programming for web applications. It's fully non-blocking and supports Reactive Streams, the specification adopted into Java 9, which plays an important role in interoperability between reactive libraries.

Reactor is the reactive library required by Spring WebFlux, and it provides the Mono and Flux API types. A Flux is the equivalent of an RxJava Observable, capable of emitting 0 or more items and then optionally either completing or erroring. A Mono, on the other hand, can emit at most one item.

Monos and Fluxes are Publishers: providers of a potentially unbounded sequence of elements that are published based on requests from subscribers. A Publisher serves as the source of a reactive data stream and emits elements sequentially to its subscribers.

Mono: A Publisher that emits 0 or 1 element.

// 1. Mono with a single value
Mono<String> greetingMono = Mono.just("Hello, WebFlux!");

// 2. Mono that completes without emitting any value
Mono<Void> emptyMono = Mono.empty();

// 3. Mono that emits an error
Mono<String> errorMono = Mono.error(new RuntimeException("Something went wrong!"));

Flux: A Publisher that emits 0 to N elements.

// 1. Flux that emits multiple values
Flux<Integer> numberFlux = Flux.just(1, 2, 3, 4, 5);

// 2. Flux that completes without emitting any elements
Flux<String> emptyFlux = Flux.empty();

// 3. Flux that emits an error
Flux<Integer> errorFlux = Flux.error(new RuntimeException("An error occurred!"));

Once the stream of data is created, it needs to be subscribed to before it starts emitting the elements declared by the publisher. Subscribers are the consumers of the data emitted by a publisher: they register their interest and define how each emitted element should be handled. When a publisher emits an element, the corresponding subscriber method is invoked to process it. The data won't flow or be processed until the subscribe() method is called.
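
As a minimal sketch, nothing below is emitted until subscribe() is called:

Flux<Integer> numbers = Flux.just(1, 2, 3);

// Declaring the pipeline does nothing on its own...
Flux<Integer> doubled = numbers.map(n -> n * 2);

// ...data only flows once a subscriber is attached.
doubled.subscribe(
    value -> System.out.println("Received: " + value),  // onNext
    error -> System.err.println("Error: " + error),     // onError
    () -> System.out.println("Done"));                   // onComplete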

Reactor provides a rich set of operators for working with Mono and Flux. Here's a list of some commonly used operators with brief descriptions, followed by a short example after each group:

Creation Operators:

  • just(T... values): Creates a Flux that emits each of the provided values; the Mono variant, just(T value), emits a single value.
  • empty(): Creates a Mono or Flux that completes immediately without emitting any elements.
  • fromIterable(Iterable<T> iterable): Creates a Flux that emits the elements of the provided iterable.
  • fromCallable(Callable<T> callable): Creates a Mono that emits the result of calling the provided callable function.
  • defer(Supplier<Mono<T>> supplier): Creates a Mono that defers the creation of the actual Mono until the subscription occurs.
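
A quick sketch of the creation operators above:

// just: emit the provided values
Flux<String> letters = Flux.just("a", "b", "c");

// fromIterable: emit each element of an existing collection
Flux<Integer> fromList = Flux.fromIterable(List.of(1, 2, 3));

// fromCallable: defer a (possibly blocking) computation until subscription
Mono<Long> timestamp = Mono.fromCallable(System::currentTimeMillis);

// defer: build a fresh Mono for every subscriber
Mono<Long> perSubscriber = Mono.defer(() -> Mono.just(System.nanoTime()));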

Transformation Operators:

  • map(Function<T, R> mapper): Applies a function to each element emitted by the Mono/Flux and emits the transformed results.
  • filter(Predicate<T> predicate): Emits only those elements from the Mono/Flux that satisfy the provided predicate.
  • flatMap(Function<T, Publisher<R>> mapper): Transforms each element emitted by the Mono/Flux into a new Publisher (Mono or Flux) and emits elements from the resulting Publishers in a flattened sequence.
  • reduce(BiFunction<R, T, R> reducer): Applies a reduction function to accumulate all emitted elements into a single value.
  • take(long n): Limits the number of elements emitted by the Flux to a specified number.
  • takeWhile(Predicate<T> predicate): Emits elements from the Flux until the provided predicate returns false.
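
For instance, these operators can be chained into a pipeline (a minimal sketch):

// filter, map and take chained on a Flux: emits 10, then 30
Flux<Integer> pipeline = Flux.just(1, 2, 3, 4, 5)
    .filter(n -> n % 2 == 1)   // keep 1, 3, 5
    .map(n -> n * 10)          // 10, 30, 50
    .take(2);                  // 10, 30

// reduce accumulates everything into a single Mono: emits 6
Mono<Integer> sum = Flux.just(1, 2, 3).reduce(0, Integer::sum);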

Combination Operators:

  • concatWith(Publisher<? extends T> other): Concatenates two Publishers, emitting elements first from the current and then from the provided other Publisher.
  • mergeWith(Publisher<? extends T> other): Merges elements from two Publishers into a single stream, interleaving elements from both sources non-deterministically.
  • zipWith(Publisher<? extends R> other, BiFunction<T, R, S> zipper): Combines elements from two Publishers pairwise using a provided zipper function and emits the resulting combinations.
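
A short illustration of combining two publishers (a sketch):

Flux<String> greetings = Flux.just("Hello", "Hi");
Flux<String> names = Flux.just("Ada", "Linus");

// concatWith: "Hello", "Hi", then "Ada", "Linus"
Flux<String> concatenated = greetings.concatWith(names);

// zipWith: pairs elements positionally -> "Hello, Ada!", "Hi, Linus!"
Flux<String> zipped = greetings.zipWith(names, (g, n) -> g + ", " + n + "!");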

Error Handling Operators:

  • onErrorResume(Function<Throwable, Publisher<? extends T>> resumeFunction): When an error occurs, attempts to recover by subscribing to a new Publisher provided by the resumeFunction.
  • onErrorReturn(T fallbackValue): When an error occurs, replaces the error with the provided fallback value and then completes.
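
For example (a minimal sketch):

// onErrorResume: switch to a fallback publisher when the source errors
Mono<String> recovered = Mono.<String>error(new IllegalStateException("boom"))
    .onErrorResume(ex -> Mono.just("fallback: " + ex.getMessage()));

// onErrorReturn: emit a fallback value instead of the error -> 1, 2, -1
Flux<Integer> withDefault = Flux.just(1, 2)
    .concatWith(Flux.error(new RuntimeException("failed")))
    .onErrorReturn(-1);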

Backpressure Operators:

  • buffer(int size): Groups emitted elements into fixed-size buffers and emits the buffers as new elements.
  • request(long n): Requests a specific number of elements from the upstream publisher, allowing control over the flow of data; it is called on the Subscription, typically from a custom subscriber such as Reactor's BaseSubscriber.
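
As a hedged sketch, buffer works as a regular operator, while request is issued from a custom subscriber such as BaseSubscriber:

// buffer: groups 1..10 into [1,2,3], [4,5,6], [7,8,9], [10]
Flux<List<Integer>> batches = Flux.range(1, 10).buffer(3);

// request: explicitly pull elements from upstream
Flux.range(1, 100).subscribe(new BaseSubscriber<Integer>() {
  @Override
  protected void hookOnSubscribe(Subscription subscription) {
    request(2); // start by asking for only two elements
  }

  @Override
  protected void hookOnNext(Integer value) {
    System.out.println("Got " + value);
    request(1); // pull one more element at a time
  }
});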

Terminal Operators:

  • subscribe(Consumer<? super T> consumer): Subscribes to the Mono/Flux, triggering emission of elements to the provided consumer.
  • block() (Mono only): Blocks the current thread until the Mono completes and returns the emitted value (or throws an error).
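
For example, subscribe() consumes values asynchronously, while block() waits for the result on the calling thread (a sketch; block is best reserved for tests or bridging into blocking code):

Flux.just("a", "b").subscribe(value -> System.out.println("Received: " + value));

String greeting = Mono.just("Hello").block(); // "Hello"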

Maven Dependency

The most important pieces are the spring-boot-starter-parent parent POM and the spring-boot-starter-webflux dependency.

<!-- pom.xml -->
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.2.2</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
</dependencies>
Functional Web Framework

The Spring WebFlux framework includes a Functional Web Framework that provides an alternative programming model for building reactive web applications. Unlike the traditional annotation-based approach, this framework utilizes functions to handle incoming requests and define how responses are generated. This functional style, built on top of reactive streams principles, promotes:

  • Declarative style: Focuses on "what" needs to be done rather than "how" (implementation details).
  • Immutability: Encourages the use of immutable objects, leading to potential benefits in thread safety and easier reasoning about program behavior.
  • Composition: Enables chaining of functions together to create complex processing pipelines effectively.

Web Layer

The web layer is where we have the most in common with the Spring MVC framework. We can use the same annotations, such as @RestController, @RequestMapping and the request-mapping variants (@GetMapping, @PostMapping, and so on). The major difference is the return type of the handler method, which needs to be a Mono or a Flux.

@RestController
@RequestMapping("/greetings")
public class GreetingController {

  @GetMapping("/{name}")
  public Mono<String> greet(@PathVariable String name) {
    return Mono.just("Hello, " + name + "!");
  }
}

Another approach for writing endpoints, and probably the more common one when working with WebFlux, is to use functional routing. This was introduced in Spring Framework 5 as a new way to define how incoming requests are mapped to the appropriate handler functions in web applications. Unlike traditional annotation-based approaches, functional routing uses functions and lambdas to achieve this mapping. This approach aims to simplify request handling logic, promote code readability, and potentially improve maintainability by separating routing configuration from handler function implementation.

If we apply this concept to the greeting example presented previously, we'll create a handler function as in the example below. The greet function takes a ServerRequest object as input, providing access to request details, and after running through its logic it returns a ServerResponse.

@Component // registered as a bean so it can be injected into the router below
public class GreetingHandler {

  public Mono<ServerResponse> greet(ServerRequest request) {
    String name = request.pathVariable("name");
    if (name.isEmpty()) {
      return ServerResponse.badRequest().bodyValue("Name cannot be empty!");
    } else {
      return ServerResponse.ok().bodyValue("Hello, " + name + "!");
    }
  }
}

Then our controller class is replaced by a router class, which is responsible for routing incoming requests to the correct handler.

@Configuration
public class GreetingRouter {

  @Bean
  public RouterFunction<ServerResponse> greetingRoutes(GreetingHandler handler) {
    return RouterFunctions.route()
        .GET("/greetings/{name}", handler::greet)
        .build();
  }
}

If we replicate that same example in a context with more than one request type, we can see how readability improves when a single router function is responsible for taking all the incoming requests and redirecting them to the correct handlers (a sketch of the corresponding handler follows the router).

@Configuration
public class ItemRouter {

  @Bean
  public RouterFunction<ServerResponse> itemRoutes(ItemHandler handler) {
    return RouterFunctions.route()
        .GET("/items", handler::getItems)
        .POST("/items", handler::createItem)
        .build();
  }
}
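
For completeness, a hypothetical ItemHandler behind these routes could look like the sketch below (the Item entity and the reactive ItemRepository from the persistence section are assumed):

@Component
public class ItemHandler {

  private final ItemRepository repository;

  public ItemHandler(ItemRepository repository) {
    this.repository = repository;
  }

  public Mono<ServerResponse> getItems(ServerRequest request) {
    return ServerResponse.ok().body(repository.findAll(), Item.class);
  }

  public Mono<ServerResponse> createItem(ServerRequest request) {
    return request.bodyToMono(Item.class)
        .flatMap(repository::save)
        .flatMap(saved -> ServerResponse.ok().bodyValue(saved));
  }
}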

It's important to note that our routing class is now defined as a @Configuration, unlike the @RestController we used previously with the Spring MVC style. Furthermore, we are stacking the requests on the same router. Router functions are evaluated in order: if the first route does not match, the second is evaluated, and so on. Therefore, it makes sense to declare more specific routes before general ones.
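
For instance, a route for a fixed path should come before one with a path variable that would otherwise swallow it (a sketch with hypothetical handler methods):

return RouterFunctions.route()
    .GET("/items/featured", handler::getFeaturedItems) // specific path first
    .GET("/items/{id}", handler::getItem)               // general pattern afterwards
    .build();
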
Persistence Layer

As we've discussed previously, a database call is a blocking request: in a blocking system, the thread executing the query cannot do anything else until it completes. We already know that the goal of the WebFlux framework is to work with non-blocking calls, so we'll also handle database requests using a different library, R2DBC. It enables asynchronous communication with databases, allowing the application to handle multiple requests concurrently without blocking threads. This promotes better performance and scalability, particularly when dealing with high-volume or real-time data access needs within your WebFlux application.

Spring simplifies connecting to your database in WebFlux applications with R2DBC. Instead of managing connections directly, you interact with a ConnectionFactory. This factory, as defined by the R2DBC spec, acts as a central access point for database drivers. It handles connection pooling and transaction management behind the scenes, keeping your code clean and free from these complexities. As a developer, you don't need to worry about the low-level details of establishing a database connection. This is typically configured by the administrator during setup. While you might handle both development and testing (where you'll interact with the ConnectionFactory), the specifics of the production data source configuration are usually transparent to you.

@Configuration
public class MyDatabaseConfig extends AbstractR2dbcConfiguration {

  @Value("${spring.datasource.name}")
  private String database;

  @Value("${spring.datasource.host}")
  private String host;

  @Value("${spring.datasource.port}")
  private int port;

  @Value("${spring.datasource.username}")
  private String username;

  @Value("${spring.datasource.password}")
  private String password;

  @Override
  @Bean
  public ConnectionFactory connectionFactory() {
    // DRIVER, PROTOCOL, HOST, PORT, USER, PASSWORD and DATABASE are
    // static imports from io.r2dbc.spi.ConnectionFactoryOptions
    return ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql")
        .option(HOST, host)
        .option(PORT, port)
        .option(USER, username)
        .option(PASSWORD, password)
        .option(DATABASE, database)
        .option(Option.valueOf("maxSize"), "10")
        .build());
  }
}

Spring Data R2DBC simplifies interacting with relational databases in a reactive manner within your Spring WebFlux applications. Here, we'll take a look at a basic example using the ReactiveSortingRepository interface. This interface provides pre-built functionality for common CRUD operations (create, read, update, delete) and sorting capabilities, streamlining data access and manipulation in your reactive data layer.

public interface ItemRepository
    extends ReactiveCrudRepository<Item, Long>, ReactiveSortingRepository<Item, Long> {
  // Optional: add custom sorted query methods using findBy...OrderBy...
}

This interface extends ReactiveSortingRepository<T, ID> from Spring Data, parameterized with the entity type (Item) and its identifier type (Long), which contributes the sorted findAll variants. Since Spring Data 3.x, ReactiveSortingRepository no longer includes the CRUD methods, so the interface also extends ReactiveCrudRepository to gain the basic CRUD operations (create, read, update, delete).
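
As a hedged sketch, a service could use this repository like so (the ItemService class and the "name" sort field are illustrative):

@Service
public class ItemService {

  private final ItemRepository repository;

  public ItemService(ItemRepository repository) {
    this.repository = repository;
  }

  // findAll(Sort) comes from ReactiveSortingRepository
  public Flux<Item> findAllSortedByName() {
    return repository.findAll(Sort.by("name"));
  }

  // save(...) comes from ReactiveCrudRepository
  public Mono<Item> save(Item item) {
    return repository.save(item);
  }
}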

In our next article, we'll dive deeper into how Webflux works and understand how to ensure that our application is taking full advantage of asynchronous programming.

Post by Ricardo Corassa
May 7, 2024 7:00:00 AM
