Spring WebFlux

Project Reactor docs:
https://projectreactor.io/docs/core/release/reference/

Spring Webflux:
https://docs.spring.io/spring-framework/docs/5.1.2.RELEASE/spring-framework-reference/web-reactive.html

Kotlin Webflux:
https://www.kotlindevelopment.com/kotlin-webflux/
https://kotlinexpertise.com/spring-webflux-with-kotlin-reactive-web/

https://backpaper0.github.io/ghosts/reactive/#1

サンプル

ソース

git clone git@github.com:hako1912/hello-spring-webflux.git

Dockerイメージ

docker pull hako1912/hello-spring-webflux

build.gradle

build.gradleのひな型。

plugins {
    id 'org.springframework.boot' version '2.1.3.RELEASE'
    id 'io.spring.dependency-management' version '1.0.6.RELEASE'
    id 'org.jetbrains.kotlin.jvm' version '1.3.21'
    id 'org.jetbrains.kotlin.plugin.spring' version "1.3.21"
    id "org.hidetake.swagger.generator" version "2.14.0"
    id 'org.asciidoctor.convert' version "1.5.7"
    id 'com.google.cloud.tools.jib' version '1.0.2'
    id 'org.jlleitschuh.gradle.ktlint' version '7.1.0'
}

group = 'com.example'
version = rootProject.file('.version').text.trim()

repositories {
    jcenter()
    mavenCentral()
}

// spring-boot-dependenciesのjunitのバージョンを上書きする
ext['junit-jupiter.version'] = '5.4.0'
dependencies {
    def kotlinLogginVersion = '1.6.25'
    def kotlinArrowVersion = '0.7.2'
    def kotlinMockitoVersion = "2.1.0"
    def wiremockVersion = '2.19.0'
    def swaggerUiVersion = '3.10.0'

    // spring
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'

    // kotlin
    implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'
    implementation 'org.jetbrains.kotlin:kotlin-reflect'
    implementation "io.arrow-kt:arrow-core:$kotlinArrowVersion"
    implementation "io.github.microutils:kotlin-logging:$kotlinLogginVersion"
    implementation 'io.projectreactor.addons:reactor-extra' // Retryなどに必要

    // validation
    implementation 'org.hibernate.validator:hibernate-validator'
    implementation 'org.glassfish:javax.el'

    // DB
    runtimeOnly 'mysql:mysql-connector-java'
    testRuntimeOnly 'com.h2database:h2'

    // testing
    testCompile('io.projectreactor:reactor-test')
    testCompile('org.springframework.cloud:spring-cloud-stream-test-support') {
        exclude group: "junit" // Junit4を除外
    }
    testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:$kotlinMockitoVersion"
    testImplementation "org.junit.jupiter:junit-jupiter-api"
    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
    testImplementation "com.github.tomakehurst:wiremock:$wiremockVersion"

    // documentation
    asciidoctor 'org.asciidoctor:asciidoctorj-diagram:1.5.8'
    swaggerUI("org.webjars:swagger-ui:${swaggerUiVersion}")
}

dependencyManagement {
    def springCloudVersion = 'Greenwich.RELEASE'
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion"
    }
}

compileKotlin {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}

compileTestKotlin {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}

test {
    useJUnitPlatform()
}

asciidoctorj {
    version = '1.5.6'
}

asciidoctor {
    requires = ['asciidoctor-diagram']
    backends = ['html5']
    sourceDir = file("docs/adoc")
    outputDir = file("${buildDir}")
    attributes = [
            'imagesdir': './images'
    ]
}

swaggerSources {
    emagazineConsumer {
        inputFile = file('docs/api/swagger.yaml')
        ui {
            outputDir = file("${buildDir}/html5/swagger")
        }
    }
}

jib {
    from {
        image = 'circleci/openjdk:8u181-jdk-stretch-node'
    }

    to {
        image = "my-registory.com/my-app:${version}"

        auth {
            username = "$System.env.DOCKER_REGISTORY_USER"
            password = "$System.env.DOCKER_REGISTORY_PASSWORD"
        }
    }

    container {
        mainClass = 'com.example.ApplicationKt'
        appRoot = "/opt/app"
        workingDirectory = "/opt/app"
        ports = ['8080']
    }
}

Router

@Component
class Router(private val handler: Handler) {
    @Bean
    fun route() = router {
        GET("/hoge", handler::hoge)
        "users/".nest {
            GET("/", handler::getUsers)
            POST("/", handler::addUser)
            PATCH("/", handler::updateUser)
        }
    }
}

Handler

Handlerは、ServerRequestを受け取りMono<ServerResponse>を返す関数を実装する。

@Component
class ApiHandler {
    fun hello(request: ServerRequest): Mono<ServerResponse> =
        request.toMono()spring
            .flatMap { ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just("hello")) }
            .onErrorResume { handleError(it) }

    private fun handleError(e: Throwable): Mono<ServerResponse> {
        return when (e) {
            is HogeException -> badRequest().body(Mono.just(e.message.orEmpty()))
            else -> badRequest().body(Mono.just("Unknown error. ${e.message}"))
        }
    }
}

XML

@Test
fun test(){
    webTestClient.
        .get()
        .url("/hoge")
        .exchange()
        .expectBody()
        .xpath("/root/header/code").isEqualTo("hoge)
}

Reactive Programming

~~なときは~~を使う

https://projectreactor.io/docs/core/milestone/reference/#which-operator

Mono

時間のかかる処理はflatMap()で。

空のMono

Mono.empty<>()で空のMonoが作れる。
空のMonoになった時点でそれ以降の処理はしない

VoidのMono

Unit.toMono()で作れる。

Mono ⇔ Flux 変換

MonoからFluxはflatMapMany()を使う。

Mono.just(listOf(1, 2, 3))
    .flatMapMany { Flux.fromIterable(it) }
    // Flux<Int> になる

FluxからMonoはcollectList()を使う。

Flux.just(1, 2, 3)
    .collectList()
    // Mono<List<Int>> になる

例外発生時の処理をしたい

単にログを出すだけとかならdoOnError()でもいい。
エラー時に別のMonoを返したいならonErrorResume()を使う。

1.toMono()
    .doOnNext{ throw Exception() }
    .onErrorResume { 
        2.toMono()
        .flatMap { reactiveProcess.someThing() } // エラー処理の中でMonoを使う場合は`onErrorResume()`でないとsubscribe()されない
    }

発生したExceptionを別のExceptionに変換する

onErrorMap()を使う。
whenRetry()が最大回数だけ失敗するとRetryExhaustedExceptionがスローされるが、原因となった例外を取り出したいときなど。

user.toMono()
    .doOnNext{ throw MyException() }
    .retry(3)
    // リトライ失敗のときはその原因となった例外に変換
    .onErrorMap { if(it is RetryExhaustedException) it.cause!! else it }

Flux

groupBy()

使用例:

Flux.just(User(name = 'aaa', age = 10), User('bbb', 10), User('ccc', 12))
    .groupBy(User::age) // 年齢ごとにグループ化
    .flatMap { group -> // 1要素はgroupedFlux(`key()`つきの`Flux<User>`)になる
        // 各グループFluxごとの処理。ここでMonoかFluxを返さないと処理が実行されないので注意。返さないならsubscribe()する。
        group.map(OptOutLog::name)
            .collectList()
            .doOnNext { doSomething(it) } // 年齢ごとの名前リストに対して任意の処理
            .map { it.toString }
    }
    .collectList() // Mono<List<User>>に変換
    .flatMap { status(HttpStatus.OK).contentType(MediaType.APPLICATION_JSON).body(it.toMono()) }

入れ子のMono

doOnNext()の中でMonoを作ってもそのMonoで書いた処理は呼ばれない。
flatMap()を使ったり、(Fluxの場合は)toMono()collectList()を呼んで処理を繋げる。

1.toMono()
    .doOnNext {
        // このMonoはsubscribe()されない
        1.toMono()
            .doOnNext{ println("hello...") }
     }
     .flatMap { noContent().build() }

1.toMono()
    .flatMap {
        // このMonoはsubscribe()される
        1.toMono()
            .doOnNext{ println("hello...") }
     }
     .flatMap { noContent().build() }

ブロッキング処理にはMono.fromCallable()を使う

DBにアクセスして取得結果を利用したいときなど。
fromCallable()の中に書いた関数の結果がnullの場合はMono.empty()が返る。

// bad
hoge.toMono()
    .map { repo.read("id") } // blocking process
    .flatMap { ok().body(Mono.just("OK")) }

// good
hoge.toMono()
    .flatMap { Mono.fromCallable { repo.read("id") } }
    .flatMap { ok().body(Mono.just("OK")) }

Mono.fromCallable { repo.read("id") }.subscribeOn(Schedulers.elastic())のほうがいいかも?

リトライ処理

https://projectreactor.io/docs/extra/release/api/reactor/retry/Retry.html

const val MIN_BACKOFF = 3
const val MAX_BACKOFF = 10
const val MAX_COUNT = 5

mono.doOnNext(::send)
    .retryWhen(Retry.anyOf<Any>(MyException::class.java)
        .exponentialBackoffWithJitter(Duration.ofSeconds(MIN_BACKOFF), Duration.ofSeconds(MAX_BACKOFF))
        .retryMax(MAX_COUNT))

WebClient

他のRESTを叩ける。

@BeanでBean定義する。

@Configuration
class WebClientConfig() {
    // `http://`は省略しないこと
    @Bean
    fun webClient(): WebClient = WebClient.create("http://localhost:3000")
}

共通ヘッダを設定する。

WebClient
    .builder()
    .defaultHeader("key", "hoge")
    .build()

使う。

client.get()
    .uri("/users/{id}", 1)
    .exchange()
    .subscribe()

Jackson

Enumのデシリアライズ

@JsonFormatアノテーションを付けないと、列挙子(VAL1...)がjsonのvalueに出力されてしまう。
ref: https://www.baeldung.com/jackson-serialize-enums

@JsonFormat(shape = JsonFormat.Shape.Number)
enum class Hoge(val value: Int) {
    VAL1(1),
    VAL2(2),
    VAL3(3)
}

Jpa Repository

application.yml

spring:
  jpa:
    hibernate:
      ddl-auto: create
  datasource:
    url: jdbc:mysql://localhost:3306/my_database
    username: root
    password: password

User.kt

@Table // テーブル名はデフォルトでクラス名と同じ(user)で紐づく
@Entity
class User {
    @Id
    var id: Int = 0
    @Column
    var name: String = ""
    @Column
    var age: Int = 0
    @Column
    var version: Int = 0
}

UserRepository.kt

@Repository
interface UserRepository : JpaRepository<User, Int>

init.sql

create database my_database;

use my_database;

create table my_table  (
    id int primary key,
    name varchar(50) not null,
    age int not null,
    version int not null
) charset=utf8;

Spring

const valな定数にapplication.ymlの設定値を設定する

アノテーションのパラメータに設定値を入れたいときに便利。

private const val HOGE = "\${aaa.bbb.ccc}"

複数DB接続

http://roufid.com/spring-boot-multiple-databases-configuration/

functional programming

Either

leftに例外、rightに結果を入れる。

Test

非同期なのでassertionの時点で処理が完了しているかどうかは確認しておく。

Mockitokotlin

関数のMock。

val hogeService = mock<HogeService> {
    // デフォルト値がある関数の場合も最後の引数まで省略しないで書く
    on{ hello(eq("aaa"), any())} doReturn 123
    // 引数を利用して戻り値を返す場合は`doAnswer()`
    on { hello(any()) } doAnswer { (it.arguments[0] as Hoge) } // もらった引数の0番目をそのまま返す
}

呼ばれたかの検証はverify()で。

// `hello()`が1回呼ばれたか
verify(hogeService, times(1)).hello(any())

// `hello("hoge")`が1回呼ばれたか
verify(hogeService, times(1)).hello(eq("hoge"))

// `hello()`が1回呼ばれたか検証し、さらに呼ばれたときの引数を`actual`に代入
val actual = argumentCaptor<String>().apply { verify(hogeService, times(1)).hello(capture()) }.firstValue
Assertions.assertEquals("hoge", actual)

Monoを返すMock

val hogeService = mock<HogeService> {
    // 例外を起こす時は`doThrow`で例外をスローするのではなく`Mono.error()`を返すようにする。
    on{ hello(eq("aaa"), any())} doReturn Mono.error(EntityNotFoundException())
}

WebTestClient

REST APIのテスト。

@Autowired
private lateinit var webTestClient: WebTestClient

fun test() {
    webTestClient.get().uri("/hello").exchange().expectStatus().isOk
                .expectBody()
                .jsonPath("\$.results[0].name").isEqualTo("aaa")
                .jsonPath("\$.results[0].age").isEqualTo(12)
}

application.yml

パッケージごとにログレベルを設定できる。

logging:
 level:
   # com.exampleがDEBUG、他はすべてWARN
   ROOT: WARN
   com.example: DEBUG

application/json+stream 形式のレスポンスを返す

無限ストリームの例。

data class Hoge(val a: Int)

fun hello(request: ServerRequest): Mono<ServerResponse> {
        val stream = Stream.iterate(0) {it + 1}
        val flux = Flux.fromStream(stream)
            .zipWith(Flux.interval(Duration.ofSeconds(1)))
            .map { Hoge(it.t1) }
            .doOnNext { println(it) }

        return request.toMono()
            .flatMap { ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(flux, Hoge::class.java) }
    }

Flux.create()を使うと好きな条件を使ってストリームを終了できる。

data class Hoge(val a: Int)

fun hello(request: ServerRequest): Mono<ServerResponse> {
        val s = Flux.create<Hoge> { sink ->
            var i = 0
            while(true){
                sink.next(Hoge(i))
                Thread.sleep(1000L)
                i++
                if(5 < i) break
            }
            sink.complete()
        }

        return request.toMono()
            .flatMap { ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(s, Hoge::class.java) }
    }

MySQLと一緒とに使うときの注意点

Mono.fromCallableを使う。 以下のようにすることで、ブロッキング処理のみ指定したスレッド数で処理するようにできる。

@Repository
interface HogeRepository : JpaRepository<Hoge, UUID>

val repositoryParallel: Scheduler = Schedulers.newParallel("repository", 10)
fun <T> parallel(f: () -> T): Mono<T> = Mono.fromCallable(f)
  .subscribeOn(repositoryParallel)
  .publishOn(Schedulers.elastic())
1.toMono()
  .flatMap { parallel { hogeRepo.findById("id") } }
spring:
  datasource:
    url: "jdbc:mysql://localhost:3306/db"
    username: "root"
    password: "password"
    hikari:
      connection-test-query: "SELECT 1"
      maximum-pool-size: 10 # ここの数とスレッド数を合わせる。
      minimum-idle: 1

Tips

Fluxが空になったら例外処理をする

switchIfEmptyを使う。

Flux.just(1, 2, 3)
  .filter{ it -> it < 4 }
  .switchIfEmpty { Mono.error(RuntimeException()) }

条件に合うまで繰り返し生成する

expandを使う。
リクエストにlimitがあるAPIを、指定のデータ数が取得できるまで叩いて、そのリストをFluxとして扱う、など。

data class Request(val count: Int, val offset: Int)

Request(count = 100, offset = 0)
    .toMono()
    .expand { (req, totalCount) ->
        if(totalCount < req.count){
            Mono.empty()
        } else {
            val res = api.users(limit = 10, offset = req.offset)
            Mono.just(Pair(req, totalCount + res.count))
        }
    }

Ref:
https://qiita.com/mori-bito/items/7f66769c71ffc6a9c1e6

https://stackoverflow.com/questions/54168302/repeat-request-until-condition-is-met-in-webflux-based-on-last-responses-value

WebClientに共通の認証を埋め込む

ExchangeFilterFunctionを使う

Ref:
(Spring 5 WebClient and WebTestClient Tutorial with Examples)[https://www.callicoder.com/spring-5-reactive-webclient-webtestclient-examples/#adding-filter-functions]

アクセスログを表示する

resources/logback-spring.xmlを作って、
application.yamlで作ったファイルを指定する。

logging.config: config/logback-spring.xml

アクセスログを有効にするために起動時のシステムプロパティを設定する。

bootRun {
    jvmArgs = ["-Dreactor.netty.http.server.accessLogEnabled=true"]
}

アプリ起動。
アプリにHTTPリクエストを投げるとlogs/access.logに追記される。

./gradlew bootRun

Ref:
https://cloud.spring.io/spring-cloud-static/spring-cloud-gateway/2.1.0.RELEASE/multi/multi__reactor_netty_access_logs.html