microservices – Coping with the lack of an infinite-retention event broker in an event-driven architecture

When building event-driven systems, events might be used for two distinct aspects of the system:

  • for communication between components, e.g. using a message queue
  • for representing the state of a service

These concerns are related, sometimes intertwined, but often technically distinct.

Infinite retention of an event stream would typically mean that you store the events in a database, allowing you to replay them and to rebuild the current state at any later point in time. This property of an event-based system has nothing to do with how your services communicate – it works just as well regardless of whether you use message queues or REST as a communication pattern.
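As a rough illustration (using hypothetical event and state types, not any particular framework), “replaying stored events to rebuild the current state” can look like this:

using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event and state types, purely for illustration.
public abstract record AccountEvent(Guid AccountId);
public record Deposited(Guid AccountId, decimal Amount) : AccountEvent(AccountId);
public record Withdrawn(Guid AccountId, decimal Amount) : AccountEvent(AccountId);

public record AccountState(Guid AccountId, decimal Balance)
{
    // Folds one event into the current state; replaying the whole stream
    // from the beginning reconstructs the state at any point in time.
    public AccountState Apply(AccountEvent e) => e switch
    {
        Deposited d => this with { Balance = Balance + d.Amount },
        Withdrawn w => this with { Balance = Balance - w.Amount },
        _ => this
    };

    public static AccountState Replay(Guid id, IEnumerable<AccountEvent> history) =>
        history.Aggregate(new AccountState(id, 0m), (state, e) => state.Apply(e));
}

Whether the history comes from a database table or from a long-retention topic makes no difference to this fold, which is exactly why retention and communication are separate concerns.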

Certain message queue systems do have the ability to store messages/events indefinitely. Notably, Kafka has this capability (and cloud providers will happily sell you this extended retention). But it’s not necessarily wise to use such capabilities.

Communication in a distributed system must be assumed to be unreliable. If your system only works correctly if messages are delivered exactly-once, you’re going to have problems.
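A common way to cope is to make handlers idempotent, so that at-least-once delivery is good enough. A minimal sketch (the in-memory set is purely illustrative; a real service would track processed ids in its own durable storage):

using System;
using System.Collections.Concurrent;

// Minimal sketch of an idempotent handler: tolerate duplicate deliveries
// instead of depending on exactly-once semantics from the broker.
public class IdempotentHandler
{
    // In a real service this set of processed ids would live in durable storage.
    private readonly ConcurrentDictionary<Guid, byte> _processed = new();

    public void Handle(Guid messageId, Action process)
    {
        // TryAdd returns false if this message id was already handled,
        // so a redelivered message becomes a harmless no-op.
        if (!_processed.TryAdd(messageId, 0)) return;
        process();
    }
}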

E.g. assume that you have a horizontally-scalable service where every instance has its own cursor for consuming from the message queue. If one instance is temporarily turned off for half a year and then restarts, it will likely spend multiple days just catching up with old messages. The ability to rebuild the entire state can be very important, but might be a poor practice when used as a standard operating procedure – even in an event-driven system, you likely want checkpoints of the current state that can be shared between instances.
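A minimal sketch of that checkpointing idea, assuming a hypothetical snapshot store shared between instances (none of these types come from a real library):

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical snapshot store shared between instances (e.g. a table in the
// service's own database); the names here are illustrative, not a real API.
public interface ISnapshotStore<TState>
{
    Task<(TState State, long Offset)?> TryLoadLatestAsync();
    Task SaveAsync(TState state, long offset);
}

public static class Recovery
{
    // On startup, load the newest shared checkpoint and fold in only the
    // events recorded after it, instead of replaying the whole history.
    public static async Task<(TState State, long Offset)> RecoverAsync<TState, TEvent>(
        ISnapshotStore<TState> snapshots,
        Func<long, IAsyncEnumerable<(TEvent Event, long Offset)>> readEventsAfter,
        Func<TState, TEvent, TState> apply,
        TState empty)
    {
        var (state, offset) = await snapshots.TryLoadLatestAsync() ?? (empty, -1L);
        await foreach (var (e, o) in readEventsAfter(offset))
        {
            state = apply(state, e);
            offset = o;
        }
        return (state, offset);
    }
}

An instance that is up to date can periodically call SaveAsync, so that restarted or newly added instances only replay the tail of the stream rather than months of history.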

Dealing with limited-retention message queues can be an “enabling constraint”: it forces you to think about what happens when messages are dropped, and about how state can be rebuilt efficiently.

In a simple message-oriented service architecture, we might have two services A and B that are connected via some queue Q. When A sends a message to B, what does this mean, what can A assume about B? A should assume absolutely nothing. A should not assume that B will receive and process the message within any particular deadline. If the message is important, A might try re-sending the message with exponential backoff until B sends another message in response (or use a message queue that guarantees delivery attempts until B acknowledges it). If B wants to be able to re-build its own state in an event-driven manner, it might store events in its own database – but would probably not rely on the message queue Q for persistent event storage.
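To make the retry idea concrete, here is a minimal sketch of “re-send with exponential backoff until B responds”; the sendAndAwaitAck delegate is a placeholder for whatever transport A actually uses:

using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReliableSend
{
    // Minimal sketch: A re-sends an important message with exponential backoff
    // (plus a little jitter) until B's acknowledgement arrives or we give up.
    // sendAndAwaitAck returns true once a response/ack from B has been observed.
    public static async Task<bool> SendWithBackoffAsync(
        Func<CancellationToken, Task<bool>> sendAndAwaitAck,
        int maxAttempts,
        TimeSpan initialDelay,
        CancellationToken ct)
    {
        var delay = initialDelay;
        var jitter = new Random();
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (await sendAndAwaitAck(ct)) return true;
            if (attempt == maxAttempts) break;
            await Task.Delay(delay + TimeSpan.FromMilliseconds(jitter.Next(0, 250)), ct);
            delay = TimeSpan.FromTicks(delay.Ticks * 2);   // exponential growth
        }
        return false;   // caller decides what "give up" means (dead-letter, alert, ...)
    }
}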

In practice, it is possible to make message queues (and any distributed system, really) highly reliable, so that delivery is almost always guaranteed. But whether that is the case depends on the Ops team that maintains this architecture, and on the guarantees provided by your specific technology choices.

design – Implementing Event Sourcing persistence over streaming broker

I am implementing an event sourcing system over a streaming broker (Apache Pulsar), and I am concerned about the transactionality of the persistence mechanism.

Currently my event system works as follows: I have Commands which, given an Aggregate’s events and some parameters, produce new Events.

To achieve persistence, I need a system which is able to retrieve all the Events of an Aggregate (or of a given key), and to atomically store a set of Events (preventing concurrent stores).

In order to do that, I had the following plan:

  1. Create a consumer over the topic/key of the concerned Aggregate
  2. Open a transaction (Apache Pulsar allows it, to atomically publish a set of messages at once)
  3. Load previous messages/Events (thanks to PrestoSQL/Trino)
  4. Forward the fetched Events to the Command
  5. Push the new Events in the broker
  6. Check whether the consumer from step (1) received anything; if so, abort the transaction
  7. Ack the messages/Events
  8. Commit the transaction

In my opinion this is quite complicated, and a concurrent write can still happen between steps (6) and (8).
Is this the right way to implement event sourcing persistence over a streaming broker, or is it flawed by design?
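Expressed in code, the plan looks roughly like the following; the broker, transaction and store types here are hypothetical stand-ins (deliberately not the actual Pulsar client API), and the comment marks the window the question is about:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the broker primitives used in the plan above;
// this is NOT the Pulsar client API, just the shape of the algorithm.
public interface IBrokerTransaction : IAsyncDisposable
{
    Task PublishAsync(string key, object @event);   // step 5
    Task CommitAsync();                             // step 8
    Task AbortAsync();                              // step 6, on conflict
}

public interface IAggregateStore
{
    Task<long> CurrentOffsetAsync(string key);                      // step 1 (remember position)
    Task<IBrokerTransaction> BeginTransactionAsync();               // step 2
    Task<IReadOnlyList<object>> LoadEventsAsync(string key);        // step 3
    Task<bool> NewEventsArrivedSinceAsync(string key, long offset); // step 6's check
}

public static class CommandHandler
{
    public static async Task<bool> HandleAsync(
        IAggregateStore store,
        string aggregateKey,
        Func<IReadOnlyList<object>, IReadOnlyList<object>> command) // step 4
    {
        var baseline = await store.CurrentOffsetAsync(aggregateKey);   // step 1
        await using var txn = await store.BeginTransactionAsync();     // step 2

        var history = await store.LoadEventsAsync(aggregateKey);       // step 3
        foreach (var e in command(history))                            // step 4
            await txn.PublishAsync(aggregateKey, e);                   // step 5

        if (await store.NewEventsArrivedSinceAsync(aggregateKey, baseline)) // step 6
        {
            await txn.AbortAsync();
            return false;   // another writer got there first; retry the command
        }

        // The race the question points out: a competing write can still land
        // between the check above (6) and the commit below (8).
        await txn.CommitAsync();   // steps 7-8 (ack the consumed events and commit)
        return true;
    }
}

For comparison, classic event-store implementations close this window by making the append itself conditional on an expected version (optimistic concurrency enforced by the store), rather than checking for concurrent writes separately from the commit.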

❕NEWS – Popular Retail Broker Charles Schwab Considering Crypto | Proxies123.com

In my country, some online retailers do not accept crypto directly, but they work with token projects like StormX to give crypto cashback when we shop at those stores. Right now I always shop at such a store and collect my crypto cashback. I hope many stores will implement crypto, directly or indirectly.

c# – “streaming” messages to a broker from a batch-like source (e.g. a SQL database), optionally acknowledge back to the source

I am writing library code to help other developers send messages to a pub-sub event broker.

In many cases, the message source will not be “stream-like”. Specifically, most messages are coming from SQL Server change data capture, and are therefore much more efficiently read in sets (batches).

Additionally, in most cases the message should be “guaranteed” – i.e., if we fail to send the message to the broker then we should send it again, even if the sending application crashes for some reason. This implies letting the source know that a message has been sent. As was the case with reading from the source, acknowledging back to it is also set-based, for the same reason.

It is acceptable that a message may be “accidentally” sent multiple times in exception-like circumstances, (guaranteed-once-in-order delivery is much too hard a problem for me to solve in my little implementation here!), but this should not occur if everything is running smoothly.

Batch sizes might be large, so I don’t really want to concurrently execute n publish-to-broker tasks, where n is the number of messages in the batch, because that could potentially be a heck of a lot of Task objects being waited on by something like Task.WhenAll. Therefore I have used TPL Dataflow as a concurrency-limiting mechanism.

Here are the Interfaces I have created (I am fairly happy with these so skip over them if you don’t need the context, although I do wonder if the AcknowledgeAsync method should accept an IAsyncEnumerable for symmetry). The IGuaranteedBatchProvider does not extend IBatchProvider, because acknowledgement only requires an IMessage, not an IMessage<T>.


public interface IMessage
{
    /// <summary>
    /// Allows a message to be tracked locally through the processing pipeline, either on the
    /// publication or subscription side. The values sent by a publisher will not be the same
    /// as those received by a subscriber, since the LocalMessageId values for messages received
    /// by subscribers are generated by the broker
    /// </summary>
    public long LocalMessageId { get; init; }
}

public interface IMessage<T> : IMessage
{
    public T Payload { get; init; }
}

public interface IBatchProvider<T>
{
    /// <summary>
    /// Returns a batch of unacknowledged messages from the source. If the provider is also a
    /// guaranteed provider, then all messages in a batch should be acknowledged prior to reading
    /// a new batch, otherwise the new batch will contain duplicate messages
    /// </summary>
    IAsyncEnumerable<IMessage<T>> ReadBatchAsync(ushort maxBatchSize, CancellationToken ct);
}
 
public interface IGuaranteedBatchProvider
{
    /// <summary>
    /// informs a guaranteed provider that a set of messages have been successfully transmitted.
    /// Acknowledged messages will not be returned by subsequent read operations
    /// </summary>
    Task AcknowledgeAsync(IEnumerable<IMessage> messagesToAcknowledge, CancellationToken ct);
}

public interface IPublication<T>
{
    /// <summary>
    /// sends a message to the broker, returns true if the payload is successfully published
    /// </summary>
    Task<bool> PublishAsync(T payload);
}

The code I am interested in having reviewed is in my BatchPublisher class. This helper class understands how to work with the interfaces described above; the Run loop follows. I would particularly appreciate feedback on the sections of code commented as //ugly?, on whether a Channel implementation might be more appropriate (I have not yet used channels), and of course any comments about the overall design. Most exception handling has been removed for brevity.

The provider here would typically be a CdcProvider<T> : IBatchProvider<T>, IGuaranteedBatchProvider

public async Task Run(CancellationToken ct)
{
    var results = new ConcurrentBag<(IMessage msg, bool result)>();

    // ugly?
    Func<IMessage<T>, Task> publish = provider is IGuaranteedBatchProvider
        ? async (msg) => results.Add((msg, await publication.PublishAsync(msg.Payload)))
        : async (msg) => _ = await publication.PublishAsync(msg.Payload);

    while (!ct.IsCancellationRequested)
    {
        results.Clear();

        // channels instead? Newing up the actionblock every batch seems wrong somehow
        ActionBlock<IMessage<T>> limiter = new
        (
            async (msg) =>
            {
                try { await publish(msg); }
                // if the consuming application has registered an event handler for publication
                // exceptions, call it (mostly for error logging)
                catch (Exception x) { PublishFailed?.Invoke(this, (msg, x)); }
            },
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = ct,
                EnsureOrdered = true,
                MaxDegreeOfParallelism = sendConcurrency
            }
        );

        await foreach (var message in provider.ReadBatchAsync(maxReadBatchSize, ct))
        {
            limiter.Post(message);
        }
        limiter.Complete();
        await limiter.Completion;
        // ugly construct with await and null coalescing?
        await ((provider as IGuaranteedBatchProvider)?.AcknowledgeAsync(results.Select(r => r.msg), ct) ?? Task.CompletedTask);
    }
}
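Since channels are mentioned but not yet used, here is a rough sketch of what a bounded-Channel-based limiter could look like: one long-lived channel plus a fixed pool of consumer tasks as the concurrency limit, instead of a new ActionBlock per batch. It is illustrative only and glosses over per-batch acknowledgement (a guaranteed provider still needs to know when a batch has been fully drained before acknowledging):

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Rough shape of a bounded-Channel alternative to the per-batch ActionBlock.
// IMessage<T> comes from the interfaces above; the rest is illustrative only.
public class ChannelLimiter<T>
{
    private readonly Channel<IMessage<T>> _channel;
    private readonly Task[] _consumers;

    public ChannelLimiter(Func<IMessage<T>, Task> publish, int sendConcurrency, CancellationToken ct)
    {
        _channel = Channel.CreateBounded<IMessage<T>>(new BoundedChannelOptions(sendConcurrency * 4)
        {
            SingleWriter = true,
            FullMode = BoundedChannelFullMode.Wait   // writer awaits when consumers fall behind
        });

        // A fixed pool of consumer tasks acts as the concurrency limit.
        _consumers = new Task[sendConcurrency];
        for (var i = 0; i < sendConcurrency; i++)
        {
            _consumers[i] = Task.Run(async () =>
            {
                await foreach (var msg in _channel.Reader.ReadAllAsync(ct))
                    await publish(msg);
            }, ct);
        }
    }

    public ValueTask WriteAsync(IMessage<T> msg, CancellationToken ct) =>
        _channel.Writer.WriteAsync(msg, ct);

    public Task CompleteAsync()
    {
        _channel.Writer.Complete();
        return Task.WhenAll(_consumers);
    }
}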

Service Broker limit in SQL Server

As mentioned in the Link below,

For client-side applications, query notification users should not exceed ten concurrent users, in SQL Server 2005.

What is the maximum number of concurrent users allowed while using query notification in SQL 2008 and above?

2016 – ‘Add Host to Service Bus Farm’ failed & Service Bus Message Broker Service stuck in Starting mode

When I install and configure the WF farm on one of the SP 2016 APP servers I am facing an issue: I created a new farm, and the WF Manager Configuration wizard fails at ‘Add Host to Service Bus Farm’.
I had already enabled TLS 1.0, 1.1 and 1.2 before starting the wizard,
and the Service Bus Message Broker Service is stuck in the Starting state.


In Event Viewer below errors:

Faulting application name: Microsoft.ServiceBus.MessageBroker.exe, version: 2.0.50926.9, time stamp: 0x5a0a1de1
Faulting module name: unknown, version: 0.0.0.0, time stamp: 0x00000000
Exception code: 0x80131623
Fault offset: 0x00007ff955393102
Faulting process id: 0x2074
Faulting application start time: 0x01d7815c0d76f752
Faulting application path: C:\Program Files\Service Bus\1.1\Microsoft.ServiceBus.MessageBroker.exe
Faulting module path: unknown
Report Id: 16a5f355-4152-4727-8e9a-29f4498210e6
Faulting package full name:
Faulting package-relative application ID:

Application: Microsoft.ServiceBus.MessageBroker.exe
Framework Version: v4.0.30319
Description: The application requested process termination through System.Environment.FailFast(string message).
Message: Failed to start Service Bus Broker Service.
Stack:
at System.Environment.FailFast(System.String, System.Exception)
at Microsoft.ServiceBus.MessageBroker.Backend.OnStart(System.String())
at System.ServiceProcess.ServiceBase.ServiceQueuedMainCallback(System.Object)
at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
at System.Threading.ThreadPoolWorkQueue.Dispatch()

design patterns – Do I need a Message processing service before RabbitMQ (or any other message broker)

I hope this is the proper place to ask my question.

I am wondering how best to integrate RabbitMQ (or any other message broker) into my project.

There are two options:

  1. Simply collect all messages, whatever comes from my other services, and dump them all into Rabbit (Option 1)

  2. Collect the messages in an intermediate service, to enforce a “contract”, which then forwards them to Rabbit in some particular form (Option 2)

While the first option seems more natural, it may create problems if we later try to switch from Rabbit to something else, like Kafka (or would it?); the second seems to defeat Rabbit’s purpose.

What I actually want to know is whether the second option has a right to exist at all, and why. Is it standard to have an extra service in front of the message broker or not (and what could be the reason for it)?
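For concreteness, the “contract” from option 2 is sometimes enforced not by a standalone service but by a thin publisher abstraction shared as a library; a hypothetical sketch (the names and types are illustrative only):

using System;
using System.Threading.Tasks;

// A message shape agreed on by producers and consumers (the "contract").
public record OrderPlaced(Guid OrderId, DateTimeOffset PlacedAt);

public interface IEventPublisher
{
    // Strongly typed messages with a fixed schema and routing key,
    // instead of "dump whatever comes in".
    Task PublishAsync<TMessage>(string routingKey, TMessage message);
}

public class RabbitEventPublisher : IEventPublisher
{
    public Task PublishAsync<TMessage>(string routingKey, TMessage message)
    {
        // Serialize `message` and publish it via the RabbitMQ client here;
        // omitted so as not to guess at a specific client API.
        throw new NotImplementedException();
    }
}

Whether that contract lives in a shared library or behind a separate forwarding service is essentially the trade-off being asked about here.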

amazon web services – Degraded IOPS and throughput on a Linux machine (that hosts a Kafka broker)

We have a Kafka cluster on AWS with 8 broker machines.

OS version (taken from /proc/version) is:

Linux version 5.4.0-1029-aws (buildd@lcy01-amd64-021) (gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)) #30~18.04.1-Ubuntu SMP Tue Oct 20 11:09:25 UTC 2020

Broker id 5 was added recently, and the problem we see is that during times of high disk util% due to a burst of writes into the brokers, the disk used by Kafka (/dev/nvme1n1p1, mounted on /var/lib/kafka/data)
shows degraded performance in terms of w/sec and wMB/sec, which are much lower on that broker than on the other 7 brokers (~40% less IOPS and throughput on that broker).

The data in this table was taken from running iostat -x on all the brokers, starting at the same time and ending after 3 hours during peak time. The cluster handles ~2M messages/sec.

Another strange behavior is that broker id 7 has ~40% more IOPS and throughput during bursts of writes compared to the other brokers.

We compared the Kafka configuration files on all brokers and they are identical.

Any idea as to what can cause such degraded performance on broker id 5 (or such good performance on broker id 7)?

This issue is causing the consumers of this cluster to lag during heavy writes, because broker id 5 gets into high iowait. If a lagging consumer then performs reads from disk, the iowait on broker id 5 climbs to ~70%, all consumers start to lag, and the producers get OOM errors due to buffered messages that the broker does not accept.


Should you notify the broker if they sent you more coin than you paid for?

In other words, what if I didn't tell them?