
In this article we will look at Spring 5's new Reactive stack for creating REST services, Spring WebFlux.

What is a reactive system

From Wikipedia:

"Reactive programming is a programming paradigm oriented around data flows and the propagation of change."

The Reactive Manifesto describes the characteristics that a reactive system must have:

  • Responsive. The system must provide fast responses and be reliable.
  • Resilient. The system must be able to contain failures through isolation. If something goes wrong, the system must react without freezing.
  • Elastic. The system must support predictive algorithms to avoid bottlenecks, increasing and decreasing resources depending on how much input data is received.
  • Message driven. The system must be based on the message-driven paradigm, as components must be able to communicate with each other via asynchronous messages.

In Java there is the Reactive Streams specification that defines standards for reactive systems.
There are also various implementations of this specification, such as RxJava, Project Reactor and Vert.x.

The interfaces provided by this specification are:

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> var1);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    void onNext(T var1);

    void onError(Throwable var1);

    void onComplete();
}

public interface Subscription {
    void request(long var1);

    void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The interfaces show that the Observer pattern is applied.
In particular:

  • The Publisher publishes elements and provides the subscribe method to allow Subscribers to listen.
  • The Subscriber subscribes to the Publisher and receives the data via the onNext method.
  • A Subscription represents the link between the Publisher and the Subscriber. By means of the cancel method, the subscriber can cancel the subscription and with the request method it can implement the backpressure mechanism.
  • A Processor is both a Publisher and a Subscriber, so it can both listen and publish elements.
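
The request/cancel contract is easier to see in a small runnable sketch. The example below does not use the org.reactivestreams interfaces listed above: it assumes JDK 9 or later and relies on the equivalent interfaces nested in java.util.concurrent.Flow, so it runs without extra dependencies. RangePublisher and CollectingSubscriber are hypothetical names invented for this illustration, and the publisher is deliberately synchronous and meant for a single subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Hypothetical publisher: emits 1..count, but never more than the subscriber requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    // Emit at most n elements: this is the backpressure part.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: stores what it receives and exposes its subscription. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;
        Throwable error;
        boolean completed;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            this.error = throwable;
        }

        @Override
        public void onComplete() {
            this.completed = true;
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(5).subscribe(subscriber);
        System.out.println(subscriber.received); // [] because nothing was requested yet

        subscriber.subscription.request(2);
        System.out.println(subscriber.received); // [1, 2]

        subscriber.subscription.request(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
        // [1, 2, 3, 4, 5] completed=true
    }
}
```

Nothing is emitted until the subscriber asks for it, which is how a slow consumer avoids being flooded by a fast producer.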

Spring WebFlux uses the Project Reactor library and uses Netty as a non-blocking reactive server by default.

Project Reactor: Mono and Flux

Project Reactor is the non-blocking reactive programming framework used by Spring.
It provides two APIs that implement the Publisher interface of the Reactive Streams specification:

  • Mono: to manage 0 or 1 asynchronous elements.
  • Flux: to manage 0 to N asynchronous elements.

For more info on Project Reactor, how reactive programming works and the advantages over using Callbacks and Futures, you can visit
https://projectreactor.io/docs/core/release/reference/.

Spring WebFlux allows you to create REST services in two modes:

  • functional mode (using RouterFunction and HandlerFunction)
  • traditional mode (annotation based, using annotations like @RestController, @RequestMapping)

Let's now create a simple REST Web Service with Spring WebFlux using Reactive Mongo!

Prerequisites

  1. A JDK installed (we will use version 17, but any version from 8 onwards also works).

First step: go to the Spring Initializr website

This site will create a skeleton of a Spring Boot app for us with everything we need (just look for the dependencies we need in the 'Dependencies' section). Click on 'ADD DEPENDENCIES' and add the dependencies shown in the image.

Block Image

  • The dependency of Spring Reactive Web is for using WebFlux.
  • The Spring Data Reactive MongoDB dependency is for using the reactive repository for MongoDB.
  • Validation is for validating objects through javax annotations, using Hibernate Validator.
  • Lombok lets us auto-generate getters, setters, equals, hashCode and toString through annotations.
  • Embedded MongoDB Database lets us use an embedded MongoDB (a bit like we do with H2).

Click on 'Generate'. The project zip will be downloaded. Replace the scope 'test' with 'runtime' for the de.flapdoodle.embed.mongo library, since we will use the embedded MongoDB for development and not just for testing.
Finally, in the application.properties file, add the version of MongoDB you want to use:
spring.mongodb.embedded.version=5.0.0
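
As a sketch of the scope change described above, assuming the project was generated as a Maven build and that the library version is managed by the Spring Boot parent (as in a project generated by Spring Initializr), the dependency in pom.xml would look roughly like this:

```xml
<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
    <!-- was <scope>test</scope>: runtime makes the embedded server available when the app runs -->
    <scope>runtime</scope>
</dependency>
```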

Second step: create a User domain class that maps to MongoDB documents

@Document
@Data
@NoArgsConstructor
public class User {

    @Id
    private String id;

    private String name;

    @NotBlank
    private String surname;

    public User(String name, String surname) {
        this.name = name;
        this.surname = surname;
    }
}

The @Document annotation marks the class as a persistent MongoDB object.
@Data and @NoArgsConstructor are Lombok annotations.

Third step: creating a Reactive Repository for User

public interface UserMongoRepository extends ReactiveMongoRepository<User, String> {
}

Like all Spring repositories, this approach provides methods such as findAll, findById, save, delete out of the box for the User domain class.

Fourth step: let's write a general handler class to handle validation

@Component
@RequiredArgsConstructor
public class ValidatorHandler {

    private final Validator validator;

    public <T> void validate(T o) {
        Set<ConstraintViolation<T>> validate = validator.validate(o);
        if(! validate.isEmpty()) {
            ConstraintViolation<T> violation = validate.stream().iterator().next();
            throw new ServerWebInputException(violation.toString());
        }
    }
}

If validation detects at least one violation, a ServerWebInputException is thrown. It extends ResponseStatusException, so the response carries the HTTP status 400 Bad Request.

Fifth step: let's write the handler for the endpoints that handle User

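// Static import needed by the fromValue(...) call in findAll
import static org.springframework.web.reactive.function.BodyInserters.fromValue;

@Component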
@RequiredArgsConstructor
public class UserHandler {


    private static final Logger LOGGER = LoggerFactory.getLogger(UserHandler.class);

    private final UserMongoRepository userMongoRepository;

    private final ValidatorHandler validatorHandler;


    public Mono<ServerResponse> findAll(ServerRequest request) {
        return userMongoRepository.findAll()
                .collectList()
                .flatMap(users -> {
                    if (users.isEmpty()) {
                        return ServerResponse.noContent().build();
                    }
                    return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromValue(users));
                });
    }

    public Mono<ServerResponse> save(ServerRequest request) {
        return request.bodyToMono(User.class)
                .doOnNext(validatorHandler::validate)
                .flatMap(userMongoRepository::save)
                .doOnSuccess(userSaved -> LOGGER.info("User saved with id: {}", userSaved.getId()))
                .doOnError(e -> LOGGER.error("Error in saveUser method", e))
                .map(userSaved -> UriComponentsBuilder.fromPath(("/{id}")).buildAndExpand(userSaved.getId()).toUri())
                .flatMap(uri -> ServerResponse.created(uri).build());
    }
}

Let's analyse the findAll:

  1. We call the findAll of the repository which returns a Flux.
  2. With the collectList() method we transform the Flux into Mono<List>.
  3. With the flatMap() method, we finally transform the Mono object into ServerResponse (the equivalent of ResponseEntity for Spring WebFlux), which returns 200 OK or 204 NoContent depending on whether the list is non-empty or empty.

Let's analyse the save method:

  1. With request.bodyToMono(User.class) we extract the request body and map it to the User class.
  2. With doOnNext we validate the User object before it reaches the next step.
  3. With flatMap we transform the validated User object into the User object returned by the repository, which represents the document saved in the DB and therefore has its id set.
  4. With map we transform the saved object into a URI containing the User id, to be placed in the Location field of the response header.
  5. With flatMap we transform the URI into a ServerResponse object, which returns 201 Created with the Location field set.
  6. With doOnSuccess, which is called when the save completes without errors, we log the id of the saved User.
  7. With doOnError, which is called if the flow failed, we log the exception.

At the end of the article you will find the GitHub link to the project, including the findById and delete methods.

Sixth step: writing endpoints in a functional way

Before writing the endpoints, let us take a look at the RouterFunction interface:

@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
    Mono<HandlerFunction<T>> route(ServerRequest var1);

    default RouterFunction<T> and(RouterFunction<T> other) {
        return new SameComposedRouterFunction(this, other);
    }
    // other default methods
}

As we can see, it is a functional interface (an interface with only one abstract method). We must therefore implement the route method. ServerRequest on the other hand represents the HTTP request that is made by a client.

We can use Spring's RouterFunctions class which has the static route method:

public abstract class RouterFunctions {

    public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
        return new RouterFunctions.DefaultRouterFunction(predicate, handlerFunction);
    }
//other methods
}

RequestPredicate and HandlerFunction are in turn two other functional interfaces:

@FunctionalInterface
public interface RequestPredicate {
    boolean test(ServerRequest var1);
 //other default methods
}

@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
    Mono<T> handle(ServerRequest var1);
}

Let's see now how to implement endpoints with these interfaces:

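// Static imports that the route(...) DSL below relies on
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration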
public class UserRouter {
    
    @Bean
    public RouterFunction<ServerResponse> findAllRouter(UserHandler userHandler) {
        return route(GET("/users")
                .and(accept(MediaType.APPLICATION_JSON)), userHandler::findAll);
    }

    @Bean
    public RouterFunction<ServerResponse> save(UserHandler userHandler) {
        return route(POST("/users")
                .or(PUT("/users"))
                .and(accept(MediaType.APPLICATION_JSON)), userHandler::save);
    }
    
}

As we can see, it is a simple class annotated with @Configuration.
The first method handles the /users endpoint with the GET verb and the second handles the /users endpoint with the POST and PUT verbs.
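
To make the idea of composing a predicate with a handler more concrete, here is a simplified model in plain Java (it assumes JDK 16 or later for records). It is not Spring's implementation: Request, Router, method, dispatch and usersRouter are hypothetical names invented for this sketch, and the handlers return plain strings instead of Mono<ServerResponse>. It only illustrates how single-method interfaces let lambdas be combined with and/or.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

final class RoutingSketch {

    /** Hypothetical stand-in for ServerRequest: just an HTTP method and a path. */
    record Request(String method, String path) {}

    /** Hypothetical stand-in for RouterFunction: yields a handler only when the request matches. */
    interface Router {
        Optional<Function<Request, String>> route(Request request);

        /** Tries this router first and falls back to the other one. */
        default Router and(Router other) {
            return request -> route(request).or(() -> other.route(request));
        }
    }

    /** Pairs a predicate with a handler, in the spirit of RouterFunctions.route. */
    static Router route(Predicate<Request> predicate, Function<Request, String> handler) {
        return request -> predicate.test(request) ? Optional.of(handler) : Optional.empty();
    }

    /** Predicate that matches on HTTP method and exact path. */
    static Predicate<Request> method(String method, String path) {
        return request -> request.method().equals(method) && request.path().equals(path);
    }

    /** Runs the matching handler, or reports that no route matched. */
    static String dispatch(Router router, Request request) {
        return router.route(request)
                .map(handler -> handler.apply(request))
                .orElse("404 Not Found");
    }

    /** Same shape as the two beans of UserRouter, merged into a single router. */
    static Router usersRouter() {
        Router findAll = route(method("GET", "/users"), request -> "200 list users");
        Router save = route(
                method("POST", "/users").or(method("PUT", "/users")),
                request -> "201 save user");
        return findAll.and(save);
    }

    public static void main(String[] args) {
        Router router = usersRouter();
        System.out.println(dispatch(router, new Request("GET", "/users")));    // 200 list users
        System.out.println(dispatch(router, new Request("PUT", "/users")));    // 201 save user
        System.out.println(dispatch(router, new Request("DELETE", "/users"))); // 404 Not Found
    }
}
```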

Sixth step B: let's see the same endpoints written in the traditional way

@RestController
@RequestMapping("/users")
@RequiredArgsConstructor
public class UserResource {

    private final UserMongoRepository userMongoRepository;

    @GetMapping
    public Mono<ResponseEntity<List<User>>> findAll() {
        return userMongoRepository.findAll()
                .collectList()
                .map(users -> {
                    if(users.isEmpty()) {
                        return ResponseEntity.noContent().build();
                    }
                    return ResponseEntity.ok(users);
                });
    }

    @RequestMapping(method = {RequestMethod.POST, RequestMethod.PUT})
    public Mono<ResponseEntity<User>> save(@Valid @RequestBody User user) {
        return userMongoRepository.save(user)
                .map(userSaved -> UriComponentsBuilder.fromPath(("/{id}")).buildAndExpand(userSaved.getId()).toUri())
                .map(uri -> ResponseEntity.created(uri).build());
    }

}

Note that the implementations of the findAll and save methods are very similar to those of the UserHandler class.

The noticeable advantage of the traditional approach is that validation is done automatically with the @Valid annotation (just like when using Spring MVC).

You could also centralize validation handling, and in general, exception handling, by creating an ExceptionHandler like this:

@RestControllerAdvice
@Slf4j
public class ExceptionHandlerConfig {

    //handle the Exceptions as you want

    @ExceptionHandler(WebExchangeBindException.class)
    public void handleException(WebExchangeBindException e) {
        log.error("Error Validation", e);
        throw e;
    }
}

Seventh step: create the configuration class to use MongoDB embedded

@Configuration
@Profile("dev")
@RequiredArgsConstructor
public class MongoConfig extends AbstractReactiveMongoConfiguration {

   private final Environment env;

   @Override
   protected String getDatabaseName() {
      return "users";
   }

   @Override
   @Bean
   @DependsOn("embeddedMongoServer")
   public MongoClient reactiveMongoClient() {
      var port = env.getProperty("local.mongo.port", Integer.class);
      return MongoClients.create(String.format("mongodb://localhost:%d", port));
   }

   @Bean
   public CommandLineRunner insertData(UserMongoRepository userMongoRepository) {
      return args -> {
         userMongoRepository.save(new User("Vincenzo", "Racca")).subscribe();
         userMongoRepository.save(new User("Mario", "Rossi")).subscribe();
         userMongoRepository.save(new User("Gennaro", "Esposito")).subscribe();
         userMongoRepository.save(new User("Diego", "della Lega")).subscribe();
      };
   }
}

We also insert four records in the DB at startup.

Let's test the endpoints with a REST client such as Postman

Let's test the endpoints by running the Spring main class and making a GET call to
localhost:8080/users.
You'll get a response similar to this:

[
    {
        "id": "60340ab46d597e20886009b8",
        "name": "Mario",
        "surname": "Rossi"
    },
    {
        "id": "60340ab46d597e20886009b7",
        "name": "Vincenzo",
        "surname": "Racca"
    },
    {
        "id": "60340ab46d597e20886009b9",
        "name": "Gennaro",
        "surname": "Esposito"
    },
    {
        "id": "60340ab46d597e20886009ba",
        "name": "Diego",
        "surname": "della Lega"
    }
]

Now let's try to insert a new User by invoking a POST to localhost:8080/users with the request body:

{
    "name": "Beppe",
    "surname": ""
}

The HTTP response status will be 400 Bad Request, with the following JSON body:

{
    "timestamp": "2021-02-22T19:54:10.188+00:00",
    "path": "/users",
    "status": 400,
    "error": "Bad Request",
    "message": "ConstraintViolationImpl{interpolatedMessage='non deve essere spazio', propertyPath=surname, rootBeanClass=class com.vincenzoracca.webflux.domains.User, messageTemplate='{javax.validation.constraints.NotBlank.message}'}",
    "requestId": "9c502952-5"
}

On the application side we will have this logging:

ERROR 5280 --- [ctor-http-nio-4] c.v.webflux.handlers.UserHandler         : Error in saveUser method

org.springframework.web.server.ServerWebInputException: 400 BAD_REQUEST "ConstraintViolationImpl{interpolatedMessage='non deve essere spazio', propertyPath=surname, rootBeanClass=class com.vincenzoracca.webflux.domains.User, messageTemplate='{javax.validation.constraints.NotBlank.message}'}"
	at com.vincenzoracca.webflux.handlers.ValidatorHandler.validate(ValidatorHandler.java:21) ~[classes/:na]

This logging is given by the line:

.doOnError(e -> LOGGER.error("Error in saveUser method", e))

of the save method of the UserHandler class. Now let's insert a valid request:

{
    "name": "Beppe",
    "surname": "De Rossi"
}

The HTTP response status will be 201 Created, with an empty response body and with the Location response header containing the id of the newly created User.
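
If you prefer code to Postman, the same call can be sketched with the HTTP client included in the JDK (java.net.http, available from JDK 11). The class and method names below are hypothetical, the URL assumes the application runs locally on port 8080 as in the examples above, and the JSON is built by simple string formatting without escaping, which is only acceptable for a demo.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class CreateUserClient {

    /** Builds the POST request; the body mirrors the valid payload shown above. */
    static HttpRequest createUserRequest(String name, String surname) {
        // Demo only: no JSON escaping is applied to the two values.
        String json = String.format("{\"name\": \"%s\", \"surname\": \"%s\"}", name, surname);
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/users"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = createUserRequest("Beppe", "De Rossi");
        try {
            // The 201 response has no body, so it is simply discarded.
            HttpResponse<Void> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.discarding());
            System.out.println("Status: " + response.statusCode());
            System.out.println("Location: "
                    + response.headers().firstValue("Location").orElse("<none>"));
        } catch (IOException e) {
            // Typically means the application is not running on localhost:8080.
            System.out.println("Could not reach the service: " + e);
        }
    }
}
```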

Testing with WebTestClient

Spring WebFlux also offers a non-blocking reactive REST client: WebClient.
As with its non-reactive counterpart, RestTemplate, this client also has a class that facilitates the testing phase, WebTestClient. Let's see how to use it:

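// Static import needed by the given(...) stubbing calls below
import static org.mockito.BDDMockito.given;

@SpringBootTest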
class SpringWebFluxApplicationTests {

    @Autowired
    private UserRouter userRouter;

    @Autowired
    private UserHandler userHandler;

    @MockBean
    private UserMongoRepository userMongoRepository;

    @Test
    public void findAllTest() {

        WebTestClient client = WebTestClient
                .bindToRouterFunction(userRouter.findAllRouter(userHandler))
                .build();

        List<User> users = Arrays.asList(new User("Mario","Rossi"),
                new User("Filippo", "Bianchi"));

        Flux<User> flux = Flux.fromIterable(users);
        given(userMongoRepository.findAll())
                .willReturn(flux);

        client.get().uri("/users")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class)
                .isEqualTo(users);
    }
    
    @Test
    public void saveTest() {

        WebTestClient client = WebTestClient
                .bindToRouterFunction(userRouter.save(userHandler))
                .build();

        User user = new User("Clark","Kent");
        user.setId("efgt-fght");

        Mono<User> mono = Mono.just(user);
        given(userMongoRepository.save(user))
                .willReturn(mono);

        client.post().uri("/users")
                .accept(MediaType.APPLICATION_JSON)
                .body(mono, User.class)
                .exchange()
                .expectStatus().isCreated()
                .expectHeader()
                .location("/efgt-fght");
    }
}

Two things to note:

  • UserMongoRepository is not injected from the Spring context, but is mocked. At this stage we don't need to go to a database.

  • The WebTestClient class is created from a RouterFunction, so we inject the real UserRouter and UserHandler.

Conclusions

We have seen how to create a simple REST Web Service with the new Reactive Spring WebFlux module and Reactive MongoDB.
On the internet you can find very interesting articles comparing the performance of WebFlux with the traditional blocking Servlet stack running on containers such as Tomcat.

You can find the complete project on my GitHub at this link: Spring WebFlux.

Articles about Spring: Spring

Recommended books about Spring:

  • Pro Spring 5 (Spring from zero to hero): https://amzn.to/3KvfWWO
  • Pivotal Certified Professional Core Spring 5 Developer Exam: A Study Guide Using Spring Framework 5 (for Spring certification): https://amzn.to/3KxbJSC
  • Pro Spring Boot 2: An Authoritative Guide to Building Microservices, Web and Enterprise Applications, and Best Practices (Spring Boot in detail): https://amzn.to/3TrIZic