The Code
In memory-limited environments, typical C++ synchronization primitives such as std::counting_semaphore
may not be available. Or perhaps you are using C. In either case, a pure atomic implementation may be needed.
This is my crack at making a buffered multi-producer, multi-consumer (MPMC) message queue (MQ).
It was done in one night as a proof of concept for a peer, so it is likely problematic somewhere. But for the sake of documentation, here it is.
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
// A single message carried by the queue.
typedef struct {
uint8_t id; // tag for the message; the demo driver stores the producer id here
int data; // message body; change type as needed
} payload_t;
// Message queue state: a circular buffer guarded by an atomic_flag used as a lock.
// The queue is empty when read == write and full when (write + 1) % len == read,
// so one slot is always left unused and at most len - 1 messages are held at once.
typedef struct {
uint8_t len; // number of slots in buf
uint8_t read; // index of the next slot to read from
uint8_t write; // index of the next slot to write to
atomic_flag mtx; // use atomics as a mutex
payload_t* buf; // circular buffer
} MQ_t;
// Prototype functions
// Allocate and initialise a queue with len slots (len must be greater than 1).
MQ_t* MQ_init(uint8_t len);
// Enqueue data, retrying until a free slot is available.
void MQ_send(MQ_t* mq, payload_t data);
// Dequeue the oldest message, retrying until one is available.
payload_t MQ_recv(MQ_t* mq);
// Instantiate the queue.
//
// len is the number of slots in the circular buffer. One slot is always kept
// free to tell "full" apart from "empty", so the queue holds at most len - 1
// messages and len must be at least 2.
//
// Returns a heap-allocated queue, or NULL if an allocation fails. The caller
// owns the result and must free both mq->buf and mq when done.
MQ_t* MQ_init(uint8_t len){
    // works only with len >= 2 bc of circular buffer quirks
    assert(len > 1);

    MQ_t* mq = malloc(sizeof *mq);
    if(mq == NULL){
        return NULL;
    }

    mq->buf = malloc(len * sizeof *mq->buf);
    if(mq->buf == NULL){
        free(mq);
        return NULL;
    }

    mq->len = len;
    mq->read = 0;
    mq->write = 0;
    // put the lock into its released state before anyone can use it
    atomic_flag_clear(&mq->mtx);
    return mq;
}
// Place a message into the MQ.
//
// Blocking: if the queue is full, the lock is released and the call keeps
// retrying (sleeping briefly between attempts) until a slot frees up.
void MQ_send(MQ_t* mq, payload_t data){
    bool ok = false;
    while(!ok){
        // atomic_flag_test_and_set returns the PREVIOUS value of the flag:
        // true means another thread already holds the lock, so keep waiting;
        // false means we just took it.
        while(atomic_flag_test_and_set(&mq->mtx)){
            // CHOOSE: Do an exponential backoff if you're fancy
            usleep(1);
            // OR, yield with whatever equivalent your (RT)OS uses
            // sched_yield();
        }
        // critical section start
        if(mq->read != (mq->write + 1) % mq->len){
            // perform the write iff there is a free slot
            mq->buf[mq->write] = data;
            mq->write = (mq->write + 1) % mq->len;
            ok = true;
        }
        // critical section end; release the lock
        atomic_flag_clear(&mq->mtx);

        if(!ok){
            // queue was full: back off so a consumer can take the lock and drain it
            usleep(1);
        }
    }
}
// Get a message from the MQ.
//
// Blocking: if the queue is empty, the lock is released and the call keeps
// retrying (sleeping briefly between attempts) until a message is available.
// Returns the oldest message that has not been received yet.
payload_t MQ_recv(MQ_t* mq){
    payload_t result = {0};
    bool ok = false;
    while(!ok){
        // atomic_flag_test_and_set returns the PREVIOUS value of the flag:
        // true means another thread already holds the lock, so keep waiting;
        // false means we just took it.
        while(atomic_flag_test_and_set(&mq->mtx)){
            // CHOOSE: Do an exponential backoff if you're fancy
            usleep(1);
            // OR, yield with whatever equivalent your (RT)OS uses
            // sched_yield();
        }
        // critical section start
        if(mq->read != mq->write){
            // perform the read iff the queue is not empty
            result = mq->buf[mq->read];
            mq->read = (mq->read + 1) % mq->len;
            ok = true;
        }
        // critical section end, release the lock
        atomic_flag_clear(&mq->mtx);

        if(!ok){
            // queue was empty: back off so a producer can take the lock and fill it
            usleep(1);
        }
    }
    return result;
}
|
Oversimplified Correctness Check
Simple correctness check with a driver function (main).
You should see the order in which Producer P sent each message M, and the order in which Consumer C received message M from Producer P.
Empirically, on an x86
machine running Ubuntu, all messages received by consumers are unique (i.e. no message M from Producer P is ever received more than once).
Also, if we set NUM_CONSUMER
to 1, we observe that messages are received in the same order in which they were sent. The message queue hence follows FIFO order.
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
// Producer thread entry point.
// arg is a two-element void* array: [0] holds the MQ_t*, [1] holds the
// producer id packed into a pointer-sized integer.
// Sends the values 0..4 tagged with that id, logging each send.
void* producer(void* arg){
    void** params = (void**) arg;
    MQ_t* queue = (MQ_t*) params[0];
    int self = (int) (intptr_t) params[1];

    int seq = 0;
    while(seq < 5){
        payload_t msg;
        msg.id = self;
        msg.data = seq;
        MQ_send(queue, msg);
        printf("P%d send %d\n", self, seq);
        usleep(100000); // 100 ms pause standing in for real work
        ++seq;
    }
    return NULL;
}
// Consumer thread entry point.
// arg is a two-element void* array: [0] holds the MQ_t*, [1] holds the
// consumer id packed into a pointer-sized integer.
// Loops forever: receives one message, logs it, then pauses.
void* consumer(void* arg) {
    void** params = (void**) arg;
    MQ_t* queue = (MQ_t*) params[0];
    int self = (int) (intptr_t) params[1];

    for (;;) {
        payload_t msg = MQ_recv(queue);
        printf("C%d get P%d-%d\n", self, msg.id, msg.data);
        usleep(300000); // 300 ms pause between receives
    }
    return NULL; // never reached; the loop above does not exit
}
// Number of slots passed to MQ_init for the demo queue.
#define LEN 10
// Number of producer threads started by main.
#define NUM_PRODUCER 5
// Number of consumer threads started by main.
#define NUM_CONSUMER 5
int main() {
// Create some producers and comsumers.
// Add as many as you want to stress test.
pthread_t producers[NUM_PRODUCER], consumers[NUM_CONSUMER];
MQ_t* mq = MQ_init(LEN);
void* producer_args[NUM_PRODUCER][2];
void* consumer_args[NUM_CONSUMER][2];
// Create producers
for (int i = 0; i < NUM_PRODUCER; ++i) {
producer_args[i][0] = (void*) mq;
producer_args[i][1] = (void*) (intptr_t) i;
if (pthread_create(&producers[i], NULL, producer, producer_args[i]) != 0) {
perror("Failed to create producer thread");
return 1;
}
}
// Create consumers
for (int i = 0; i < NUM_CONSUMER; ++i){
consumer_args[i][0] = (void*) mq;
consumer_args[i][1] = (void*) (intptr_t) i;
if (pthread_create(&consumers[i], NULL, consumer, consumer_args[i]) != 0) {
perror("Failed to create consumer thread");
return 1;
}
}
// join the threads
for (int i = 0; i < NUM_PRODUCER; ++i) {
pthread_join(producers[i], NULL);
}
// VERY lazy consumer implementation
// check correctness via stdout then SIGTERM out.
pthread_join(consumers[0], NULL); //blocks forever
// Cleanup in case the thread dies
free(mq);
return 0;
}
|
That’s all. Cheers!