Single Producer/Multiple Consumers in Node

I’ve done my best to contrive an example of the problem I’m trying to simplify. I have a pool of workers, and each worker can do several tasks at a time. Each task takes an unspecified amount of time. The goal is to drain the task queue by keeping each worker as busy as possible.

To accomplish this, once the taskQueue is non-empty, each Worker is given tasks up to maxTasks. When a worker completes any one task, runLoop is called, which checks all the workers and reloads any that have spare capacity, until the taskQueue is empty.

This doesn’t feel like an elegant solution. I shouldn’t have to check each worker for availability each time any worker completes a task (which means I can probably abstract away the runLoop). WorkerPool shouldn’t be pushing tasks to workers, workers should be pulling tasks when they complete one. Would this be better accomplished using a stream or an event emitter? Any thoughts would be greatly appreciated.

function WorkerPool(numWorkers) {
  const workers = [];
  const taskQueue = [];

  for (let i = 0; i < numWorkers; i += 1) {
    workers.push(Worker());
  }

  function runLoop() {
    for (const worker of workers) {
      const count = worker.availableTasks();
      for (let i = 0; i < count && taskQueue.length; i++) {
        const task = taskQueue.pop();
        console.log(`task queue length decremented to ${taskQueue.length}`);

        worker.work(task, runLoop);
      }
    }
  }

  return {
    work: function(task) {
      taskQueue.push(task);
      console.log(`task queue length incremented to ${taskQueue.length}`);
      runLoop();
    }
  }
}

function Worker() {
  let activeTasks = 0;
  let maxTasks = 5;

  return {
    availableTasks: () => maxTasks - activeTasks,
    work: function(task, callback) {
      activeTasks += 1;
      console.log(`workers active tasks incremented to ${activeTasks}`);

      // do some work that takes an unspecified 
      // amount of time, then callback with the 
      // result.
      setTimeout(() => {
        activeTasks -= 1;
        console.log(`workers active tasks decremented to ${activeTasks}`);
        callback();
      }, Math.floor(Math.random() * 100));
    },
  };
}

// test code
const pool = WorkerPool(3);
for (let i = 0; i < 6000; i++) {
  pool.work({ /* some work to be done */ });
}
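
On the pull-based alternative the question raises: here is one sketch using an EventEmitter, where the pool announces new tasks and each worker refills itself when one of its own tasks completes, so nothing ever scans all the workers. The names (PullPool, 'task-available') are my own, and runTask stands in for the real work.

```js
const EventEmitter = require('events');

class PullPool extends EventEmitter {
  constructor(numWorkers, maxTasks = 5) {
    super();
    this.taskQueue = [];
    for (let i = 0; i < numWorkers; i += 1) this.addWorker(maxTasks);
  }

  addWorker(maxTasks) {
    let active = 0;
    const pull = () => {
      // Pull tasks until this worker is full or the queue is empty.
      while (active < maxTasks && this.taskQueue.length) {
        active += 1;
        runTask(this.taskQueue.shift(), () => {
          active -= 1;
          pull(); // only the completing worker reloads itself
        });
      }
    };
    this.on('task-available', pull);
  }

  work(task) {
    this.taskQueue.push(task);
    this.emit('task-available');
  }
}

// Stand-in for the real task execution, mirroring the setTimeout above.
function runTask(task, done) {
  setTimeout(done, Math.floor(Math.random() * 100));
}

const pool = new PullPool(3);
for (let i = 0; i < 6000; i += 1) pool.work({});
```

A completed task triggers only that worker's pull, which is exactly what removes the global runLoop.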

rabbitmq – Why control minimum and maximum simultaneous consumers in an AMQP queue?

I am trying to understand why some frameworks introduce control over the number of consumers in an AMQP queue.

For example, Spring AMQP introduced this feature in version 1.3.0, with the maxConcurrentConsumers property:

Since version 1.3.0, you can now dynamically adjust the concurrentConsumers property. If changed while the container is running, consumers are added or removed as necessary to adjust to the new configuration.

Also, a new property called maxConcurrentConsumers has been added and the container dynamically adjusts concurrency based on workload.

But I can't see the advantage of using it when I could simply configure concurrentConsumers with my maximum value from the start.

The only advantage I can think of is that it reduces the average number of connections to the RabbitMQ server. But considering that most apps are a long way from the number of connections that could cause problems for a RabbitMQ instance, this consumer-scaling feature seems rarely worth taking advantage of.

[ Politics ] Open question: A wealth tax is ridiculous. Most people who own assets worth over 50 million will simply pass the tax on to consumers. Take, for example, a hotel building.

A hotel building worth 400 million would pay a 3 percent tax on 350 million of those 400 million (the value above the 50 million threshold).
The building owner will simply charge higher rates, so in the end the working class will pay that tax.

Not all Kafka consumers are being assigned partitions

I have 10 consumers and 10 partitions.
I fetch the number of partitions

    int partitionCount = getPartitionCount(kafkaUrl);

and I create the same number of consumers with the same Group ID.

Stream.iterate(0, i -> i + 1)
      .limit(partitionCount)
      .forEach(index -> executorService.execute(
          () -> createConsumer(consumerConfig(index), topicName)));

config looks like this

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_CLIENT_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + index);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

each consumer subscribes to the topic and begins consuming

consumer.subscribe(Collections.singletonList(topicName));
while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(4));
    if (consumerRecords.count() > 0) {
        consumeRecords(consumerRecords);
        consumer.commitSync();
    }
}

what I see after the consumers have been assigned partitions:

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CLIENT-ID                                                        
topicName  1          89391           89391           0               consumer0
topicName  3          88777           88777           0               consumer1
topicName  5          89280           89280           0               consumer2
topicName  4          88776           88776           0               consumer2
topicName  0          4670991         4670991         0               consumer0
topicName  9          23307           89343           66036           consumer4
topicName  7          89610           89610           0               consumer3
topicName  8          88167           88167           0               consumer4
topicName  2          89138           89138           0               consumer1
topicName  6          88967           88967           0               consumer3

Only half of the consumers have been assigned partitions.
Why did this happen? According to the documentation there should be one consumer per partition. Am I doing something wrong? Kafka version 2.1.1.
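
One observation that may help, given the output above: every client listed holds exactly two partitions, which is what assignment over ten partitions produces when only five members are actually in the consumer group. A toy simulation in plain JavaScript (no Kafka API, just the arithmetic of dealing partitions out round-robin):

```js
// Deal `partitionCount` partitions out to group members in turn,
// roughly the way a round-robin assignor would.
function roundRobinAssign(partitionCount, members) {
  const assignment = new Map(members.map((m) => [m, []]));
  for (let p = 0; p < partitionCount; p += 1) {
    assignment.get(members[p % members.length]).push(p);
  }
  return assignment;
}

// Ten partitions over five members: two partitions each, matching the
// two-partitions-per-client pattern in the table above.
console.log(roundRobinAssign(10,
  ['consumer0', 'consumer1', 'consumer2', 'consumer3', 'consumer4']));
```

So the question to chase is why only five of the ten consumers joined the group; one thing worth verifying is that all ten threads actually reach poll(), since a Java consumer only joins the group once it polls.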

I will give you a 175,000-record US consumer email list for $30

I will give you a 175,000-record US consumer email list.

NO DUPLICATE EMAILS

—————- I will send you the email data in a text file ———————

Email lists: US consumers

1. Are you looking to reach US consumers?

2. Are you looking for US consumers for your products and services?

3. Do you want to sell goods and services to US households?

4. Do you want to send your messages / promotional offers / events, etc. to US households?

5. Do you have a local business in the US?

If any of the above questions match your search criteria, this email list will be very beneficial for you.

Here you will receive a 175K Hotmail email list of US consumers.

Why does Trump keep delaying tariffs? Why did they tell us they would make things difficult for American consumers?

Trump said he is doing it for the holiday season, since the tariffs could affect the Americans who will bear their burden.

But then he shouldn't have to worry, because he also said that China is paying for them.

Tbh, I'm not sure anymore what's real and what's a lie.

Cloud: Would a database hosting service benefit by compressing the data of its consumers?

I'm not sure if this is the right stack for the question, but here it goes. Suppose I am a customer of a database hosting service that charges me on a single criterion: how much space my data occupies on their servers. Would it be beneficial for the service to compress my data, and why? Data compression costs CPU, takes time, and so on. Would it theoretically be cheaper for them to just let me pay for the extra space my uncompressed data occupies?
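
For intuition, here is a back-of-envelope sketch; every number in it (compression ratio, prices) is made up for illustration and not from any real provider:

```js
// Hypothetical numbers -- purely illustrative.
const rawGB = 100;                 // customer data, uncompressed
const compressionRatio = 0.4;      // compressed size / raw size (assumed)
const diskCostPerGBMonth = 0.10;   // what the provider pays for storage (assumed)
const cpuCostPerGB = 0.002;        // one-time CPU cost to compress (assumed)

const monthlyDiskSavings = rawGB * (1 - compressionRatio) * diskCostPerGBMonth;
const oneTimeCpuCost = rawGB * cpuCostPerGB;

console.log(`disk savings: $${monthlyDiskSavings.toFixed(2)}/month`);
console.log(`compression cost: $${oneTimeCpuCost.toFixed(2)} one-time`);
// The catch: if the provider bills by bytes actually stored, transparent
// compression also shrinks the customer's bill, so the provider only comes
// out ahead if it bills on logical (uncompressed) size or keeps the margin.
```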

Thank you 🙂

microservices – MQ Integration: How to notify consumers about upcoming message format changes?

We have multiple microservices that communicate through MQ. Since the MQ messages are the interface / contract between the services, whenever we make changes to the MQ message published by a service, we must make the same adjustments in the services that consume the message.

For now, the number of services is small enough that we know which services communicate with each other and can keep the MQ message contract in sync between them. But as the number of services grows, this becomes more difficult.

Option 1: Break things first, then fix them

I have been thinking of maybe implementing some kind of health check. Say that service A, during normal operation, can emit message type X, which is consumed by service B. Service A could then, at startup, emit a health-check message, something along the lines of a dry run of message X. When service B receives it, it simply verifies that the message conforms to the contract. If it does not, for example because service A dropped a critical field from the message, then service B rejects the message, which in turn ends up in a dead-letter exchange, which triggers an alert to the devops staff.

This approach will not prevent us from deploying unsupported message types, but it will notify us almost instantly when we do. For our use case this could work: with our very small number of developers and projects, if we break something like this we can fix it quite quickly.

Option 2: Probe first

A variation on this: we could start versioning the MQ message format (which we probably should, and will, do anyway). Then, when service A plans to upgrade from version 1 of message type X to version 2, service A could start emitting dry-run messages in the version 2 format. This would cause service B to reject them. If this happens a few days or weeks before service A makes the actual switch from version 1 to version 2, the devops team has time to add support for version 2 in the meantime.
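
As a concrete sketch of that version check, here is what service B's handler might look like. Everything here is hypothetical: the envelope fields (type, version, dryRun) and the amqplib-style ack/nack calls are placeholders, not the API of any specific framework.

```js
// Sketch of a consumer-side contract check for versioned MQ messages.
const supportedVersions = {
  X: new Set([1]), // service B currently understands only version 1 of X
};

function processPayload(payload) {
  // The real business logic would go here.
}

function handleMessage(message, channel) {
  const versions = supportedVersions[message.type];
  if (!versions || !versions.has(message.version)) {
    // Unsupported contract: reject without requeueing, so the broker routes
    // the message to the dead-letter exchange and alerting kicks in.
    channel.nack(message, false, false);
    return;
  }
  if (message.dryRun) {
    // Contract probe (options 1 and 2): acknowledge without side effects.
    channel.ack(message);
    return;
  }
  processPayload(message.payload);
  channel.ack(message);
}
```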

Option 3: Detect conflicts manually before deployment

Another approach would be to have some way of detecting, before actual deployment, that service A is about to start emitting unsupported messages in the first place. This would mean maintaining some matrix of which versions of message X are compatible with which consumers, and postponing the deployment of service A (with the new version of message X) until all consumers are ready for it. How to implement this effectively, I do not know.

Other alternatives

How do others handle compatibility between services that communicate using MQ? How do you know that when your service A makes a change to message type X, it will not break any of the consumers?

P.S. I posted this on Reddit a few days ago, but due to the lack of responses I decided to post it here as well.