Kafka command line tools
First of all, we are going to need Kafka itself, e.g.:
docker run -it --rm --name=kafka -e SAMPLEDATA=0 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e FORWARDLOGS=0 -e ADV_HOST=127.0.0.1 -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9092:9092 -p 9581-9585:9581-9585 lensesio/fast-data-dev:2.3.0
or
wget https://raw.githubusercontent.com/confluentinc/examples/5.3.1-post/cp-all-in-one/docker-compose.yml
docker-compose up -d
or
run everything by hand on your own as described in the quickstart
or
get confluent.cloud
Kafka topics
The most basic and most frequently needed commands.
List topics
kafka-topics --bootstrap-server localhost:9092 --list
Create topic
kafka-topics --bootstrap-server localhost:9092 --create --topic demo2 --partitions 3 --replication-factor 1
Delete topic
kafka-topics --bootstrap-server localhost:9092 --delete --topic demo2
Console producer and consumers
Here are examples for the following use cases:
- simple without key
- simple with string key
- simple with integer key
- json without key
- json with string key
- json with integer key
- json with json key
- avro without key
- avro with string key
- avro with integer key
- avro with avro key
By default, in all following examples messages are delimited by a new line, e.g. start the producer, type something, press Enter.
All following examples are run against:
docker run -it --rm --name=kafka -e SAMPLEDATA=0 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e FORWARDLOGS=0 -e ADV_HOST=127.0.0.1 -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9092:9092 -p 9581-9585:9581-9585 lensesio/fast-data-dev:2.3.0
String producer and consumer
Simple without key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SimpleWithoutKey --partitions 3 --replication-factor 1
Start kafka-console-producer which will produce simple string messages
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithoutKey
Start kafka-console-consumer to consume simple string messages
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic SimpleWithoutKey --from-beginning
Produce simple messages like:
hello
world
And you should see them in the consumer as:
hello
world
Simple with string key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SimpleWithStringKey --partitions 3 --replication-factor 1
Start kafka-console-producer which will produce simple string messages with string key
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithStringKey --property parse.key=true --property key.separator=:
Notes:
- --property parse.key=true tells the producer to expect a key alongside each value
- --property key.separator=: is optional; the default separator is a tab character
Start kafka-console-consumer to consume simple string messages with string key
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic SimpleWithStringKey --property print.key=true --from-beginning
Notes:
- --property print.key=true will print the key
Produce messages like:
1:one
2:two
And you should see them in the consumer as:
1 one
2 two
If you try to produce a message without a key you should see an error:
>message without key
org.apache.kafka.common.KafkaException: No key found on line 3: acme
at kafka.tools.ConsoleProducer$LineMessageReader.readMessage(ConsoleProducer.scala:265)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:54)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Simple with integer key
Not fully possible at the moment; here are some links:
- kafka-console-producer ignores value serializer?
- Console Producer / Consumer's serde config is not working
- Console Producer sources
The problem is that no matter what you pass to the console producer, it will still send plain bytes.
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SimpleWithIntKey --partitions 3 --replication-factor 1
Start kafka-console-producer which will produce simple string messages with integer key
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithIntKey --property parse.key=true --property key.serializer=org.apache.kafka.common.serialization.IntegerDeserializer --property value.serializer=org.apache.kafka.common.serialization.StringDeserializer --property key.separator=:
Notes:
- --property key.serializer=org.apache.kafka.common.serialization.IntegerDeserializer defines which deserializer should be used for the key
- --property value.serializer=org.apache.kafka.common.serialization.StringDeserializer defines which deserializer should be used for the value
- neither of them is actually applied
Start kafka-console-consumer to consume simple string messages with integer key
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic SimpleWithIntKey --property print.key=true --from-beginning --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --skip-message-on-error
Notes:
- --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer: we are forced to use the string deserializer instead of the integer one here, otherwise we will receive an error: ERROR Error processing message, skipping this message: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
- --skip-message-on-error: do not crash on a bad message, just skip it
Produce messages like:
1:one
2:two
And you should see them in the consumer as:
1 one
2 two
Json producer and consumer
Json without key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic JsonWithoutKey --partitions 3 --replication-factor 1
Start kafka-console-producer which will produce json messages
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic JsonWithoutKey
Note that, as in the previous example with the integer key, kafka-console-producer does not respect the given serializers, so we will just type a string that looks like JSON; it is still sent as bytes.
Start kafka-console-consumer to consume json messages
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic JsonWithoutKey --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer --skip-message-on-error --from-beginning --property print.timestamp=true
Produce json messages like:
{"foo": "bar"}
{"acme": 42}And you should see them like:
CreateTime:1578081298745 {"foo":"bar"}
CreateTime:1578081304001 {"acme":42}
There are no checks in the producer, but if you send something invalid you will see an error in the consumer:
CreateTime:1578081353956 [2020-01-03 19:55:54,970] ERROR Error processing message, skipping this message: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"foo"; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"foo"; line: 1, column: 7]Json with string key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic JsonWithStringKey --partitions 3 --replication-factor 1
Start kafka-console-producer which will produce json messages with string key
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic JsonWithStringKey --property parse.key=true --property key.separator=:
Start kafka-console-consumer to consume json messages with string key
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic JsonWithStringKey --property print.key=true --from-beginning --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer
Produce messages:
1:{"foo":"bar"}
2:{"acme":42}And you should see:
1 {"foo":"bar"}
2 {"acme":42}Note that there is the same problem with keys as in previous examples, and you can not force integer key.
Json with json key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic JsonWithJsonKey --partitions 3 --replication-factor 1
Start kafka-console-producer which will produce json messages with json keys
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic JsonWithJsonKey --property parse.key=true --property key.separator="|"
Start kafka-console-consumer to consume json messages
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic JsonWithJsonKey --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer --property key.deserializer=org.apache.kafka.connect.json.JsonDeserializer --skip-message-on-error --from-beginning --property print.key=true
Produce messages:
{"id":1}|{"foo":"bar"}
{"id":2}|{"acme":42}And you should see:
{"id":1} {"foo":"bar"}
{"id":2} {"acme":42}If you will produce bad key or value you will get:
ERROR Error processing message, skipping this message: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"foo"; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"foo"; line: 1, column: 7]Avro producer and consumer
Avro without key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithoutKey --partitions 3 --replication-factor 1
Start kafka-avro-console-producer to produce avro messages
docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithoutKey --property value.schema='{"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}'
Note that from now on we are using kafka-avro-console-producer instead of kafka-console-producer; it has a few additional properties, like --property value.schema='{"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}'. Messages published via this producer are validated against the given schema. Also note that this producer does not show the > prompt, so do not wait for it.
Start kafka-avro-console-consumer to consume avro messages
docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithoutKey --from-beginning
Try sending something like:
{"foo":"hello"}
{"foo":"world"}and you should see exactly the same output in consumer.
If you will try send something wrong you will receive an error:
{"acme":42}
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"acme":42} to Avro of schema {"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}
Caused by: org.apache.avro.AvroTypeException: Expected field name not found: foo
at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:477)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
But as long as what you are sending is schema compatible everything should be OK; try sending {"foo":"bar","acme":42} and you will receive {"foo":"bar"} in your consumer.
Avro with string key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithStringKey --partitions 3 --replication-factor 1
Start kafka-avro-console-producer to produce avro messages with primitive string key
docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithStringKey --property value.schema='{"type":"record","name":"AvroWithStringKey","fields":[{"name":"foo","type":"string"}]}' --property parse.key=true --property key.schema='{"type":"string"}' --property key.separator=" "
Note that we have added --property key.schema='{"type":"string"}', which allows us to use primitives as keys, and they will still be validated.
Start kafka-avro-console-consumer to consume avro messages with string keys
docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithStringKey --from-beginning --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Try sending something like:
"one" {"foo":"1"}
"two" {"foo":"2"}and you should get:
one {"foo":"1"}
two {"foo":"2"}Do not forget to wrap key with double quotes otherwise you will get an error:
org.apache.kafka.common.errors.SerializationException: Error deserializing json one to Avro of schema "string"
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('o' (code 111)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: java.io.StringReader@3feb2dda; line: 1, column: 2]
Avro with int key
Does not work; in the example below, after trying to send 1|{"foo":"bar"} you receive an error:
org.apache.kafka.common.errors.SerializationException: Error deserializing json 1|{"foo":"hello"} to Avro of schema {"type":"record","name":"AvroWithIntKey","fields":[{"name":"foo","type":"int"}]}
Caused by: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_NUMBER_INT
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithIntKey --partitions 3 --replication-factor 1
Start kafka-avro-console-producer to produce avro messages with integer keys
docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithIntKey --property value.schema='{"type":"record","name":"AvroWithIntKey","fields":[{"name":"foo","type":"int"}]}' --property key.separator="|"
Start kafka-avro-console-consumer to consume avro messages
docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithIntKey --from-beginning
Avro with avro key
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithAvroKey --partitions 3 --replication-factor 1
Start kafka-avro-console-producer which will produce avro messages with avro keys
docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithAvroKey --property value.schema='{"type":"record", "name": "AvroWithAvroKey", "fields":[{"name":"foo","type":"string"}]}' --property parse.key=true --property key.schema='{"type":"record","name": "key", "fields":[{"name":"id","type":"int"}]}' --property key.separator=" "
Start kafka-avro-console-consumer to consume avro messages with avro keys
docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithAvroKey --from-beginning --property print.key=true
Try sending:
{"id":1} {"foo":"hello"}
{"id":2} {"foo":"world"}and you should receive
{"id":1} {"foo":"hello"}
{"id":2} {"foo":"world"}if you will try send wrong key like {"id":"guid"} you will receive an error
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"id":"guid"} to Avro of schema {"type":"record","name":"key","fields":[{"name":"id","type":"int"}]}
Caused by: org.apache.avro.AvroTypeException: Expected int. Got VALUE_STRING
Confluent Cloud
If you are using confluent.cloud from confluent.io you are still able to do all of this, with a few more parameters added to the commands.
More examples can be found here
Topics
You are going to need a properties file, which you can retrieve from https://confluent.cloud/environments/*****/clusters/***-*****/integrations/clients#java by navigating to your cluster and then "CLI & client configuration".
cloud.properties
bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--list
All other commands will work as expected.
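For example, describing or deleting a topic works the same way, as long as you keep passing --command-config; a quick sketch (the topic name demo2 is just an illustration):
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--describe --topic demo2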
Produce & consume simple messages
If you are going to run a simple producer without Avro and the Schema Registry, then the properties file from the previous example should be enough.
Create topic
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--create --topic simple1 --partitions 3 --replication-factor 3
Start producer
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-console-producer \
--broker-list xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--producer.config /cloud.properties \
--topic simple1
Start consumer
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-console-consumer \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--consumer.config /cloud.properties \
--topic simple1
Cleanup
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--delete --topic simple1
Note that you are not restricted to strings only; you can also use all of the previous examples with different keys and JSON.
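For example, keyed messages should work against the cloud as well; here is a sketch combining the cloud producer config with the key parsing properties shown earlier (the topic simple1 is reused just for illustration):
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-console-producer \
--broker-list xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--producer.config /cloud.properties \
--topic simple1 \
--property parse.key=true \
--property key.separator=: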
Produce & consume AVRO messages in confluent.cloud
Create topic
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--create --topic avro1 --partitions 3 --replication-factor 3
Start producer
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-schema-registry:5.3.2 kafka-avro-console-producer \
--broker-list xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--topic avro1 \
--property value.schema='{"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}' \
--producer.config /cloud.properties \
--property schema.registry.url="https://xxxx-xxxxx.us-east1.gcp.confluent.cloud" \
--property schema.registry.basic.auth.user.info="xxxxxxx:xxxxxxx" \
--property basic.auth.credentials.source=USER_INFO
Start consumer
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-schema-registry:5.3.2 kafka-avro-console-consumer \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--topic avro1 \
--from-beginning \
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer \
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
--consumer.config /cloud.properties \
--property schema.registry.url="https://xxxx-xxxxx.us-east1.gcp.confluent.cloud" \
--property schema.registry.basic.auth.user.info="xxxxxxx:xxxxxxx" \
--property basic.auth.credentials.source=USER_INFO
Notes:
- we are using another Docker image, confluentinc/cp-schema-registry:5.3.2, because it contains the kafka-avro-console-consumer and kafka-avro-console-producer tools
- cloud.properties is still enough, but the Schema Registry settings have to be passed via command line arguments
Kafka connect
We are going to use kafka connect to:
- produce predefined messages from a file to replay some sequence of events
- produce generated messages to get millions of them for test purposes
- have sample sink connector to save messages to a file
All examples use a standalone worker, which should not be used in production and is used here only because it is easy to set up.
In the end the worker command looks like this: connect-standalone worker.properties task1.properties task2.properties, where worker.properties contains the configuration for the worker itself plus some defaults for tasks, and taskX.properties is a task configuration. You can have many of them; for example, your worker might have a few tasks producing messages from different files and one task consuming them into Elasticsearch.
Tasks producing data into Kafka are called sources, tasks consuming data from Kafka are called sinks.
Be aware of advertised hosts and REST ports: if you connect to a dockerized Kafka that advertises localhost from a worker which also runs in a container, nothing will work. Use --net=host for such scenarios, but then you will need to change rest.port to avoid a conflict with the already taken port.
More details about worker properties can be found in the Kafka Connect documentation.
Also, you can get a sample configuration like so:
docker run -it --rm confluentinc/cp-kafka-connect:5.3.2 cat /etc/schema-registry/connect-avro-standalone.properties
Local kafka connect
Start your kafka
docker run -it --rm --name=kafka -e SAMPLEDATA=0 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e FORWARDLOGS=0 -e ADV_HOST=127.0.0.1 -p 2181:2181 -p 3030:3030 -p 8081-8082:8081-8082 -p 9092:9092 -p 9581-9585:9581-9585 lensesio/fast-data-dev:2.3.0
Note that I'm not exposing 8083, which is used by the Kafka Connect REST API, to avoid conflicts; otherwise do not forget to change rest.port in worker.properties.
Simple messages
worker.properties
bootstrap.servers=localhost:9092
# do not forget to change me to avoid conflicts
rest.port=8083
# required for standalone workers
offset.storage.file.filename=/tmp/standalone.offsets
# where to look for additional plugins
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
# optional, defaults for tasks
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Notes:
- key and value converters are optional and can be overridden in tasks
- the most used converters are org.apache.kafka.connect.storage.StringConverter, org.apache.kafka.connect.json.JsonConverter and io.confluent.connect.avro.AvroConverter; the Avro converter requires a Schema Registry
- for the JSON converter do not forget to add value.converter.schemas.enable=false if you do not want the schema embedded in every message; otherwise, by sending {"foo":"bar"} you will receive {"schema":{"type":"string","optional":false},"payload":"{\"foo\": \"bar\"}"} (see the sketch right after this list)
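A minimal sketch of such an override, using the same keys that appear later in source-json-file.properties:
# produce plain JSON values without the embedded schema envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false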
Kafka Connect Source Text File
Notes on task configuration properties:
- do not forget that each task should have a unique name; it will be used to track offsets, and for distributed workers it will be used for topic names
- connector.class is a kind of plugin; you can choose one from hub.confluent.io
- tasks.max controls parallelism; for sink tasks it cannot be bigger than the number of topic partitions
source-text-file.properties
name=source-text-file
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
# optional, override worker defaults
value.converter=org.apache.kafka.connect.storage.StringConverter
topic=DemoTextFile
file=demo-text-file.txt
demo-text-file.txt
hello
world
mac
was
here
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic DemoTextFile --partitions 3 --replication-factor 1
Start consumer
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic DemoTextFile --from-beginning
Start worker
docker run -it --rm \
--name=standalone \
--net=host \
-v $PWD:/data \
-w /data \
confluentinc/cp-kafka-connect:5.3.2 connect-standalone worker.properties source-text-file.properties
Note that we are mounting our current directory into the container so the worker has access to all configuration files.
If everything is OK, after a while you will see the messages from the source file in your consumer.
Kafka Connect Source JSON File
This one works the same way as the previous one.
source-json-file.properties
name=source-json-file
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
# optional, override worker defaults
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=false
# if you use JsonConverter here you will receive a string with escaped JSON
value.converter=org.apache.kafka.connect.storage.StringConverter
topic=DemoJsonFile
file=demo-json-file.ndjson
demo-json-file.ndjson
{"foo": "hello"}
{"foo": "world"}
{"foo": "bar"}
{"acme": 42}Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic DemoJsonFile --partitions 3 --replication-factor 1Start consumer
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic DemoJsonFile --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer --skip-message-on-error --from-beginningStart worker
docker run -it --rm \
--name=standalone \
--net=host \
-v $PWD:/data \
-w /data \
confluentinc/cp-kafka-connect:5.3.2 connect-standalone worker.properties source-json-file.properties
While everything is running, try adding more records to the source file and save it; you should immediately see them in the consumer.
Also try adding a non-JSON line to the source file; you will get an error:
[2020-01-04 10:09:00,896] ERROR Error processing message, skipping this message: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'non': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"non json"; line: 1, column: 5]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'non': was expecting 'null', 'true', 'false' or NaN
at [Source: (byte[])"non json"; line: 1, column: 5]but because we are running consumer with a --skip-message-on-error flag it should not die and continue listening to new records
unfortunatelly there is no way to produce messages with keys from simple files, if you will look at sources you will see that null is passed as key
If you wish to have keys you should run configured console producer and pipe file contents into it
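A minimal sketch, assuming a hypothetical keyed.txt file with key:value lines and reusing the SimpleWithStringKey topic from earlier (note -i instead of -it so that stdin can be redirected):
# keyed.txt contains lines like 1:one and 2:two
docker exec -i kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithStringKey --property parse.key=true --property key.separator=: < keyed.txt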
Replaying Avro Messages With Key Value
This particular example does not use Kafka Connect but still might be used to replay some sequence of messages
Let's suppose that our source.txt file looks like this:
source.txt
{"id":1}|{"foo":"hello"}
{"id":2}|{"foo":"world"}where each line is an message with key and value separated by pipe
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroFromFile --partitions 3 --replication-factor 1
Start kafka-avro-console-consumer to consume avro messages from file
docker run -it --rm --net=host confluentinc/cp-schema-registry:5.3.2 kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroFromFile --from-beginning --property print.key=true
Start kafka-avro-console-producer which will produce avro messages from a file
docker run -it --rm --net=host -v $PWD:/data -w /data confluentinc/cp-schema-registry:5.3.2 sh -c "kafka-avro-console-producer --broker-list localhost:9092 --topic AvroFromFile --property value.schema='{\"type\":\"record\", \"name\": \"AvroFromFile\", \"fields\":[{\"name\":\"foo\",\"type\":\"string\"}]}' --property parse.key=true --property key.schema='{\"type\":\"record\",\"name\": \"key\", \"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}' --property key.separator=\"|\" < source.txt"
And you should see the desired messages in the consumer:
{"id":1} {"foo":"hello"}
{"id":2} {"foo":"world"}Note that I have used sh -c "...." here because of bash can not understand whether last < source.txt should be ran inside docker or not
Kafka Connect Source DataGen Avro
In the following example we are going to generate a thousand records based on a given Avro schema.
source.properties
name=source
connector.class=io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic=AvroDatagen
# override worker.properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
# number of messages to generate
iterations=1000
tasks.max=1
# avro schema
schema.filename=/data/AvroDatagen.avsc
Some additional properties can be found here
Note that by default auto.register.schemas is set to true, so you do not need to register schemas upfront; everything will be done automatically. Also note that both key.subject.name.strategy and value.subject.name.strategy default to io.confluent.kafka.serializers.subject.TopicNameStrategy, so the subject names will be AvroDatagen-key and AvroDatagen-value respectively.
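If you want to verify this, the Schema Registry REST API can list the registered subjects; a quick sketch against the registry exposed by fast-data-dev on port 8081, once the connector has produced some messages:
curl http://localhost:8081/subjects
# the response should include something like ["AvroDatagen-key","AvroDatagen-value"]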
AvroDatagen.avsc
{
"type": "record",
"name": "AvroDatagen",
"namespace": "ua.rabota.topics",
"fields": [
{
"name": "userId",
"type": {
"type": "int",
"arg.properties": {
"range": {
"min": 1,
"max": 100
}
}
}
},
{
"name": "vacancyId",
"type": {
"type": "long",
"arg.properties": {
"range": {
"min": 7710732,
"max": 7711732
}
}
}
},
{
"name": "platform",
"type": ["null", {
"type": "string",
"arg.properties": {
"options": ["desktop", "mobile", "ios", "android"]
}
}],
"default": null
}
]
}
Note that usually in an Avro schema you define fields like {"name": "foo", "type": "string"}, where type is simply the primitive type name; for datagen we describe the type as an object with additional arg.properties.
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroDatagen --partitions 3 --replication-factor 1
Start consumer
docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroDatagen --from-beginning
Start avro datagen producer
docker run -it --rm \
--name=standalone \
--net=host \
-v $PWD:/data \
-w /data \
confluentinc/cp-kafka-connect:5.3.2 bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.2.0 && connect-standalone worker.properties source.properties"
Note how we are installing kafka-connect-datagen before starting connect-standalone; it is not shipped by default.
After a while, when everything has booted up, you should see incoming messages in the consumer.
When datagen has produced the desired 1000 messages it will die and you will see something like:
[2020-01-04 11:22:37,984] ERROR WorkerSourceTask{id=source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Stopping connector: generated the configured 1000 number of messages
Unfortunately datagen is quite limited regarding keys: the only way to have keys is to provide schema.keyfield, which will use one of the generated fields as the message key, and according to the sources it will still be a simple string key.
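For completeness, here is a sketch of what that would look like in source.properties; the choice of userId as the key field is arbitrary:
# use the generated userId field as the (string) message key
schema.keyfield=userId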
Kafka Connect Simple Sink To Text File
This might be used for debugging and logging.
sink.properties
name=sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=SinkDemo
file=/data/data.txt
Create topic
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SinkDemo --partitions 3 --replication-factor 1
Start Kafka Connect Sink to save messages to a text file
docker run -it --rm \
--name=standalone \
--net=host \
-v $PWD:/data \
-w /data \
confluentinc/cp-kafka-connect:5.3.2 connect-standalone worker.properties sink.properties
Start console producer
docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SinkDemo
Start typing messages into it and you should immediately see them in the text file.
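To watch the sink output as it arrives, you can simply tail the file on the host (assuming the worker was started with $PWD mounted as /data, as above):
tail -f data.txt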
Do not forget that you can run some tricky setups like connect-standalone worker.properties source.properties sink.properties, which might generate data into a topic and immediately sink it into the file.
Standalone connect worker with confluent.cloud
All previous examples should work well with confluent.cloud if you provide the required configuration options.
What you are going to need:
cloud.properties
bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
This file will be used by kafka-topics to create the topic.
worker.properties
bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
offset.storage.file.filename=/tmp/standalone.offsets
# TODO: check whether these are the defaults
# default 60000
offset.flush.interval.ms=10000
# default 40000
request.timeout.ms=20000
# 100
retry.backoff.ms=500
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
# default https
ssl.endpoint.identification.algorithm=https
# default PLAINTEXT
security.protocol=SASL_SSL
# default GSSAPI
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
# Connect producer and consumer specific configuration
producer.ssl.endpoint.identification.algorithm=https
producer.confluent.monitoring.interceptor.ssl.endpoint.identification.algorithm=https
consumer.ssl.endpoint.identification.algorithm=https
consumer.confluent.monitoring.interceptor.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
# Confluent Schema Registry for Kafka Connect
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=xxxxxxx:xxxxxxx
value.converter.schema.registry.url=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=xxxxxxx:xxxxxxx
key.converter.schema.registry.url=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud
# additions - https://docs.confluent.io/current/cloud/connect/connect-cloud-config.html
confluent.topic.bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=PLAIN
reporter.admin.bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
This one is for the worker to be able to communicate with Confluent Cloud.
source.properties
name=source
tasks.max=1
connector.class=io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic=demo1
iterations=1000
schema.filename=/data/demo1.avsc
This one will be used by the datagen connector to generate random data into the given topic.
sink.properties
name=sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
topics=demo1
file=/data/data.txt
This one sinks the generated messages from the cloud back into a local file.
demo1.avsc
{
"type": "record",
"name": "demo1",
"namespace": "ua.rabota.topics",
"fields": [
{
"name": "userId",
"type": {
"type": "int",
"arg.properties": {
"range": {
"min": 1,
"max": 100
}
}
}
},
{
"name": "vacancyId",
"type": {
"type": "long",
"arg.properties": {
"range": {
"min": 7710732,
"max": 7711732
}
}
}
},
{
"name": "platform",
"type": ["null", {
"type": "string",
"arg.properties": {
"options": ["desktop", "mobile", "ios", "android"]
}
}],
"default": null
}
]
}
The schema for the messages to be generated.
Create topic
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--create --topic demo1 --partitions 3 --replication-factor 3
Start worker
docker run -it --rm \
--name=standalone \
-v $PWD:/data \
-w /data \
confluentinc/cp-kafka-connect:5.3.2 bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.2.0 && connect-standalone worker.properties source.properties sink.properties"
After a while you will see that your data.txt file fills up with randomly generated messages.
So now you can quickly send batches of messages, both generated and predefined, not only to a local Kafka but also to your Confluent Cloud one. Profit!
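When you are done you can clean up the topic the same way as before (same placeholders as above):
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
--bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
--command-config /cloud.properties \
--delete --topic demo1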
Distributed Worker
Confluent Cloud does not give you distributed workers, for some reason; it seems this is because they do not know how many of them you are going to need. To start your own Connect cluster you will need the worker.properties from the previous example; just remove offset.storage.file.filename from it and add:
group.id=mac1
offset.storage.topic=mac1-offsets
config.storage.topic=mac1-configs
status.storage.topic=mac1-status
offset.storage.partitions=3
replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
Take a closer look at the first four settings and make sure they are unique.
The difference between a standalone and a distributed worker is that from now on you are going to add and remove your tasks via the REST API.
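For example, assuming the worker's REST API is reachable on localhost:8083, registering, listing and removing a connector looks roughly like this (connector.json is a hypothetical file containing a payload like the one below):
# list registered connectors
curl http://localhost:8083/connectors
# register a new connector from a JSON file
curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors
# remove it again
curl -X DELETE http://localhost:8083/connectors/hdfs-sink-connector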
In most cases everything will look the same as in the previous examples, except that now you post JSON instead of property files, like in this example from the docs:
POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json
{
"name": "hdfs-sink-connector",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "10",
"topics": "test-topic",
"hdfs.url": "hdfs://fakehost:9000",
"hadoop.conf.dir": "/opt/hadoop/conf",
"hadoop.home": "/opt/hadoop",
"flush.size": "100",
"rotate.interval.ms": "1000"
}
}
Here is an example of docker run which is a good starting point for running your Connect cluster in Kubernetes:
docker run -it --rm \
--name=mac1 \
-p 8083:8083 \
-e CONNECT_BOOTSTRAP_SERVERS=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
-e CONNECT_GROUP_ID=mac1 \
-e CONNECT_OFFSET_STORAGE_TOPIC=mac1-offsets \
-e CONNECT_CONFIG_STORAGE_TOPIC=mac1-configs \
-e CONNECT_STATUS_STORAGE_TOPIC=mac1-status \
-e CONNECT_OFFSET_STORAGE_PARTITIONS=3 \
-e CONNECT_REPLICATION_FACTOR=3 \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 \
-e CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000 \
-e CONNECT_REQUEST_TIMEOUT_MS=20000 \
-e CONNECT_RETRY_BACKOFF_MS=500 \
-e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000 \
-e CONNECT_CONSUMER_RETRY_BACKOFF_MS=500 \
-e CONNECT_PRODUCER_REQUEST_TIMEOUT_MS=20000 \
-e CONNECT_PRODUCER_RETRY_BACKOFF_MS=500 \
-e CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
-e CONNECT_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_SASL_MECHANISM=PLAIN \
-e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
-e CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
-e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
-e CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
-e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
-e CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL=SASL_SSL \
-e CONNECT_PRODUCER_SASL_MECHANISM=PLAIN \
-e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM=PLAIN \
-e CONNECT_CONSUMER_SASL_MECHANISM=PLAIN \
-e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM=PLAIN \
-e CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
-e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
-e CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
-e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
-e CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=xxxxxxx:xxxxxxx \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud \
-e CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=true \
-e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_REST_ADVERTISED_HOST_NAME=localhost \
-e CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=xxxxxxx:xxxxxxx \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud \
confluentinc/cp-kafka-connect:5.3.2
Bash aliases
Even simple operations like creating a topic become hard to remember, especially if you have local, dev and prod Kafka clusters.
If you are using the ccloud command line tool you should already have ~/.ccloud/, which seems like a good place to keep my cloud.properties files; in my case they will be dev.properties and prod.properties.
Here are a few starting-point examples.
Local kafka bash aliases
alias local-topic="docker run -it --rm --net=host confluentinc/cp-kafka:5.3.2 kafka-topics --bootstrap-server localhost:9092"
alias local-topic-list="local-topic --list"
alias local-topic-delete="local-topic --delete --topic"
alias local-topic-describe="local-topic --describe --topic"
alias local-topic-create="local-topic --create --replication-factor 1 --topic"
alias local-topic-create1="local-topic --create --replication-factor 1 --partitions 1 --topic"
alias local-topic-create2="local-topic --create --replication-factor 1 --partitions 2 --topic"
alias local-console-consumer="docker run -it --rm --net=host confluentinc/cp-kafka:5.3.2 kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic"
alias local-console-producer="docker run -it --rm --net=host confluentinc/cp-kafka:5.3.2 kafka-console-producer --broker-list localhost:9092 --topic"
Confluent cloud kafka bash aliases
alias dev-topic="docker run -it --rm -v /Users/mac/.ccloud/dev.properties:/dev.properties confluentinc/cp-kafka:5.3.2 kafka-topics --bootstrap-server $(grep bootstrap.server ~/.ccloud/dev.properties | tail -1 | cut -d'=' -f2) --command-config dev.properties"
alias dev-topic-list="dev-topic --list"
alias dev-topic-delete="dev-topic --delete --topic"
alias dev-topic-describe="dev-topic --describe --topic"
alias dev-topic-create="dev-topic --create --replication-factor 3 --topic"
alias dev-console-consumer="docker run -it --rm -v /Users/mac/.ccloud/dev.properties:/dev.properties confluentinc/cp-kafka:5.3.2 kafka-console-consumer --bootstrap-server $(grep bootstrap.server ~/.ccloud/dev.properties | tail -1 | cut -d'=' -f2) --consumer.config dev.properties --topic"
alias dev-console-producer="docker run -it --rm -v /Users/mac/.ccloud/dev.properties:/dev.properties confluentinc/cp-kafka:5.3.2 kafka-console-producer --broker-list $(grep bootstrap.server ~/.ccloud/dev.properties | tail -1 | cut -d'=' -f2) --producer.config dev.properties --topic"
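With these aliases in place, day-to-day usage becomes something like this (the topic name demo is just an illustration):
local-topic-create1 demo
local-console-producer demo
local-console-consumer demo
dev-topic-list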