apache kafka – Confluent Schema Registry

I want to use the Schema Registry (Confluent's Docker image) with the open-source Kafka that I installed locally on my PC.

I am using the following command to execute the image:

docker run -p 8081:8081 \
    -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://127.0.0.1:9092 \
    -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
    -e SCHEMA_REGISTRY_DEBUG=true confluentinc/cp-schema-registry:latest

but I receive the following connection errors:

(kafka-admin-client-thread | adminclient-1) WARN org.apache.kafka.clients.NetworkClient - (AdminClient clientId=adminclient-1) Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
(kafka-admin-client-thread | adminclient-1) WARN org.apache.kafka.clients.NetworkClient - (AdminClient clientId=adminclient-1) Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
(main) ERROR io.confluent.admin.utils.ClusterStatus - Error while getting broker list.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:149)
    at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:150)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
(main) INFO io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...

I have Kafka installed on my localhost.
Any ideas on how to solve this, please?
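
For reference, the stack trace above appears to be the image's readiness check failing on an AdminClient metadata call. A minimal sketch of the same kind of probe (an assumption for illustration only: the Kafka Java client on the classpath, using the bootstrap address from the docker command) reproduces the timeout wherever that address is unreachable; note that inside the container, 127.0.0.1 refers to the container itself, not to the host running the broker.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same bootstrap address the Schema Registry container is given; from inside
        // the container this loopback address points at the container, not the host.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Fails with the same TimeoutException as the log above if no broker answers.
            System.out.println("Brokers: " + admin.describeCluster().nodes().get(30, TimeUnit.SECONDS));
        }
    }
}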

Not all kafka consumers are being assigned to partitions

I have 10 consumers and 10 partitions.
I get the number of partitions:

    int partitionCount = getPartitionCount(kafkaUrl);

and I create the same number of consumers with the same Group ID.

Stream.iterate(0, i -> i + 1)
    .limit(partitionCount)
    .forEach(index -> executorService.execute(
        () -> createConsumer(consumerConfig(index), topicName)));

The config looks like this:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_CLIENT_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + index);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

Each consumer subscribes to the topic and begins consuming:

consumer.subscribe(Collections.singletonList(topicName));
while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(4));
    if (consumerRecords.count() > 0) {
        consumeRecords(consumerRecords);
        consumer.commitSync();
    }
}

This is what I see after the consumers have been assigned to partitions:

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CLIENT-ID                                                        
topicName  1          89391           89391           0               consumer0
topicName  3          88777           88777           0               consumer1
topicName  5          89280           89280           0               consumer2
topicName  4          88776           88776           0               consumer2
topicName  0          4670991         4670991         0               consumer0
topicName  9          23307           89343           66036           consumer4
topicName  7          89610           89610           0               consumer3
topicName  8          88167           88167           0               consumer4
topicName  2          89138           89138           0               consumer1
topicName  6          88967           88967           0               consumer3

Only half of the consumers have been assigned partitions.
Why did this happen? According to the documentation there should be one consumer per partition. Am I doing something wrong? Kafka version is 2.1.1.
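
One way to see exactly which partitions each instance ends up owning is to subscribe with a ConsumerRebalanceListener. Below is a minimal sketch (createLoggingConsumer is a hypothetical helper, reusing the consumerConfig(index) and topicName from the code above) that logs every assignment after a rebalance, which makes it easy to spot instances that received two partitions and instances that received none:

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class AssignmentLogging {

    static KafkaConsumer<String, String> createLoggingConsumer(Properties config, String topicName, int index) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("consumer" + index + " revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Logs the partitions this instance owns after every rebalance.
                System.out.println("consumer" + index + " assigned: " + partitions);
            }
        });
        return consumer;
    }
}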

apache spark – Kafka createDirectStream using PySpark

My main goal is to connect to Kafka, create a DStream, store it in a local variable as rows, write it to MongoDB, and build the flow end to end in PySpark.

But I am facing a problem in the first step, when creating the DStream; the error is "java.util.ArrayList cannot be cast to java.lang.String". Can you help me find a solution?
The details are below.

I am trying to connect to Kafka using PySpark as shown below:

kafkaParams = {"metadata.broker.list": ('host1:port','host2:port','host3:port'),
"security.protocol":"ssl",
"ssl.key.password":"***",
"ssl.keystore.location":"/path1/file.jks",
"ssl.keystore.password":"***",
"ssl.truststore.location":"/path1/file2.jks",
"ssl.truststore.password":"***"}

directKafkaStream = KafkaUtils.createDirectStream(ssc,("lac.mx.digitalchannel.raw.s015-txn-qrdc"),kafkaParams)

but I get an error and I'm not sure how to handle it:

py4j.protocol.Py4JJavaError: An error occurred while calling o120.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
        at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:419)
        at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:54)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:131)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Also, this is how I launch the PySpark shell that I am using:

pyspark2 --master local --jars /path/spark-streaming-kafka-0-10_2.11-2.4.0.cloudera2.jar,/path/kafka-clients-2.0.0-cdh6.1.0.jar,/path/spark-sql-kafka-0-10_2.11-2.4.0.cloudera2.jar  --files file.jks,file2.jks
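
For comparison, here is a minimal sketch of the call shape the Python helper expects (placeholder host names; the ssl.* settings from the snippet above are omitted): every value in kafkaParams should be a plain string, so the broker list is one comma-separated string rather than a tuple, and the topics argument should be a list. The ArrayList-to-String cast in the trace most likely comes from the tuple value being converted to a Java ArrayList.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-dstream-sketch")
ssc = StreamingContext(sc, 10)  # 10-second batches

# Broker list as a single comma-separated string, not a tuple of strings.
kafkaParams = {"metadata.broker.list": "host1:port,host2:port,host3:port"}

# Topics are passed as a Python list of topic names.
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, ["lac.mx.digitalchannel.raw.s015-txn-qrdc"], kafkaParams)

directKafkaStream.pprint()
ssc.start()
ssc.awaitTermination()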

kafka migration error when trying to update the data flow

I inherited a legacy Kafka system, version 0.8.2. Now I am trying to upgrade it to something more modern. I know that I cannot go directly from 0.8.2 to anything beyond 1.0, but I expected to be able to migrate incrementally.

The problem is that when I try something simple, like going from 0.8.2 to 0.9.0, I get an error:

java.lang.IllegalArgumentException: requirement failed: log.message.format.version 0.9.0 cannot be used when inter.broker.protocol.version is set to 0.8.2

I hope someone can tell me what I configured wrong or what is missing from my server configuration.

My server.properties file for the 0.10.0 broker contains:



############################# Server Basics #############################


broker.id=0

auto.create.topics.enable=true
inter.broker.protocol.version=0.8.2
log.message.format.version=0.9.0

############################# Socket Server Settings #############################

#advertised.listeners=PLAINTEXT://your.host.name:9092


num.network.threads=3


num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


############################# Log Basics #############################


num.partitions=1

num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

log.retention.hours=168


# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
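
For reference, the error above is the broker rejecting a log.message.format.version that is newer than inter.broker.protocol.version. During a rolling upgrade the usual pattern (a sketch, not a complete upgrade plan) is to keep both pinned to the old version first and raise them only afterwards:

# Phase 1 of a rolling upgrade: run the new broker binaries, but keep both settings
# at the old version so upgraded and non-upgraded brokers can still talk to each other.
inter.broker.protocol.version=0.8.2
log.message.format.version=0.8.2

# Phase 2 (once every broker runs the new binaries): raise inter.broker.protocol.version
# in one rolling restart, then raise log.message.format.version in a later one.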

Thanks for any help

Using Storm with Kafka in Docker

I want to run Kafka and Storm (connected through a Kafka spout) in Docker, set up with docker-compose. My docker-compose.yml looks like this:

    version: '3.3'

    services:

      zookeeper:
        image: confluentinc/cp-zookeeper:5.1.2
        container_name: zookeeper
        hostname: zookeeper
        ports:
          - "2181:2181"
        environment:
          - ZOOKEEPER_CLIENT_PORT=2181
        restart: unless-stopped
      kafka:
        image: confluentinc/cp-kafka:5.1.2
        container_name: kafka
        hostname: kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_LISTENERS: PLAINTEXT://kafka:19092,LISTENER_DOCKER_EXTERNAL://kafka:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-kafka}:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
          KAFKA_BROKER_ID: 1
          KAFKA_NUM_PARTITIONS: 1
          KAFKA_DEFAULT_REPLICATION_FACTOR: 1
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
        depends_on:
          - zookeeper
        restart: unless-stopped
      kafka-producer:
        build:
          context: kafka-dummy-producer
          dockerfile: Dockerfile
        container_name: producer
        depends_on:
          - kafka
      kafka-consumer:
        build:
          context: kafka-dummy-consumer
          dockerfile: Dockerfile
        container_name: consumer
        depends_on:
          - kafka
      nimbus:
        image: storm
        container_name: nimbus
        command: storm nimbus
        depends_on:
          - zookeeper
        links:
          - zookeeper
        restart: unless-stopped
        ports:
          - "6627:6627"
      supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor
        depends_on:
          - nimbus
          - zookeeper
        links:
          - nimbus
          - zookeeper
        restart: unless-stopped
      storm-ui:
        image: storm
        container_name: storm-ui
        command: storm ui
        depends_on:
          - nimbus
          - zookeeper
        links:
          - nimbus
          - zookeeper
        ports:
          - "8080:8080"
        restart: unless-stopped

My problem is with Storm: all the guides on how to run it in Docker seem quite old, because they suggest using links, which are now deprecated in Docker. However, I used links anyway just to get at least something working, so I set up Kafka and Storm with a shared ZooKeeper and followed the official guidelines for both. For Kafka, I used two simple clients and they work, so Kafka seems to be working fine.

Storm doesn't write its logs to stdout, so I can't really verify it, and mounting volumes for the log files doesn't seem to work either. The Storm UI on localhost:8080 worked only once, and after that never again.

Is my approach going in the right direction? What can I do to fix the configuration?

go – Is there an open-source Kafka project that shows how producer data is sent to a consumer, with some logic on the consumer side?


Kafka mirror for aggregating logs from on-premises into AWS ELK

I need to ship all my logs from on-premises servers to AWS ELK (over a direct connection). For that I will use Kafka to buffer the messages. We plan to apply QoS rules so that these log messages only consume a given percentage of the network bandwidth (to avoid affecting other, more important traffic; some delay in synchronizing the logs between on-premises and AWS is acceptable). Therefore, I am thinking of running a Kafka mirror between on-premises and AWS so that log messages can synchronize slowly within the allotted bandwidth.

My question is whether using a Kafka mirror for this is a good idea, or whether it is overkill.

I would appreciate any comments on the overall architecture too.

Thank you

apache kafka: efficient options for stream joins (with large time gaps)

Use case: I need to join two streaming sources (say orders(order_id, order_val) and shipments(shipment_id, shipment_val)) on an identifier (order_id = shipment_id) and generate a new event shipment_order(id, order_val, shipment_val).

Keep in mind that the gap between events can be huge (1–2 years).
Example: order(order_id=1, value=1) could arrive today, but the matching shipment(shipment_id=1, value=2) could arrive a year later.

I am exploring patterns for efficient long-window stream joins:

  1. Store events in DynamoDB (or any other data store) and re-emit them through DynamoDB Streams (change data capture in general). When an order DDB event or a shipment DDB event arrives, I look up the counterpart in DynamoDB using the event's id, perform the join, and emit a new shipment_order event.

  2. Use Kafka Streams joins with large windows (2 years); see the sketch after this question.

Questions:

  1. What are some good patterns for achieving such large-window joins? (Am I on the right track with the options above?)

  2. Is a 2-year windowed join recommended on stateful systems like Kafka Streams? (What are the implications?)

Note: Assume the system processes around 200 million events per day, with peaks. By efficiency I mean overall time/cost efficiency (and the trade-offs).
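
As a rough illustration of option 2, here is a minimal Kafka Streams sketch of the join. Assumptions for illustration only: both topics are keyed by order_id/shipment_id, values are plain strings, and the output topic is shipment_order; a real topology would also need proper serdes and capacity planning for two years of window state and changelog retention.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class ShipmentOrderJoin {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> shipments =
                builder.stream("shipments", Consumed.with(Serdes.String(), Serdes.String()));

        // A two-year join window: the join's state stores (and their changelog topics)
        // must retain both sides for the whole window, which is where the cost lives.
        orders.join(
                shipments,
                (orderVal, shipmentVal) -> orderVal + "," + shipmentVal,
                JoinWindows.of(Duration.ofDays(2 * 365)),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
              .to("shipment_order");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shipment-order-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}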

Hortonworks Data Platform: how to get the right Kafka version when installing from HDF 3.4 on top of HDP 3.1

I am building a Hortonworks cluster with both HDP and HDF. First I installed HDP and then installed and integrated HDF on top.

Environment details:
Operating system: Red Hat Enterprise Linux Server 7.6 (Maipo)
Versions: Ambari 2.7.3, HDP 3.1, HDF 3.4.0

Basically, HDP 3.1 ships Kafka 1.0.1, while HDF has Kafka 2.1.0 packages available, and I need the HDF version of Kafka. Although I installed Kafka from HDF, Ambari shows Kafka version 1.0.1. After the integration with HDF, Kafka 2.1.0 is not shown in the Add Service list.

How can I install Kafka 2.1.0 in the cluster?

Also, is it possible that the version shown is 1.0.1 even though Kafka 2.1.0 is installed?

Microservices – Use of Apache Kafka for rare events

There are multiple microservices responsible for storing user-dependent data, e.g. user preferences for certain products, user orders, etc. There is a scenario in which a user is deleted from the database of the user administration service, and the requirement is to also delete that user's data from the databases of the other services. A message broker with a publisher (the user administration service) and subscribers (the microservices interested in such an event) seems like the most intuitive solution to me. I thought of Apache Kafka, but since such events will not happen very often, I have doubts about whether this heavy-duty tool, known for its high throughput, is appropriate. Could someone comment on whether it is a good idea? Or should I maybe use another message broker, or even a different architectural solution?
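
For a sense of scale, the publisher side of such a pub/sub setup is very small. Here is a minimal sketch of the user administration service publishing a deletion event (the topic name, key, and JSON payload are made-up examples):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UserDeletedPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by user id keeps all events for one user on the same partition,
            // so each subscribing service sees them in order.
            producer.send(new ProducerRecord<>("user-deleted", "user-42", "{\"userId\":\"user-42\"}"));
        }
    }
}

Each interested microservice would then subscribe with its own consumer group and delete the user's data when it receives the event.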