Kafka

ref: Apache Kafka Apache Kafkaに入門した

本: https://www.oreilly.co.jp/books/9784873118499/

目次:

導入

Dockerで構築

HOST_IP=$(ipconfig getifaddr en1)
docker run --rm -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=$HOST_IP --env ADVERTISED_PORT=9092 spotify/kafka

CLI(mac)

brew install kafka

CLI(Ubuntu18.04)

~/toolsの下にインストールする例

(mkdir -p ~/tools \
    && cd ~/tools \
    && curl -LO https://www-us.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz \
    && tar -xzf kafka_2.11-2.1.0.tgz \
    && rm kafka_2.11-2.1.0.tgz)

基本

Consumer

トピックに発行される各レコードは、各コンシューマグループ内の1つのインスタンスにロードバランスされて配信される。 各コンシューマのグループIDが異なる場合はブロードキャストされる。

Ref: https://kafka.apache.org/documentation/#intro_consumers

Kafka console scripts

ref: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz Kafka | Quickstart

次のコマンドで/c/tools/kafka_2.11-2.1.0/にダウンロードする。

(cd ~/downloads \
    && curl -LO http://ftp.jaist.ac.jp/pub/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz \
    && tar -xzf kafka_2.11-2.1.0.tgz \
    && cp -R kafka_2.11-2.1.0/  /c/tools/kafka_2.11-2.1.0/)

Windowsの場合はkafka_2.11-2.1.0/bin/windows配下にパスを通し、.shの代わりに.batを使う。

トピック作成

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

トピック削除

ref: Manually delete Apache Kafka topics

server.propertiesに以下の1行を追記する。

delete.topic.enable=true
kafka-topics.sh --zookeeper localhost:2181 --delete --topic [トピック名]

メッセージ送信

kafka-console-producer.sh --broker-list localhost:9092 --topic [トピック名]

# jsonファイルを送信
cat hoge.json | jq -c . | kafka-console-producer.sh --broker-list localhost:9092 --topic [トピック名]

# ファイルを1行ごと送信
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

Ref:

jsonを送信するときは1行に整形してから送信する。

メッセージ受信

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [トピック名] --from-beginning

# メッセージをファイルに保存
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [トピック名] --from-beginning > hoge.txt

メッセージ削除

# retentionを1秒にする
kafka-topics.sh --zookeeper <zookeeper host>:2181 --alter --topic <topic name> --config retention.ms=1000

# メッセージが消えるまで待つ。。。(1分くらい

# retentionをもとに戻す
kafka-configs.sh --zookeeper <zookeeper host>:2181 --entity-type topics --alter --entity-name <topic name> --delete-config retention.ms

グループIDの一覧を取得

グループに紐づいているトピックを確認できる。

kafka-consumer-groups.sh --bootstrap-server <kafka-host>:9092 --group <group-id> --describe

# TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
# my-topic 0          269             269             0               consumer-host /100.73.103.226 my-consumer-0

Spring Bootで使う

ref: Intro to Apache Kafka with Spring tonsV2/spring-kotlin-kafka

build.gradle

build.gradleに依存関係を追加する。

// Kafka
implementation 'org.springframework.kafka:spring-kafka'
// Kotlinのdata classをjsonに変換するために必要
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'

application.yml

kafka:
  bootstrapAddress: localhost:9092
message:
  topic:
    name: test-topic
greeting:
  topic:
    name: greeting

Consumerの設定

const val GROUP_ID = "test-group"

@Configuration
@EnableKafka
class KafkaConsumerConfig(
    @Value("\${spring.kafka.bootstrapAddress}")
    private val bootstrapAddress: String = "localhost:9092"
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val props = HashMap<String, Any?>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
        props[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(props)
    }

    // 文字列のメッセージを受信するコンシューマファクトリ
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        return factory
    }

    fun greetingConsumerFactory(): ConsumerFactory<String, Greeting> {
        val props = HashMap<String, Any?>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
        props[ConsumerConfig.GROUP_ID_CONFIG] = "greeting"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
        props[JsonDeserializer.VALUE_DEFAULT_TYPE] = Greeting::class.java
        return DefaultKafkaConsumerFactory<String, Greeting>(
          props,
          StringDeserializer(),
          // KotlinModule()を登録しないとdata classが正常に変換できない
          JsonDeserializer(Greeting::class.java, ObjectMapper().registerModule(KotlinModule())))
    }

    // jsonのメッセージを受信するコンシューマファクトリ
    @Bean
    fun greetingKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Greeting> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Greeting>()
        factory.consumerFactory = greetingConsumerFactory()
        return factory
    }
}

Consumer

const val topicString = "topic-string"
const val topicJson = "topic-json"

@EnableKafka
@Configuration
class KafkaConsumer(private val handler: Handler) {

    // 文字列のメッセージを受信する
    @KafkaListener(topics = [topicString], groupId = "foo")
    fun listen(message: String) {
        message.toMono()
            .doOnNext { println("Received String in group foo: $message") }
            .subscribe()
    }

    // jsonのメッセージを受信する
    @KafkaListener(topics = [topicJson], groupId = "foo", containerFactory = "greetingKafkaListenerContainerFactory")
    fun listen(message: Greeting) {
        message.toMono()
            .doOnNext { println("Received Json in group foo: ${message.msg}") }
            .subscribe()
    }
}

Producerの設定

@Configuration
class KafkaProducerConfig(
    @Value("\${kafka.host:localhost}")
    private val bootstrapAddress: String = "localhost:9092"
) {

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val configProps = HashMap<String, Any?>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun greetingProducerFactory(): ProducerFactory<String, Greeting> {
        val configProps = HashMap<String, Any?>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java

        // ヘッダを消さないと"Magic v1 does not support record headers"とエラーがでる
        configProps[JsonSerializer.ADD_TYPE_INFO_HEADERS] = false

        return DefaultKafkaProducerFactory(configProps)
    }

    // 文字列のメッセージを送信するプロデューサ
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

    // jsonのメッセージを送信するプロデューサ
    @Bean
    fun greetingKafkaTemplate(): KafkaTemplate<String, Greeting> {
        return KafkaTemplate<String, Greeting>(greetingProducerFactory())
    }
}

// 送受信するjsonのDTO
data class Greeting(val msg: String, val name: String)

Producer

@Service
class KafkaProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>,
    private val greetingKafkaTemplate: KafkaTemplate<String, Greeting>
) {
    fun send(msg: String) = kafkaTemplate.send(topicString, msg)

    fun send(msg: Greeting) = greetingKafkaTemplate.send(topicJson, msg)
}

Topicの設定

KafkaAdminを利用するとアプリ起動時に勝手にトピックを作成してくれる。

@Configuration
class KafkaTopicConfig {

    @Value(value = "\${kafka.bootstrapAddress}")
    private val bootstrapAddress: String = "localhost:9092"

    @Value(value = "\${message.topic.name}")
    private val topicName: String? = null

    @Bean
    fun kafkaAdmin(): KafkaAdmin {
        val configs = HashMap<String, Any>()
        configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
        return KafkaAdmin(configs)
    }

    @Bean
    fun topic1(): NewTopic {
        return NewTopic(topicName, 1, 1.toShort())
    }
}

Tips

グループごとにオフセットをリセットする

グループidを変えなくても再consumeしたいときなどに。

SERVERS="kaf1:9092,kaf2:9092"
GROUP="my-group"
TOPIC="my-topic"

## 確認
kafka-consumer-groups.sh --bootstrap-server $SERVERS --group $GROUP --describe

## オフセットをリセット
kafka-consumer-groups.sh --bootstrap-server $SERVERS --group $GROUP --topic $TOPIC --reset-offsets --to-earliest --execute

Ref: https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

Client library (Ruby)

ruby-kafka

インストール

FAQ

alpineで使うとError loading shared library ld-linux-x86-64.so.2: No such file or directory

libc6-compatが必要。

apk add libc6-compat

InstanceAlreadyExistsException

clientIdが重複していると起こる。@KafkaListenerが複数ある場合はclientIdPrefixをそれぞれ別の値に設定することで解消する。

    @KafkaListener(topics = ["my-topic-1"], groupId = "my-group", clientIdPrefix = "my-prefix-1")
    fun listen1(messageJson: String) {
        // ...
    }

    @KafkaListener(topics = ["my-topic-2"], groupId = "my-group", clientIdPrefix = "my-prefix-2")
    fun listen2(messageJson: String) {
        // ...
    }

Magic v1 does not support record headers

ref: Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)

JsonSerializerを使って値を送信するときに発生する。 JsonSerializerを使うとデフォルトでヘッダ(JsonSerde)が追加されるが、このエラーを解消するためにはヘッダの追加を無効化する必要がある。 以下のようにJsonSerializer.ADD_TYPE_INFO_HEADERSfalseを設定する。

@Bean
    fun greetingProducerFactory(): ProducerFactory<String, Greeting> {
        val configProps = HashMap<String, Any?>()
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)

        // この行を追加する
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

        return DefaultKafkaProducerFactory(configProps)
    }

コンシューマでdata classのデシリアライズに失敗する

ref: https://stackoverflow.com/a/50950955

JsonDeserializerにKotlinModule()を登録したデシリアライザを設定すると解決する。

    fun greetingConsumerFactory(): ConsumerFactory<String, Greeting> {
        val props = HashMap<String, Any?>()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting")

        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Greeting::class.java)

        return DefaultKafkaConsumerFactory<String, Greeting>(
          props,
          StringDeserializer(),

          // JsonDeserializer(Greeting::class.java)
          // ↑の代わりにこれを追加
          JsonDeserializer(Greeting::class.java, ObjectMapper().registerModule(KotlinModule())))
    }

Kubernetes

yamlファイル作成

ココを参考にする。 https://github.com/kow3ns/kubernetes-kafka/tree/master/manifests

Unable to connect to zk-cs.default.svc.cluster.local:2181のようなエラーがでたときは、defaultをnamespace名に置き換える。

kafka_micro.yaml

---
apiVersion: v1
kind: Service
metadata:
    name: kafka-hs
    labels:
        app: kafka
spec:
    ports:
    - port: 9093
      name: server
    clusterIP: None
    selector:
        app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
    name: kafka-pdb
spec:
    selector:
        matchLabels:
            app: kafka
    maxUnavailable: 1
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
    name: kafka
spec:
    serviceName: kafka-hs
    replicas: 1
    podManagementPolicy: Parallel
    updateStrategy:
        type: RollingUpdate
    template:
        metadata:
            labels:
                app: kafka
        spec:
            affinity:
                podAntiAffinity:
                    requiredDuringSchedulingIgnoredDuringExecution:
                    - labelSelector:
                          matchExpressions:
                          - key: "app"
                            operator: In
                            values:
                            - kafka
                      topologyKey: "kubernetes.io/hostname"
                podAffinity:
                    preferredDuringSchedulingIgnoredDuringExecution:
                    - weight: 1
                      podAffinityTerm:
                          labelSelector:
                              matchExpressions:
                              - key: "app"
                                operator: In
                                values:
                                - zk
                          topologyKey: "kubernetes.io/hostname"
            terminationGracePeriodSeconds: 300
            containers:
            - name: k8skafka
              imagePullPolicy: Always
              image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
              resources:
                  requests:
                      memory: "1Gi"
                      cpu: "0.5"
              ports:
              - containerPort: 9093
                name: server
              # zookeeper.connectに設定するドメイン名`my-kafka`は、namespace名に置き換えること
              command:
              - sh
              - -c
              - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-cs.my-kafka.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
              env:
              - name: KAFKA_HEAP_OPTS
                value : "-Xmx512M -Xms512M"
              - name: KAFKA_OPTS
                value: "-Dlogging.level=INFO"
              readinessProbe:
                  exec:
                      command:
                      - sh
                      - -c
                      - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
            securityContext:
                runAsUser: 1000
                fsGroup: 1000

zookeeper_micro.yaml

---
apiVersion: v1
kind: Service
metadata:
    name: zk-hs
    labels:
        app: zk
spec:
    ports:
    - port: 2888
      name: server
    - port: 3888
      name: leader-election
    clusterIP: None
    selector:
        app: zk
---
apiVersion: v1
kind: Service
metadata:
    name: zk-cs
    labels:
        app: zk
spec:
    ports:
    - port: 2181
      name: client
    selector:
        app: zk
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
    name: zk
spec:
    serviceName: zk-hs
    replicas: 1
    podManagementPolicy: Parallel
    updateStrategy:
        type: RollingUpdate
    template:
        metadata:
            labels:
                app: zk
        spec:
            affinity:
                podAntiAffinity:
                    requiredDuringSchedulingIgnoredDuringExecution:
                    - labelSelector:
                          matchExpressions:
                          - key: "app"
                            operator: In
                            values:
                            - zk
                      topologyKey: "kubernetes.io/hostname"
            containers:
            - name: kubernetes-zookeeper
              imagePullPolicy: Always
              image: "gcr.io/google_containers/kubernetes-zookeeper:1.0-3.4.10"
              resources:
                  requests:
                      memory: "1Gi"
                      cpu: "0.5"
              ports:
              - containerPort: 2181
                name: client
              - containerPort: 2888
                name: server
              - containerPort: 3888
                name: leader-election
              command:
              - sh
              - -c
              - "start-zookeeper \
          --servers=1 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
              readinessProbe:
                  exec:
                      command:
                      - sh
                      - -c
                      - "zookeeper-ready 2181"
                  initialDelaySeconds: 10
                  timeoutSeconds: 5
              livenessProbe:
                  exec:
                      command:
                      - sh
                      - -c
                      - "zookeeper-ready 2181"
                  initialDelaySeconds: 10
                  timeoutSeconds: 5
            securityContext:
                runAsUser: 1000
                fsGroup: 1000

デプロイと動作確認

# namespaceの名前は`kafka_micro.yaml`の設定値と合わせること
kubectl create namespace my-kafka

# デプロイ
kubectl create -f zookeeper_micro.yaml
kubectl create -f kafka_micro.yaml

# トピック作成
kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 createtopic --restart=Never --rm -- kafka-topics.sh --create \
--topic test \
--zookeeper zk-cs.my-kafka.svc.cluster.local:2181 \
--partitions 1 \
--replication-factor 1

# トピック削除
# コンシューマを止めておくこと
kafka-topics.sh --delete --zookeeper localhost:2181 --topic <トピック名>

# メッセージ受信
kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.my-kafka.svc.cluster.local:9093

# メッセージ送信
kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
 -- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.my-kafka.svc.cluster.local:9093 done;

メッセージの送受信テスト用に使えるDockerイメージとしてこれが使える。 gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1

ライブラリ

kaf

https://github.com/infinimesh/kaf

導入

curl https://raw.githubusercontent.com/infinimesh/kaf/master/godownloader.sh | BINDIR=$HOME/bin bash

$HOME/bin/kafにダウンロードされる。

コマンド一覧

# クラスタ登録
kaf config add-cluster local -b localhost:9092
# クラスタ登録(複数ブローカ)
kaf config add-cluster dev-brokers -b dev-kafka1:9092,dev-kafka2:9092,dev-kafka3:9092

# クラスタ選択
kaf config select-cluster

# トピック一覧
kaf topics -b aaa.bbb:9092
# トピック作成
kaf topic create hoge -b aaa.bbb:9092
# トピック削除
kaf topic delete hoge -b aaa.bbb:9092


# メッセージ受信
kaf consume -b aaa.bbb:9092 my.topic

memo

dockerイメージ

wurstmeister/zookeeper
wurstmeister/kafka:2.12-2.1.0
# environment:
#         KAFKA_BROKER_ID: 1
#         KAFKA_ADVERTISED_HOST_NAME: localhost
#         KAFKA_ZOOKEEPER_CONNECT: localhost:2181