Sunday, June 4, 2017

Confluent Kafka connector examples

What is Kafka Connect?



Installation

http://docs.datamountaineer.com/en/latest/install.html#install

https://www.confluent.io/blog/introducing-confluent-control-center/

https://github.com/datamountaineer/stream-reactor/releases

Under construction: do not use

Debugging issues with the connector versions and the instructions.

Simple connection

Monitor the contents of a file and send it over Kafka from the source to the sink.

# The following article contains errors
# https://docs.confluent.io/current/installation/installing_cp.html

1. Create a script to initialize Zookeeper, kafka server, and the schema registry in the confluent directory

cd confluent
vi initialize_services.sh 
#! /bin/bash -e
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &

./bin/kafka-server-start ./etc/kafka/server.properties &

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &


vi stop_services.sh
#! /bin/bash -e

bin/kafka-server-stop --help
bin/zookeeper-server-stop
bin/schema-registry-stop



# change the permissions on the scripts


chmod +x stop_services.sh


chmod +x init_services.sh


# 2.  in term 1 init the basic services

cd confluent
./initialize_services.sh 


# run the Zookeeper shell and check Zookeeper

bin/zookeeper-shell  127.0.0.1:2181   

# https://issues.apache.org/jira/browse/KAFKA-2385

# The zookeeper shell shipped with Kafka does not work because jline jar is missing.


2. Simple consumer example
In a new terminal start a Kafka Connect instance in standalone mode running this connector. For standalone mode, we can specify the connector configurations directly on the command line:
$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
      ./etc/kafka/connect-file-source.properties 
In a new terminal start the consumer. Each of the two lines in our log file should be delivered to Kafka, having registered a schema with the Schema Registry. One way to validate that the data is there is to use the console consumer in another console to inspect the contents of the topic:
$ ./bin/kafka-avro-console-consumer --bootstrap-server localhost:2181 --topic connect-test --from-beginning
  "log line 1"
  "log line 2"
We can also start another connector to log each message to the console. Hit Ctrl-C to gracefully stop the Kafka Connect process. Then take a look at the configuration for a sink task that logs each message to the console:
$ cat ./etc/kafka/connect-file-sink.properties
The configuration contains similar settings to the file source. Because its functionality is so simple, it has no additional configuration parameters.
In a new terminal start the Kafka Connect standalone process, but this time specify both connector configurations. They will run in the same process, but each will have its own dedicated thread.
$ ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
      ./etc/kafka/connect-file-source.properties ./etc/kafka/connect-console-sink.properties
  ... [ start up logs clipped ] ...
  "log line 1"
  "log line 2"
Once the process is up and running, you should see the two log lines written to the console as the sink connector consumes them. Note that the messages were not written again by the source connector because it was able to resume from the same point in the file where it left off when we shut down the previous process.
With both connectors running, we can see data flowing end-to-end in real time. Use another terminal to add more lines to the text file:
$ echo -e "log line 3\nlog line 4" >> test.txt
and you should see them output on the console of the Kafka Connect standalone process. The new data was picked up by the source connector, written to Kafka, read by the sink connector from Kafka, and finally logged to the console$ echo -e "log line 1\nlog line 2" > test.txt
Trouble shooting
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid (use the endpoint `/{connectorType}/config/validate` to get a full list of errors)


[2017-06-06 05:04:20,659] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-06-06 05:04:20,671] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-06-06 05:04:20,679] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.config.ConfigException: No supported Kafka endpoints are configured. Either kafkastore.bootstrap.servers must have at least one endpoint matching kafkastore.security.protocol or broker endpoints loaded from ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.
at io.confluent.kafka.schemaregistry.storage.KafkaStore.endpointsToBootstrapServers(KafkaStore.java:313)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.(KafkaStore.java:130)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.(KafkaSchemaRegistry.java:144)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
at io.confluent.rest.Application.createServer(Application.java:149)

at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)






No comments:

Post a Comment