linux – kafka: using a RAM disk instead of the physical disk

We are using a Kafka cluster in which each Kafka broker has a physical disk.

Is it possible for a Kafka broker to use a RAM disk instead of a physical disk?

What is a memory-based file system (RAM disk)?

A memory-based file system creates a storage area directly in a computer's RAM and presents it like a partition on a disk drive. RAM is volatile memory, so when the system reboots or fails, the file system is lost along with all of its data.
The main benefit of memory-based file systems is that they are very fast, often an order of magnitude faster than modern SSDs, so read and write performance increases for every type of workload. These fast storage areas are ideal for applications that repeatedly need small data areas for caching or as temporary space. Because the data is lost when the machine restarts, the data should not be valuable: even scheduled backups cannot guarantee that everything will have been replicated in the event of a system crash.
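For illustration, a RAM disk on Linux is usually just a tmpfs mount, and a broker can be pointed at it through its log directory setting. A minimal sketch, assuming a Linux host and a made-up mount point and size (everything under the mount disappears on reboot or crash, so replication to brokers with real disks is the only thing protecting the data):

    # create a RAM-backed mount (contents are lost on reboot or crash)
    sudo mkdir -p /mnt/kafka-ramdisk
    sudo mount -t tmpfs -o size=8g tmpfs /mnt/kafka-ramdisk

    # server.properties: point the broker's log directory at the RAM disk
    log.dirs=/mnt/kafka-ramdisk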

message queue: can front-end applications (mobile, web, etc.) write directly to an MQ (Kafka or RabbitMQ)? Or do you need some adapters / proxies / gateways?

Found this:

At Uber, we use Apache Kafka as a message bus to connect different
parts of the ecosystem. We collect system and application logs as
well as event data from the rider and driver apps, and we make this
data available to a variety of downstream consumers through Kafka.

[image]

Can anyone describe how this part of the diagram works? Can front-end applications (mobile, React, Angular, etc.) write directly to Kafka (or RabbitMQ), or do you need adapters / proxies / gateways?
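For context, a gateway in this sense is just a small backend service that terminates HTTP(S) from browsers or mobile apps, authenticates the request, and then produces to Kafka; browsers cannot speak Kafka's binary protocol, and handing broker addresses and credentials to untrusted clients is usually avoided. A minimal sketch in Java, where the endpoint path, topic name, and broker address are assumptions and validation/auth is left as a comment:

    import com.sun.net.httpserver.HttpServer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.net.InetSocketAddress;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class EventGateway {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: broker reachable here
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
            server.createContext("/events", exchange -> {
                // read the JSON body posted by the mobile/web client
                String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                // authenticate/validate the request here before it ever reaches Kafka
                producer.send(new ProducerRecord<>("frontend-events", body)); // hypothetical topic
                exchange.sendResponseHeaders(202, -1); // accepted, no response body
                exchange.close();
            });
            server.start();
        }
    }

Confluent's REST Proxy plays a similar role off the shelf if you would rather not write such a gateway yourself.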

Design Patterns: In Kafka Streams, is `StreamsBuilder` a builder class?

In general, a builder instance is used as follows:

  builder.buildPartA();
  builder.buildPartB();
  co = builder.getResult();

In Kafka Streams, is StreamsBuilder a builder class? For example, in the code at https://kafka.apache.org/24/documentation/streams/ (see also below):

  • Is builder.stream() the equivalent of builder.buildPartA()?
  • Is there no equivalent of builder.buildPartB()?
  • What do textLines.flatMapValues() and wordCounts.toStream() correspond to in the builder pattern?

Thank you.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);

Is it possible to configure a Kafka consumer group across different data centers (in different geo-locations)?

In my current configuration, I have a group of producers that send data in round-robin fashion to 3 Kafka clusters; each cluster has 3 Kafka brokers and sits in a different availability zone (geo-location).
Note that each cluster therefore receives a different set of data, and each cluster has its own set of consumers. My question is: is it possible to gather all the consumers into a single consumer group? My concern is that, since each cluster gets a different set of data as input, it may not be possible to put all the consumers under one consumer group.
I have attached a flow chart here.

domain-driven design: what is the correct way to publish messages to a saga using Kafka?

I am designing a library using the CQRS pattern, and I treat the saga as an aggregate using the approach described at https://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-i-of-ii/.

My question here is how to implement the delivery of the messages that a saga subscribes to.

Let's say I have two aggregates, Account and MoneyTransferSaga, in the application.
The TransferMoney command triggers a MoneySent event that reduces the account balance.
The MoneySent event triggers the creation of a MoneyTransferSaga, which sends a ReceiveMoney command to another account to increase its balance.

So MoneyTransferSaga subscribes to the MoneySent event. The question I would like to ask is: what is the correct way to deliver this MoneySent event to the saga?

I am using Kafka as the message broker, and I have account-commands and account-events topics for the Account aggregate.

Approach A: Register MoneyTransferSaga as another consumer group that subscribes to the "account-events" topic. When an incoming event matches the "MoneySent" type, invoke the saga's event handler; the saga's event handler ignores all other events.

Approach B: Create a topic called "money-transfer-saga-events". When an event is published to "account-events", the account event handler duplicates it to the "money-transfer-saga-events" topic. MoneyTransferSaga then subscribes to "money-transfer-saga-events" instead of "account-events".

Not sure which approach is better.

Using approach A, the saga instance would receive all the events from the account.

Using approach B, the account service must know about the existence of MoneyTransferSaga.
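For reference, a minimal sketch of what Approach A implies, assuming string payloads, an "event-type" record header, and broker/topic/group names that are only illustrative:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class MoneyTransferSagaListener {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumption
            props.put("group.id", "money-transfer-saga");       // the saga's own consumer group
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("account-events"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // assumption: the event type travels in an "event-type" header
                        Header typeHeader = record.headers().lastHeader("event-type");
                        String type = typeHeader == null ? "" : new String(typeHeader.value(), StandardCharsets.UTF_8);
                        if ("MoneySent".equals(type)) {
                            handleMoneySent(record.value()); // start or advance the saga
                        }
                        // every other account event is simply ignored by the saga
                    }
                }
            }
        }

        private static void handleMoneySent(String payload) {
            // dispatch to the saga's event handler
        }
    }

The trade-off named above stays the same: this group re-reads every account event, but the account service never has to know the saga exists.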

Thank you.

apache kafka – Confluent Schema Registry

I want to use the Schema Registry (Confluent's Docker image) with the open-source Kafka that I installed locally on my PC.

I am using the following command to run the image:

docker run -p 8081:8081 \
    -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://127.0.0.1:9092 \
    -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
    -e SCHEMA_REGISTRY_DEBUG=true confluentinc/cp-schema-registry:latest

but I receive the following connection errors:

(kafka-admin-client-thread | adminclient-1) WARN org.apache.kafka.clients.NetworkClient - (AdminClient clientId=adminclient-1) Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
(kafka-admin-client-thread | adminclient-1) WARN org.apache.kafka.clients.NetworkClient - (AdminClient clientId=adminclient-1) Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
(main) ERROR io.confluent.admin.utils.ClusterStatus - Error while getting broker list.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:149)
    at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:150)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
(main) INFO io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...

I have Kafka installed on my localhost.
Any ideas on how to solve this, please?
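One common cause, assuming the broker really is listening on port 9092 on the host: inside the container, 127.0.0.1 is the container itself, not your PC, so the registry cannot reach the broker. Two usual workarounds, sketched rather than tested against this exact setup (the image also expects SCHEMA_REGISTRY_HOST_NAME to be set, added below):

    # Linux: share the host's network namespace so 127.0.0.1 is the host
    docker run --network host \
        -e SCHEMA_REGISTRY_HOST_NAME=localhost \
        -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://127.0.0.1:9092 \
        -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
        confluentinc/cp-schema-registry:latest

    # Docker Desktop (Mac/Windows): address the host explicitly
    docker run -p 8081:8081 \
        -e SCHEMA_REGISTRY_HOST_NAME=localhost \
        -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://host.docker.internal:9092 \
        -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
        confluentinc/cp-schema-registry:latest

The broker's advertised.listeners also has to return an address the container can reach, otherwise the initial connection works but the returned metadata points back at an unreachable host.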

Not all kafka consumers are being assigned to partitions

I have 10 consumers and 10 partitions.
I get the number of partitions:

    int partitionCount = getPartitionCount(kafkaUrl);

and I create the same number of consumers with the same Group ID.

Stream.iterate(0, i -> i + 1)
    .limit(partitionCount)
    .forEach(index -> executorService.execute(
        () -> createConsumer(consumerConfig(index), topicName)));

The config looks like this:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_CLIENT_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + index);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

Each consumer subscribes to the topic and starts consuming:

consumer.subscribe(Collections.singletonList(topicName));
while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(4));
    if (consumerRecords.count() > 0) {
        consumeRecords(consumerRecords);
        consumer.commitSync();
    }
}

This is what I see after the consumers have been assigned to partitions:

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CLIENT-ID                                                        
topicName  1          89391           89391           0               consumer0
topicName  3          88777           88777           0               consumer1
topicName  5          89280           89280           0               consumer2
topicName  4          88776           88776           0               consumer2
topicName  0          4670991         4670991         0               consumer0
topicName  9          23307           89343           66036           consumer4
topicName  7          89610           89610           0               consumer3
topicName  8          88167           88167           0               consumer4
topicName  2          89138           89138           0               consumer1
topicName  6          88967           88967           0               consumer3

Only half of the consumers have been assigned partitions.
Why did this happen? According to the documentation there should be one consumer per partition. Am I doing something wrong? Kafka version 2.1.1.
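As a diagnostic (not a fix), subscribing with a ConsumerRebalanceListener makes each consumer log what it is actually given after every rebalance, which shows whether some consumers never joined the group at all. In this sketch, clientId stands for the CONSUMER_CLIENT_ID + index value from the config above, and the types come from org.apache.kafka.clients.consumer and org.apache.kafka.common:

    consumer.subscribe(Collections.singletonList(topicName), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println(clientId + " revoked " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println(clientId + " assigned " + partitions);
        }
    });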

apache spark – Kafka createDirectStream using PySpark

My main goal is to connect to Kafka, create a DStream, store it in a local variable as rows, write it to MongoDB, and build the flow end to end in PySpark.

But I am facing a problem at the first step, creating the DStream: the error is "java.util.ArrayList cannot be cast to java.lang.String". Can you help me identify the solution?
The details are below.

I am trying to connect to Kafka using PySpark as shown below,

kafkaParams = {"metadata.broker.list": ('host1:port','host2:port','host3:port'),
"security.protocol":"ssl",
"ssl.key.password":"***",
"ssl.keystore.location":"/path1/file.jks",
"ssl.keystore.password":"***",
"ssl.truststore.location":"/path1/file2.jks",
"ssl.truststore.password":"***"}

directKafkaStream = KafkaUtils.createDirectStream(ssc,("lac.mx.digitalchannel.raw.s015-txn-qrdc"),kafkaParams)

but I get an error and I'm not sure how to handle it:

py4j.protocol.Py4JJavaError: An error occurred while calling o120.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
        at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:419)
        at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:54)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:131)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Also, this is how I open the PySpark CLI that I am using:

pyspark2 --master local --jars /path/spark-streaming-kafka-0-10_2.11-2.4.0.cloudera2.jar,/path/kafka-clients-2.0.0-cdh6.1.0.jar,/path/spark-sql-kafka-0-10_2.11-2.4.0.cloudera2.jar  --files file.jks,file2.jks
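For what it's worth, in the old spark-streaming-kafka 0.8 Python API that this stack trace comes from, metadata.broker.list is expected to be a single comma-separated string and the topics argument a list, so passing a tuple of hosts is what triggers the ArrayList-to-String cast error. A sketch with the same placeholder hosts and topic (note also that, as far as I know, this 0.8-style direct stream does not support SSL, so the ssl.* options would not take effect):

    kafkaParams = {"metadata.broker.list": "host1:port,host2:port,host3:port"}

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc,
        ["lac.mx.digitalchannel.raw.s015-txn-qrdc"],   # topics as a list
        kafkaParams)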

kafka migration error when trying to update data flow

I inherited a legacy Kafka system, version 0.8.2. Now I am trying to upgrade it to something more modern. I know that I cannot go directly from 0.8.2 to anything beyond 1.0, but I expected to be able to migrate the system incrementally.

The problem is that when I try to do something simple like going from 0.8.2 to 0.9.0, I get an error:

java.lang.IllegalArgumentException: requirement failed: log.message.format.version 0.9.0 cannot be used when inter.broker.protocol.version is set to 0.8.2

I hope someone can tell me what I configured wrong or what is missing from my server configuration.

My server.properties file for the 0.10.0 version contains:



############################# Server Basics #############################


broker.id=0

auto.create.topics.enable=true
inter.broker.protocol.version=0.8.2
log.message.format.version=0.9.0

############################# Socket Server Settings #############################

#advertised.listeners=PLAINTEXT://your.host.name:9092


num.network.threads=3


num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


############################# Log Basics #############################


num.partitions=1

num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

log.retention.hours=168


# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
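For context, the error is Kafka enforcing that log.message.format.version may never be newer than inter.broker.protocol.version. The upgrade notes describe bumping the two settings in separate steps during rolling restarts; a sketch of the order, using the 0.8.2 to 0.9.0 step from the question:

    # step 1: run the new binaries while still speaking the old protocol and format
    inter.broker.protocol.version=0.8.2
    log.message.format.version=0.8.2

    # step 2: once every broker runs the new version, raise the protocol first,
    # then (in a later rolling restart) the message format
    inter.broker.protocol.version=0.9.0
    log.message.format.version=0.9.0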

Thanks for any help

Using Storm with Kafka in Docker

I want to run Kafka and Storm (connected through a Kafka spout) in Docker and deploy them using docker-compose. My docker-compose.yml looks like this:

    version: '3.3'

    services:

      zookeeper:
        image: confluentinc/cp-zookeeper:5.1.2
        container_name: zookeeper
        hostname: zookeeper
        ports:
          - "2181:2181"
        environment:
          - ZOOKEEPER_CLIENT_PORT=2181
        restart: unless-stopped
      kafka:
        image: confluentinc/cp-kafka:5.1.2
        container_name: kafka
        hostname: kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_LISTENERS: PLAINTEXT://kafka:19092,LISTENER_DOCKER_EXTERNAL://kafka:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-kafka}:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
          KAFKA_BROKER_ID: 1
          KAFKA_NUM_PARTITIONS: 1
          KAFKA_DEFAULT_REPLICATION_FACTOR: 1
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
        depends_on:
          - zookeeper
        restart: unless-stopped
      kafka-producer:
        build:
          context: kafka-dummy-producer
          dockerfile: Dockerfile
        container_name: producer
        depends_on:
          - kafka
      kafka-consumer:
        build:
          context: kafka-dummy-consumer
          dockerfile: Dockerfile
        container_name: consumer
        depends_on:
          - kafka
      nimbus:
        image: storm
        container_name: nimbus
        command: storm nimbus
        depends_on:
          - zookeeper
        links:
          - zookeeper
        restart: unless-stopped
        ports:
          - "6627:6627"
      supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor
        depends_on:
          - nimbus
          - zookeeper
        links:
          - nimbus
          - zookeeper
        restart: unless-stopped
      storm-ui:
        image: storm
        container_name: storm-ui
        command: storm ui
        depends_on:
          - nimbus
          - zookeeper
        links:
          - nimbus
          - zookeeper
        ports:
          - "8080:8080"
        restart: unless-stopped

My problem is with Storm. All the guides on how to run it in Docker seem to be quite old, because they suggest using links, which are now deprecated in Docker. However, I used links anyway just to get at least something working, so I set up Kafka and Storm with a shared ZooKeeper and followed the official guidelines for both. For Kafka, I used two simple clients and they work, so Kafka seems to be working fine.

Storm does not write its logs to stdout, so I cannot really verify it, and using volumes for the log files does not seem to work either. When I try to access the Storm UI on localhost:8080, it worked once but never again after that.

Is my approach going in the right direction? What can I do to fix the configuration?
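On the links question at least: with Compose file version 3, all services in one file share a default network and resolve each other by service name, so the links: entries can simply be dropped while keeping depends_on. A sketch of the nimbus service without links (supervisor and storm-ui follow the same shape):

      nimbus:
        image: storm
        container_name: nimbus
        command: storm nimbus
        depends_on:
          - zookeeper
        ports:
          - "6627:6627"
        restart: unless-stopped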