MPMC Message Queue using only atomics


📋 Tags: C/C++


The Code

In memory-limited environments, typical C++ synchronization primitives such as std::counting_semaphore may not be available. Or perhaps you are writing C. Either way, an implementation built only on atomics may be needed.

This is my crack at making a buffered multi-producer, multi-consumer (MPMC) message queue (MQ).

I wrote it in one night as a proof of concept (POC) for a peer, so it is likely problematic somewhere. But for the sake of documentation, here it is.

#include <assert.h>     // assert() in MQ_init
#include <stdatomic.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct {
    uint8_t id;
    int data;           // change type as needed
} payload_t;

typedef struct {
    uint8_t len;
    uint8_t read;
    uint8_t write;
    atomic_flag mtx;    // use atomics as a mutex
    payload_t* buf;     // circular buffer
} MQ_t;

// Prototype functions
MQ_t* MQ_init(uint8_t len);
void MQ_send(MQ_t* mq, payload_t data);
payload_t MQ_recv(MQ_t* mq);

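// Instantiate the queue. Returns NULL if allocation fails.
MQ_t* MQ_init(uint8_t len){
    // One slot always stays empty so that "full" and "empty" look different,
    // so the queue holds at most len - 1 messages and needs len >= 2.
    assert(len > 1);

    payload_t* data = malloc(len * sizeof(payload_t));
    MQ_t* mq = malloc(sizeof(MQ_t));
    if(data == NULL || mq == NULL){
        free(data);     // free(NULL) is a no-op
        free(mq);
        return NULL;
    }
    mq->buf = data;
    mq->len = len;
    mq->read = 0;
    mq->write = 0;
    atomic_flag_clear(&mq->mtx);
    return mq;
}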

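// Place a message into the MQ. Blocks (retries) while the queue is full.
void MQ_send(MQ_t* mq, payload_t data){
    bool ok = false;
    while(!ok){
        // acquire the lock; this spinlocks if no yield() to scheduler.
        // test_and_set returns the PREVIOUS value, so keep looping while
        // the flag was already set (someone else holds the lock)
        while(atomic_flag_test_and_set(&mq->mtx)){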
            // CHOOSE: Do an exponential backoff if you're fancy
            usleep(1);
            // OR, yield with whatever equivalent your (RT)OS uses  
            // sched_yield();
        }
        
        // critical section start
        if(!(mq->read == (mq->write + 1) % mq->len)){
            // perform the write iff it is valid
            mq->buf[mq->write] = data;
            mq->write = (mq->write + 1) % mq->len;
            ok = 1;
        }
        // critical section end; release the lock
        atomic_flag_clear(&mq->mtx);
    }
}

// Get a message from the MQ.
// Blocks (retries) while the queue is empty.
payload_t MQ_recv(MQ_t* mq){
    payload_t result = {0};
    
    bool ok = false;
    while(!ok){
        // acquire the lock: test_and_set returns the PREVIOUS value, so
        // keep looping while the flag was already set
        while(atomic_flag_test_and_set(&mq->mtx)){
            // CHOOSE: Do an exponential backoff if you're fancy
            usleep(1);
            // OR, yield with whatever equivalent your (RT)OS uses  
            // sched_yield();
        }

        // critical section start
        if(!(mq->read == mq->write)){
            // perform the read iff the queue is not empty
            result = mq->buf[mq->read];
            mq->read = (mq->read + 1) % mq->len;
            ok = 1;
        }
        // critical section end, release the lock
        atomic_flag_clear(&mq->mtx);
    }
    return result;
}
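
The comments in the listing mention exponential backoff as an option, and a receive that returns immediately on an empty queue can also be handy. Below is a rough sketch of both, not part of the original code: the names mq_lock, mq_unlock and MQ_try_recv, as well as the cap of 64 yields, are my own assumptions. The backoff here yields to the scheduler 1, 2, 4, ... times between attempts rather than sleeping. The type definitions are repeated only so that the sketch compiles on its own.

```c
#include <assert.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Same layout as in the listing above, repeated so this compiles alone.
typedef struct {
    uint8_t id;
    int data;
} payload_t;

typedef struct {
    uint8_t len;
    uint8_t read;
    uint8_t write;
    atomic_flag mtx;
    payload_t* buf;
} MQ_t;

// Hypothetical helper: acquire the flag with exponential backoff.
// After each failed attempt, yield `yields` times, then double it (capped).
static void mq_lock(atomic_flag* mtx){
    unsigned yields = 1;
    // test_and_set returns the previous value: true means the lock was taken
    while(atomic_flag_test_and_set(mtx)){
        for(unsigned i = 0; i < yields; ++i){
            sched_yield();
        }
        if(yields < 64){    // assumed cap, tune for your platform
            yields *= 2;
        }
    }
}

// Hypothetical helper: release the flag.
static void mq_unlock(atomic_flag* mtx){
    atomic_flag_clear(mtx);
}

// Hypothetical non-blocking receive. Returns false right away if the queue
// is empty; otherwise copies the oldest message into *out and returns true.
// It still waits for the lock itself, just not for a message to arrive.
bool MQ_try_recv(MQ_t* mq, payload_t* out){
    assert(mq != NULL && out != NULL);
    bool ok = false;
    mq_lock(&mq->mtx);
    if(mq->read != mq->write){
        *out = mq->buf[mq->read];
        mq->read = (mq->read + 1) % mq->len;
        ok = true;
    }
    mq_unlock(&mq->mtx);
    return ok;
}
```

A consumer could call MQ_try_recv in a loop and do other work whenever it returns false, instead of parking inside MQ_recv.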

Oversimplified Correctness Check

A simple correctness check with a driver function (main).

The output shows the order in which each Producer P sent its message M, and the order in which each Consumer C received message M from Producer P.

Empirically, on an x86 machine running Ubuntu, all messages received by consumers are unique (i.e. no Producer P’s message M is ever received more than once).

Also, if we set NUM_CONSUMER to 1, we observe that the messages are received in the order in which they were sent. The message queue hence follows FIFO order [1].

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>     // usleep

void* producer(void* arg){
    void** args = (void**) arg;
    MQ_t* mq = (MQ_t*) args[0];
    int id = (int) (intptr_t) args[1];
    payload_t pl;
    for(int i = 0; i < 5; ++i) {
        pl.data = i;
        pl.id = id;
        MQ_send(mq, pl);
        printf("P%d send %d\n", id, i);
        usleep(100000); // Sleep for 100 ms to simulate work
    }
    return NULL;
}

void* consumer(void* arg) {
    void** args = (void**) arg;
    MQ_t* mq = (MQ_t*) args[0];
    int id = (int) (intptr_t) args[1];
    while (1) {
        payload_t data = MQ_recv(mq);
        printf("C%d get P%d-%d\n", id, data.id, data.data);
        usleep(300000); // Sleep for 300 ms
    }
    return NULL;
}

#define LEN 10
#define NUM_PRODUCER 5
#define NUM_CONSUMER 5

int main() {
    // Create some producers and consumers.
    // Add as many as you want to stress test.
    pthread_t producers[NUM_PRODUCER], consumers[NUM_CONSUMER];
    MQ_t* mq = MQ_init(LEN);
    if (mq == NULL) {
        perror("Failed to allocate the queue");
        return 1;
    }

    void* producer_args[NUM_PRODUCER][2];
    void* consumer_args[NUM_CONSUMER][2];

    // Create producers
    for (int i = 0; i < NUM_PRODUCER; ++i) {
        producer_args[i][0] = (void*) mq;
        producer_args[i][1] = (void*) (intptr_t) i;
        if (pthread_create(&producers[i], NULL, producer, producer_args[i]) != 0) {
            perror("Failed to create producer thread");
            return 1;
        }
    }

    // Create consumers
    for (int i = 0; i < NUM_CONSUMER; ++i){
        consumer_args[i][0] = (void*) mq;
        consumer_args[i][1] = (void*) (intptr_t) i;
        if (pthread_create(&consumers[i], NULL, consumer, consumer_args[i]) != 0) {
            perror("Failed to create consumer thread");
            return 1;
        }
    }

    // join the threads
    for (int i = 0; i < NUM_PRODUCER; ++i) {
        pthread_join(producers[i], NULL);
    }

    // VERY lazy consumer implementation
    // check correctness via stdout, then kill the process (e.g. Ctrl-C).
    pthread_join(consumers[0], NULL); // blocks forever
    // Cleanup in case the thread dies
    free(mq->buf);
    free(mq);
    return 0;
}
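
Eyeballing stdout gets tedious with more threads, so the two observations (every message received exactly once, and in order) could also be checked in code. The sketch below is only one way to do it and rests on assumptions that are not in the driver above: the consumers somehow record what they receive into an array (for example one array per consumer, merged after the run), and MSGS_PER_PRODUCER, received_exactly_once and per_producer_in_order are names I made up. The order check is per producer and only makes sense for a log written by a single consumer.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define NUM_PRODUCER 5
#define MSGS_PER_PRODUCER 5   // assumed: matches the `i < 5` loop in producer()

// Same payload layout as in the queue listing, repeated to compile alone.
typedef struct {
    uint8_t id;
    int data;
} payload_t;

// Hypothetical checker: true iff every (producer, message) pair shows up
// exactly once in recs[0..n).
bool received_exactly_once(const payload_t* recs, size_t n){
    int seen[NUM_PRODUCER][MSGS_PER_PRODUCER];
    memset(seen, 0, sizeof seen);
    for(size_t i = 0; i < n; ++i){
        if(recs[i].id >= NUM_PRODUCER){
            return false;   // unknown producer
        }
        if(recs[i].data < 0 || recs[i].data >= MSGS_PER_PRODUCER){
            return false;   // unknown message
        }
        if(seen[recs[i].id][recs[i].data]++){
            return false;   // duplicate
        }
    }
    // no duplicates and the right total means nothing is missing either
    return n == (size_t)(NUM_PRODUCER * MSGS_PER_PRODUCER);
}

// Hypothetical checker: true iff each producer's messages appear in
// strictly increasing order in recs[0..n).
bool per_producer_in_order(const payload_t* recs, size_t n){
    int last[NUM_PRODUCER];
    for(int p = 0; p < NUM_PRODUCER; ++p){
        last[p] = -1;
    }
    for(size_t i = 0; i < n; ++i){
        if(recs[i].id >= NUM_PRODUCER){
            return false;
        }
        if(recs[i].data <= last[recs[i].id]){
            return false;   // went backwards or repeated
        }
        last[recs[i].id] = recs[i].data;
    }
    return true;
}
```

Since the consumers in the driver never exit, the run would also need some stop condition (say, after 25 messages in total) before these checks can be called.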

That’s all. Cheers!


  1. When testing with NUM_CONSUMER > 1, the consumers may print their output out of order. This is likely because the printf calls of the different consumers interleave after the messages have already left the queue. Similarly, the producers' print messages may not match the order in which their messages were actually placed into the queue, since printf runs after MQ_send returns (I have not encountered this in testing).