Spring WebFlux
Project Reactor docs:
https://projectreactor.io/docs/core/release/reference/
Spring Webflux:
https://docs.spring.io/spring-framework/docs/5.1.2.RELEASE/spring-framework-reference/web-reactive.html
Kotlin Webflux:
https://www.kotlindevelopment.com/kotlin-webflux/
https://kotlinexpertise.com/spring-webflux-with-kotlin-reactive-web/
https://backpaper0.github.io/ghosts/reactive/#1
サンプル
ソース
git clone git@github.com:hako1912/hello-spring-webflux.git
Dockerイメージ
docker pull hako1912/hello-spring-webflux
build.gradle
build.gradle
のひな型。
plugins {
id 'org.springframework.boot' version '2.1.3.RELEASE'
id 'io.spring.dependency-management' version '1.0.6.RELEASE'
id 'org.jetbrains.kotlin.jvm' version '1.3.21'
id 'org.jetbrains.kotlin.plugin.spring' version "1.3.21"
id "org.hidetake.swagger.generator" version "2.14.0"
id 'org.asciidoctor.convert' version "1.5.7"
id 'com.google.cloud.tools.jib' version '1.0.2'
id 'org.jlleitschuh.gradle.ktlint' version '7.1.0'
}
group = 'com.example'
version = rootProject.file('.version').text.trim()
repositories {
jcenter()
mavenCentral()
}
// spring-boot-dependenciesのjunitのバージョンを上書きする
ext['junit-jupiter.version'] = '5.4.0'
dependencies {
def kotlinLogginVersion = '1.6.25'
def kotlinArrowVersion = '0.7.2'
def kotlinMockitoVersion = "2.1.0"
def wiremockVersion = '2.19.0'
def swaggerUiVersion = '3.10.0'
// spring
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
// kotlin
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'
implementation 'org.jetbrains.kotlin:kotlin-reflect'
implementation "io.arrow-kt:arrow-core:$kotlinArrowVersion"
implementation "io.github.microutils:kotlin-logging:$kotlinLogginVersion"
implementation 'io.projectreactor.addons:reactor-extra' // Retryなどに必要
// validation
implementation 'org.hibernate.validator:hibernate-validator'
implementation 'org.glassfish:javax.el'
// DB
runtimeOnly 'mysql:mysql-connector-java'
testRuntimeOnly 'com.h2database:h2'
// testing
testCompile('io.projectreactor:reactor-test')
testCompile('org.springframework.cloud:spring-cloud-stream-test-support') {
exclude group: "junit" // Junit4を除外
}
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:$kotlinMockitoVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
testImplementation "com.github.tomakehurst:wiremock:$wiremockVersion"
// documentation
asciidoctor 'org.asciidoctor:asciidoctorj-diagram:1.5.8'
swaggerUI("org.webjars:swagger-ui:${swaggerUiVersion}")
}
dependencyManagement {
def springCloudVersion = 'Greenwich.RELEASE'
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion"
}
}
compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
}
}
compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
}
}
test {
useJUnitPlatform()
}
asciidoctorj {
version = '1.5.6'
}
asciidoctor {
requires = ['asciidoctor-diagram']
backends = ['html5']
sourceDir = file("docs/adoc")
outputDir = file("${buildDir}")
attributes = [
'imagesdir': './images'
]
}
swaggerSources {
emagazineConsumer {
inputFile = file('docs/api/swagger.yaml')
ui {
outputDir = file("${buildDir}/html5/swagger")
}
}
}
jib {
from {
image = 'circleci/openjdk:8u181-jdk-stretch-node'
}
to {
image = "my-registory.com/my-app:${version}"
auth {
username = "$System.env.DOCKER_REGISTORY_USER"
password = "$System.env.DOCKER_REGISTORY_PASSWORD"
}
}
container {
mainClass = 'com.example.ApplicationKt'
appRoot = "/opt/app"
workingDirectory = "/opt/app"
ports = ['8080']
}
}
Router
@Component
class Router(private val handler: Handler) {
@Bean
fun route() = router {
GET("/hoge", handler::hoge)
"users/".nest {
GET("/", handler::getUsers)
POST("/", handler::addUser)
PATCH("/", handler::updateUser)
}
}
}
Handler
Handlerは、ServerRequest
を受け取りMono<ServerResponse>
を返す関数を実装する。
@Component
class ApiHandler {
fun hello(request: ServerRequest): Mono<ServerResponse> =
request.toMono()spring
.flatMap { ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just("hello")) }
.onErrorResume { handleError(it) }
private fun handleError(e: Throwable): Mono<ServerResponse> {
return when (e) {
is HogeException -> badRequest().body(Mono.just(e.message.orEmpty()))
else -> badRequest().body(Mono.just("Unknown error. ${e.message}"))
}
}
}
XML
@Test
fun test(){
webTestClient.
.get()
.url("/hoge")
.exchange()
.expectBody()
.xpath("/root/header/code").isEqualTo("hoge)
}
Reactive Programming
~~なときは~~を使う
https://projectreactor.io/docs/core/milestone/reference/#which-operator
Mono
時間のかかる処理はflatMap()
で。
空のMono
Mono.empty<>()
で空のMonoが作れる。
空のMonoになった時点でそれ以降の処理はしない
VoidのMono
Unit.toMono()
で作れる。
Mono ⇔ Flux 変換
MonoからFluxはflatMapMany()
を使う。
Mono.just(listOf(1, 2, 3))
.flatMapMany { Flux.fromIterable(it) }
// Flux<Int> になる
FluxからMonoはcollectList()
を使う。
Flux.just(1, 2, 3)
.collectList()
// Mono<List<Int>> になる
例外発生時の処理をしたい
単にログを出すだけとかならdoOnError()
でもいい。
エラー時に別のMonoを返したいならonErrorResume()
を使う。
1.toMono()
.doOnNext{ throw Exception() }
.onErrorResume {
2.toMono()
.flatMap { reactiveProcess.someThing() } // エラー処理の中でMonoを使う場合は`onErrorResume()`でないとsubscribe()されない
}
発生したExceptionを別のExceptionに変換する
onErrorMap()
を使う。
whenRetry()
が最大回数だけ失敗するとRetryExhaustedException
がスローされるが、原因となった例外を取り出したいときなど。
user.toMono()
.doOnNext{ throw MyException() }
.retry(3)
// リトライ失敗のときはその原因となった例外に変換
.onErrorMap { if(it is RetryExhaustedException) it.cause!! else it }
Flux
groupBy()
使用例:
Flux.just(User(name = 'aaa', age = 10), User('bbb', 10), User('ccc', 12))
.groupBy(User::age) // 年齢ごとにグループ化
.flatMap { group -> // 1要素はgroupedFlux(`key()`つきの`Flux<User>`)になる
// 各グループFluxごとの処理。ここでMonoかFluxを返さないと処理が実行されないので注意。返さないならsubscribe()する。
group.map(OptOutLog::name)
.collectList()
.doOnNext { doSomething(it) } // 年齢ごとの名前リストに対して任意の処理
.map { it.toString }
}
.collectList() // Mono<List<User>>に変換
.flatMap { status(HttpStatus.OK).contentType(MediaType.APPLICATION_JSON).body(it.toMono()) }
Mono
入れ子のdoOnNext()
の中でMono
を作ってもそのMono
で書いた処理は呼ばれない。
flatMap()
を使ったり、(Flux
の場合は)toMono()
やcollectList()
を呼んで処理を繋げる。
1.toMono()
.doOnNext {
// このMonoはsubscribe()されない
1.toMono()
.doOnNext{ println("hello...") }
}
.flatMap { noContent().build() }
1.toMono()
.flatMap {
// このMonoはsubscribe()される
1.toMono()
.doOnNext{ println("hello...") }
}
.flatMap { noContent().build() }
Mono.fromCallable()
を使う
ブロッキング処理にはDBにアクセスして取得結果を利用したいときなど。
fromCallable()
の中に書いた関数の結果がnullの場合はMono.empty()
が返る。
// bad
hoge.toMono()
.map { repo.read("id") } // blocking process
.flatMap { ok().body(Mono.just("OK")) }
// good
hoge.toMono()
.flatMap { Mono.fromCallable { repo.read("id") } }
.flatMap { ok().body(Mono.just("OK")) }
Mono.fromCallable { repo.read("id") }.subscribeOn(Schedulers.elastic())
のほうがいいかも?
リトライ処理
https://projectreactor.io/docs/extra/release/api/reactor/retry/Retry.html
const val MIN_BACKOFF = 3
const val MAX_BACKOFF = 10
const val MAX_COUNT = 5
mono.doOnNext(::send)
.retryWhen(Retry.anyOf<Any>(MyException::class.java)
.exponentialBackoffWithJitter(Duration.ofSeconds(MIN_BACKOFF), Duration.ofSeconds(MAX_BACKOFF))
.retryMax(MAX_COUNT))
WebClient
他のRESTを叩ける。
@Bean
でBean定義する。
@Configuration
class WebClientConfig() {
// `http://`は省略しないこと
@Bean
fun webClient(): WebClient = WebClient.create("http://localhost:3000")
}
共通ヘッダを設定する。
WebClient
.builder()
.defaultHeader("key", "hoge")
.build()
使う。
client.get()
.uri("/users/{id}", 1)
.exchange()
.subscribe()
Jackson
Enumのデシリアライズ
@JsonFormat
アノテーションを付けないと、列挙子(VAL1...)がjsonのvalueに出力されてしまう。
ref: https://www.baeldung.com/jackson-serialize-enums
@JsonFormat(shape = JsonFormat.Shape.Number)
enum class Hoge(val value: Int) {
VAL1(1),
VAL2(2),
VAL3(3)
}
Jpa Repository
application.yml
spring:
jpa:
hibernate:
ddl-auto: create
datasource:
url: jdbc:mysql://localhost:3306/my_database
username: root
password: password
User.kt
@Table // テーブル名はデフォルトでクラス名と同じ(user)で紐づく
@Entity
class User {
@Id
var id: Int = 0
@Column
var name: String = ""
@Column
var age: Int = 0
@Column
var version: Int = 0
}
UserRepository.kt
@Repository
interface UserRepository : JpaRepository<User, Int>
init.sql
create database my_database;
use my_database;
create table my_table (
id int primary key,
name varchar(50) not null,
age int not null,
version int not null
) charset=utf8;
Spring
const val
な定数にapplication.ymlの設定値を設定する
アノテーションのパラメータに設定値を入れたいときに便利。
private const val HOGE = "\${aaa.bbb.ccc}"
複数DB接続
http://roufid.com/spring-boot-multiple-databases-configuration/
functional programming
Either
leftに例外、rightに結果を入れる。
Test
非同期なのでassertionの時点で処理が完了しているかどうかは確認しておく。
Mockitokotlin
関数のMock。
val hogeService = mock<HogeService> {
// デフォルト値がある関数の場合も最後の引数まで省略しないで書く
on{ hello(eq("aaa"), any())} doReturn 123
// 引数を利用して戻り値を返す場合は`doAnswer()`
on { hello(any()) } doAnswer { (it.arguments[0] as Hoge) } // もらった引数の0番目をそのまま返す
}
呼ばれたかの検証はverify()
で。
// `hello()`が1回呼ばれたか
verify(hogeService, times(1)).hello(any())
// `hello("hoge")`が1回呼ばれたか
verify(hogeService, times(1)).hello(eq("hoge"))
// `hello()`が1回呼ばれたか検証し、さらに呼ばれたときの引数を`actual`に代入
val actual = argumentCaptor<String>().apply { verify(hogeService, times(1)).hello(capture()) }.firstValue
Assertions.assertEquals("hoge", actual)
Monoを返すMock
val hogeService = mock<HogeService> {
// 例外を起こす時は`doThrow`で例外をスローするのではなく`Mono.error()`を返すようにする。
on{ hello(eq("aaa"), any())} doReturn Mono.error(EntityNotFoundException())
}
WebTestClient
REST APIのテスト。
@Autowired
private lateinit var webTestClient: WebTestClient
fun test() {
webTestClient.get().uri("/hello").exchange().expectStatus().isOk
.expectBody()
.jsonPath("\$.results[0].name").isEqualTo("aaa")
.jsonPath("\$.results[0].age").isEqualTo(12)
}
application.yml
パッケージごとにログレベルを設定できる。
logging:
level:
# com.exampleがDEBUG、他はすべてWARN
ROOT: WARN
com.example: DEBUG
application/json+stream 形式のレスポンスを返す
無限ストリームの例。
data class Hoge(val a: Int)
fun hello(request: ServerRequest): Mono<ServerResponse> {
val stream = Stream.iterate(0) {it + 1}
val flux = Flux.fromStream(stream)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map { Hoge(it.t1) }
.doOnNext { println(it) }
return request.toMono()
.flatMap { ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(flux, Hoge::class.java) }
}
Flux.create()
を使うと好きな条件を使ってストリームを終了できる。
data class Hoge(val a: Int)
fun hello(request: ServerRequest): Mono<ServerResponse> {
val s = Flux.create<Hoge> { sink ->
var i = 0
while(true){
sink.next(Hoge(i))
Thread.sleep(1000L)
i++
if(5 < i) break
}
sink.complete()
}
return request.toMono()
.flatMap { ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(s, Hoge::class.java) }
}
MySQLと一緒とに使うときの注意点
Mono.fromCallable
を使う。
以下のようにすることで、ブロッキング処理のみ指定したスレッド数で処理するようにできる。
@Repository
interface HogeRepository : JpaRepository<Hoge, UUID>
val repositoryParallel: Scheduler = Schedulers.newParallel("repository", 10)
fun <T> parallel(f: () -> T): Mono<T> = Mono.fromCallable(f)
.subscribeOn(repositoryParallel)
.publishOn(Schedulers.elastic())
1.toMono()
.flatMap { parallel { hogeRepo.findById("id") } }
spring:
datasource:
url: "jdbc:mysql://localhost:3306/db"
username: "root"
password: "password"
hikari:
connection-test-query: "SELECT 1"
maximum-pool-size: 10 # ここの数とスレッド数を合わせる。
minimum-idle: 1
Tips
Fluxが空になったら例外処理をする
switchIfEmpty
を使う。
Flux.just(1, 2, 3)
.filter{ it -> it < 4 }
.switchIfEmpty { Mono.error(RuntimeException()) }
条件に合うまで繰り返し生成する
expand
を使う。
リクエストにlimitがあるAPIを、指定のデータ数が取得できるまで叩いて、そのリストをFluxとして扱う、など。
data class Request(val count: Int, val offset: Int)
Request(count = 100, offset = 0)
.toMono()
.expand { (req, totalCount) ->
if(totalCount < req.count){
Mono.empty()
} else {
val res = api.users(limit = 10, offset = req.offset)
Mono.just(Pair(req, totalCount + res.count))
}
}
Ref:
https://qiita.com/mori-bito/items/7f66769c71ffc6a9c1e6
WebClientに共通の認証を埋め込む
ExchangeFilterFunction
を使う
Ref:
(Spring 5 WebClient and WebTestClient Tutorial with Examples)[https://www.callicoder.com/spring-5-reactive-webclient-webtestclient-examples/#adding-filter-functions]
アクセスログを表示する
resources/logback-spring.xml
を作って、
application.yaml
で作ったファイルを指定する。
logging.config: config/logback-spring.xml
アクセスログを有効にするために起動時のシステムプロパティを設定する。
bootRun {
jvmArgs = ["-Dreactor.netty.http.server.accessLogEnabled=true"]
}
アプリ起動。
アプリにHTTPリクエストを投げるとlogs/access.log
に追記される。
./gradlew bootRun