Kafka
ref: Apache Kafka Apache Kafkaに入門した
本: https://www.oreilly.co.jp/books/9784873118499/
目次:
導入
Dockerで構築
HOST_IP=$(ipconfig getifaddr en1)
docker run --rm -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=$HOST_IP --env ADVERTISED_PORT=9092 spotify/kafka
CLI(mac)
brew install kafka
CLI(Ubuntu18.04)
~/tools
の下にインストールする例
(mkdir -p ~/tools \
&& cd ~/tools \
&& curl -LO https://www-us.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz \
&& tar -xzf kafka_2.11-2.1.0.tgz \
&& rm kafka_2.11-2.1.0.tgz)
基本
Consumer
トピックに発行される各レコードは、各コンシューマグループ内の1つのインスタンスにロードバランスされて配信される。 各コンシューマのグループIDが異なる場合はブロードキャストされる。
Ref: https://kafka.apache.org/documentation/#intro_consumers
Kafka console scripts
ref: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz Kafka | Quickstart
次のコマンドで/c/tools/kafka_2.11-2.1.0/
にダウンロードする。
(cd ~/downloads \
&& curl -LO http://ftp.jaist.ac.jp/pub/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz \
&& tar -xzf kafka_2.11-2.1.0.tgz \
&& cp -R kafka_2.11-2.1.0/ /c/tools/kafka_2.11-2.1.0/)
Windowsの場合はkafka_2.11-2.1.0/bin/windows
配下にパスを通し、.sh
の代わりに.bat
を使う。
トピック作成
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
トピック削除
ref: Manually delete Apache Kafka topics
server.properties
に以下の1行を追記する。
delete.topic.enable=true
kafka-topics.sh --zookeeper localhost:2181 --delete --topic [トピック名]
メッセージ送信
kafka-console-producer.sh --broker-list localhost:9092 --topic [トピック名]
# jsonファイルを送信
cat hoge.json | jq -c . | kafka-console-producer.sh --broker-list localhost:9092 --topic [トピック名]
# ファイルを1行ごと送信
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
Ref:
jsonを送信するときは1行に整形してから送信する。
メッセージ受信
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [トピック名] --from-beginning
# メッセージをファイルに保存
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [トピック名] --from-beginning > hoge.txt
メッセージ削除
# retentionを1秒にする
kafka-topics.sh --zookeeper <zookeeper host>:2181 --alter --topic <topic name> --config retention.ms=1000
# メッセージが消えるまで待つ。。。(1分くらい
# retentionをもとに戻す
kafka-configs.sh --zookeeper <zookeeper host>:2181 --entity-type topics --alter --entity-name <topic name> --delete-config retention.ms
グループIDの一覧を取得
グループに紐づいているトピックを確認できる。
kafka-consumer-groups.sh --bootstrap-server <kafka-host>:9092 --group <group-id> --describe
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
# my-topic 0 269 269 0 consumer-host /100.73.103.226 my-consumer-0
Spring Bootで使う
ref: Intro to Apache Kafka with Spring tonsV2/spring-kotlin-kafka
build.gradle
build.gradle
に依存関係を追加する。
// Kafka
implementation 'org.springframework.kafka:spring-kafka'
// Kotlinのdata classをjsonに変換するために必要
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'
application.yml
kafka:
bootstrapAddress: localhost:9092
message:
topic:
name: test-topic
greeting:
topic:
name: greeting
Consumerの設定
const val GROUP_ID = "test-group"
@Configuration
@EnableKafka
class KafkaConsumerConfig(
@Value("\${spring.kafka.bootstrapAddress}")
private val bootstrapAddress: String = "localhost:9092"
) {
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val props = HashMap<String, Any?>()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
props[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(props)
}
// 文字列のメッセージを受信するコンシューマファクトリ
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
return factory
}
fun greetingConsumerFactory(): ConsumerFactory<String, Greeting> {
val props = HashMap<String, Any?>()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
props[ConsumerConfig.GROUP_ID_CONFIG] = "greeting"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
props[JsonDeserializer.VALUE_DEFAULT_TYPE] = Greeting::class.java
return DefaultKafkaConsumerFactory<String, Greeting>(
props,
StringDeserializer(),
// KotlinModule()を登録しないとdata classが正常に変換できない
JsonDeserializer(Greeting::class.java, ObjectMapper().registerModule(KotlinModule())))
}
// jsonのメッセージを受信するコンシューマファクトリ
@Bean
fun greetingKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Greeting> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Greeting>()
factory.consumerFactory = greetingConsumerFactory()
return factory
}
}
Consumer
const val topicString = "topic-string"
const val topicJson = "topic-json"
@EnableKafka
@Configuration
class KafkaConsumer(private val handler: Handler) {
// 文字列のメッセージを受信する
@KafkaListener(topics = [topicString], groupId = "foo")
fun listen(message: String) {
message.toMono()
.doOnNext { println("Received String in group foo: $message") }
.subscribe()
}
// jsonのメッセージを受信する
@KafkaListener(topics = [topicJson], groupId = "foo", containerFactory = "greetingKafkaListenerContainerFactory")
fun listen(message: Greeting) {
message.toMono()
.doOnNext { println("Received Json in group foo: ${message.msg}") }
.subscribe()
}
}
Producerの設定
@Configuration
class KafkaProducerConfig(
@Value("\${kafka.host:localhost}")
private val bootstrapAddress: String = "localhost:9092"
) {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val configProps = HashMap<String, Any?>()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun greetingProducerFactory(): ProducerFactory<String, Greeting> {
val configProps = HashMap<String, Any?>()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
// ヘッダを消さないと"Magic v1 does not support record headers"とエラーがでる
configProps[JsonSerializer.ADD_TYPE_INFO_HEADERS] = false
return DefaultKafkaProducerFactory(configProps)
}
// 文字列のメッセージを送信するプロデューサ
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
// jsonのメッセージを送信するプロデューサ
@Bean
fun greetingKafkaTemplate(): KafkaTemplate<String, Greeting> {
return KafkaTemplate<String, Greeting>(greetingProducerFactory())
}
}
// 送受信するjsonのDTO
data class Greeting(val msg: String, val name: String)
Producer
@Service
class KafkaProducer(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val greetingKafkaTemplate: KafkaTemplate<String, Greeting>
) {
fun send(msg: String) = kafkaTemplate.send(topicString, msg)
fun send(msg: Greeting) = greetingKafkaTemplate.send(topicJson, msg)
}
Topicの設定
KafkaAdmin
を利用するとアプリ起動時に勝手にトピックを作成してくれる。
@Configuration
class KafkaTopicConfig {
@Value(value = "\${kafka.bootstrapAddress}")
private val bootstrapAddress: String = "localhost:9092"
@Value(value = "\${message.topic.name}")
private val topicName: String? = null
@Bean
fun kafkaAdmin(): KafkaAdmin {
val configs = HashMap<String, Any>()
configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
return KafkaAdmin(configs)
}
@Bean
fun topic1(): NewTopic {
return NewTopic(topicName, 1, 1.toShort())
}
}
Tips
グループごとにオフセットをリセットする
グループidを変えなくても再consumeしたいときなどに。
SERVERS="kaf1:9092,kaf2:9092"
GROUP="my-group"
TOPIC="my-topic"
## 確認
kafka-consumer-groups.sh --bootstrap-server $SERVERS --group $GROUP --describe
## オフセットをリセット
kafka-consumer-groups.sh --bootstrap-server $SERVERS --group $GROUP --topic $TOPIC --reset-offsets --to-earliest --execute
Ref: https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
Client library (Ruby)
インストール
FAQ
alpineで使うとError loading shared library ld-linux-x86-64.so.2: No such file or directory
libc6-compat
が必要。
apk add libc6-compat
InstanceAlreadyExistsException
clientIdが重複していると起こる。@KafkaListener
が複数ある場合はclientIdPrefix
をそれぞれ別の値に設定することで解消する。
@KafkaListener(topics = ["my-topic-1"], groupId = "my-group", clientIdPrefix = "my-prefix-1")
fun listen1(messageJson: String) {
// ...
}
@KafkaListener(topics = ["my-topic-2"], groupId = "my-group", clientIdPrefix = "my-prefix-2")
fun listen2(messageJson: String) {
// ...
}
Magic v1 does not support record headers
ref: Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)
JsonSerializerを使って値を送信するときに発生する。
JsonSerializerを使うとデフォルトでヘッダ(JsonSerde)が追加されるが、このエラーを解消するためにはヘッダの追加を無効化する必要がある。
以下のようにJsonSerializer.ADD_TYPE_INFO_HEADERS
にfalse
を設定する。
@Bean
fun greetingProducerFactory(): ProducerFactory<String, Greeting> {
val configProps = HashMap<String, Any?>()
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
// この行を追加する
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return DefaultKafkaProducerFactory(configProps)
}
コンシューマでdata classのデシリアライズに失敗する
ref: https://stackoverflow.com/a/50950955
JsonDeserializerにKotlinModule()
を登録したデシリアライザを設定すると解決する。
fun greetingConsumerFactory(): ConsumerFactory<String, Greeting> {
val props = HashMap<String, Any?>()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting")
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Greeting::class.java)
return DefaultKafkaConsumerFactory<String, Greeting>(
props,
StringDeserializer(),
// JsonDeserializer(Greeting::class.java)
// ↑の代わりにこれを追加
JsonDeserializer(Greeting::class.java, ObjectMapper().registerModule(KotlinModule())))
}
Kubernetes
yamlファイル作成
ココを参考にする。 https://github.com/kow3ns/kubernetes-kafka/tree/master/manifests
Unable to connect to zk-cs.default.svc.cluster.local:2181
のようなエラーがでたときは、default
をnamespace名に置き換える。
kafka_micro.yaml
---
apiVersion: v1
kind: Service
metadata:
name: kafka-hs
labels:
app: kafka
spec:
ports:
- port: 9093
name: server
clusterIP: None
selector:
app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
name: kafka-pdb
spec:
selector:
matchLabels:
app: kafka
maxUnavailable: 1
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: kafka-hs
replicas: 1
podManagementPolicy: Parallel
updateStrategy:
type: RollingUpdate
template:
metadata:
labels:
app: kafka
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
containers:
- name: k8skafka
imagePullPolicy: Always
image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
resources:
requests:
memory: "1Gi"
cpu: "0.5"
ports:
- containerPort: 9093
name: server
# zookeeper.connectに設定するドメイン名`my-kafka`は、namespace名に置き換えること
command:
- sh
- -c
- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
--override listeners=PLAINTEXT://:9093 \
--override zookeeper.connect=zk-cs.my-kafka.svc.cluster.local:2181 \
--override log.dir=/var/lib/kafka \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=1440 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=3 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=30000 \
--override request.timeout.ms=30000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=true \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=0.10.2-IV0 \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 "
env:
- name: KAFKA_HEAP_OPTS
value : "-Xmx512M -Xms512M"
- name: KAFKA_OPTS
value: "-Dlogging.level=INFO"
readinessProbe:
exec:
command:
- sh
- -c
- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
securityContext:
runAsUser: 1000
fsGroup: 1000
zookeeper_micro.yaml
---
apiVersion: v1
kind: Service
metadata:
name: zk-hs
labels:
app: zk
spec:
ports:
- port: 2888
name: server
- port: 3888
name: leader-election
clusterIP: None
selector:
app: zk
---
apiVersion: v1
kind: Service
metadata:
name: zk-cs
labels:
app: zk
spec:
ports:
- port: 2181
name: client
selector:
app: zk
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
name: zk
spec:
serviceName: zk-hs
replicas: 1
podManagementPolicy: Parallel
updateStrategy:
type: RollingUpdate
template:
metadata:
labels:
app: zk
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
containers:
- name: kubernetes-zookeeper
imagePullPolicy: Always
image: "gcr.io/google_containers/kubernetes-zookeeper:1.0-3.4.10"
resources:
requests:
memory: "1Gi"
cpu: "0.5"
ports:
- containerPort: 2181
name: client
- containerPort: 2888
name: server
- containerPort: 3888
name: leader-election
command:
- sh
- -c
- "start-zookeeper \
--servers=1 \
--data_dir=/var/lib/zookeeper/data \
--data_log_dir=/var/lib/zookeeper/data/log \
--conf_dir=/opt/zookeeper/conf \
--client_port=2181 \
--election_port=3888 \
--server_port=2888 \
--tick_time=2000 \
--init_limit=10 \
--sync_limit=5 \
--heap=512M \
--max_client_cnxns=60 \
--snap_retain_count=3 \
--purge_interval=12 \
--max_session_timeout=40000 \
--min_session_timeout=4000 \
--log_level=INFO"
readinessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 10
timeoutSeconds: 5
livenessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 10
timeoutSeconds: 5
securityContext:
runAsUser: 1000
fsGroup: 1000
デプロイと動作確認
# namespaceの名前は`kafka_micro.yaml`の設定値と合わせること
kubectl create namespace my-kafka
# デプロイ
kubectl create -f zookeeper_micro.yaml
kubectl create -f kafka_micro.yaml
# トピック作成
kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 createtopic --restart=Never --rm -- kafka-topics.sh --create \
--topic test \
--zookeeper zk-cs.my-kafka.svc.cluster.local:2181 \
--partitions 1 \
--replication-factor 1
# トピック削除
# コンシューマを止めておくこと
kafka-topics.sh --delete --zookeeper localhost:2181 --topic <トピック名>
# メッセージ受信
kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.my-kafka.svc.cluster.local:9093
# メッセージ送信
kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
-- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.my-kafka.svc.cluster.local:9093 done;
メッセージの送受信テスト用に使えるDockerイメージとしてこれが使える。 gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
ライブラリ
kaf
https://github.com/infinimesh/kaf
導入
curl https://raw.githubusercontent.com/infinimesh/kaf/master/godownloader.sh | BINDIR=$HOME/bin bash
$HOME/bin/kaf
にダウンロードされる。
コマンド一覧
# クラスタ登録
kaf config add-cluster local -b localhost:9092
# クラスタ登録(複数ブローカ)
kaf config add-cluster dev-brokers -b dev-kafka1:9092,dev-kafka2:9092,dev-kafka3:9092
# クラスタ選択
kaf config select-cluster
# トピック一覧
kaf topics -b aaa.bbb:9092
# トピック作成
kaf topic create hoge -b aaa.bbb:9092
# トピック削除
kaf topic delete hoge -b aaa.bbb:9092
# メッセージ受信
kaf consume -b aaa.bbb:9092 my.topic
memo
dockerイメージ
wurstmeister/zookeeper
wurstmeister/kafka:2.12-2.1.0
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_ADVERTISED_HOST_NAME: localhost
# KAFKA_ZOOKEEPER_CONNECT: localhost:2181