Spring Boot / Kafka
単体テスト (Junit5)
build.gradleに追記。
testImplementation "org.springframework.kafka:spring-kafka-test"
テストコード。
// 9092ポートだとローカルで起動してるものと被っていやなので9992にしている。
// ブローカのポートは指定しないとランダムになるっぽい?
@EmbeddedKafka(topics = ["testTopic"], brokerProperties = ["listeners=PLAINTEXT://localhost:9992"])
@ActiveProfiles("unit")
class KafkaTest {
@Autowired
private lateinit var template: KafkaTemplate<String, String>
}
// KafkaTemplateの設定
@Configuration
@EnableKafka
class Config {
@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private lateinit var brokerAddresses: String
@Bean
fun kpf(): ProducerFactory<String, String> {
val configs = HashMap<String, Any>()
configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses
configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configs)
}
@Bean
fun kt(): KafkaTemplate<String, String> {
return KafkaTemplate(kpf())
}
}
Ref: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt https://github.com/spring-projects/spring-kafka/blob/ee240006b3ddc2117ac5205a52f80000e094875d/src/reference/asciidoc/testing.adoc
application-unit.yml
spring:
kafka:
# ここが大事。組み込みKafkaを指定している。
bootstrap-servers: ${spring.embedded.kafka.brokers}
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
listener:
ack-mode: COUNT_TIME
ack-count: 100
ack-time: 10000
Ref: @EmbeddedKafkaについて
- https://github.com/spring-projects/spring-kafka/blob/ee240006b3ddc2117ac5205a52f80000e094875d/src/reference/asciidoc/testing.adoc#embeddedkafka-annotation
- Testing an Apache Kafka Integration within a Spring Boot Application
@SpringBootTest
@SpringJUnitConfig
@EmbeddedKafka(topics = ["testTopic"])
@ActiveProfiles("unit")
class MyTest {
// 内側に@Configurationがあると@Autowiredがうまく動かない
@Configration
class Config{
@Bean
fun a() {}
}
}
Spring+WebFluxでKafkaのメッセージをMySQLに保存する
VSCode拡張を入れる
- Java Extension Pack
- Gradle Language Support
- kotlin
- Spring Boot Extension Pack
- Project - Gradle Project
- Language - Kotlin
- Spring Boot - 2.2.0(SNAPSHOT)
- Dependencies - MySQL driver, Spring for Apache Kafka
javac
が入っていない場合はJDKをインストールする。
apt install -y default-jdk
gradle init