/* Copyright (C) 2021,2022 fef . All rights reserved. */

/*
 * This is an implementation of the nonblocking queue algorithm presented in:
 * M. M. Michael and M. L. Scott, "Nonblocking Algorithms and Preemption-Safe
 * Locking on Multiprogrammed Shared Memory Multiprocessors,"
 * Journal of Parallel and Distributed Computing, vol 51, no 1, pp 1–26, 1998.
 */

/*
 * NOTE(review): the header names of the six include directives below are
 * not visible here; confirm that they are intact in the actual file.
 */
#include
#include
#include
#include
#include
#include

/*
 * Number of low pointer bits that are used as a version counter;
 * chosen from the pointer size (2 bits for 4-byte pointers, 3 bits for
 * 8-byte pointers).  Any other pointer size is rejected at compile time.
 */
#if __SIZEOF_POINTER__ == 4
#define VERSION_BITS 2
#elif __SIZEOF_POINTER__ == 8
#define VERSION_BITS 3
#else
#error "Unsupported pointer size"
#endif

/**
 * We use the least significant bits of pointers as a "version number" in order
 * to (kind of) solve the ABA problem while still being able to write out both
 * version and pointer in one atomic instruction.
 * It is safe to use these bits because both the stack and memory returned by
 * `kmalloc()` are always aligned to at least a full pointer size, and structure
 * members always get aligned to their own size as well.  So, on 32-bit we
 * have 2 bits and on 64-bit we have 3 bits that are known to always be zero.
 *
 * This is still going to break if exactly `n * (1 << VERSION_BITS)` items are
 * added/removed consecutively between our own read/exchange cycle, *and* the
 * last item has the same address as the original one.  I assume it's *very*
 * unlikely this is ever gonna happen though, since it would already be unusual
 * for the same item to be enqueued more than once at all.
 *
 * Both members share the same storage: `ptr` is the full tagged value that
 * gets read from and written to the atomics, and `ver` is used to read or
 * replace only the version bits of that value.
 *
 * NOTE(review): this relies on the bit-field `ver` overlaying the *least
 * significant* bits of `ptr`.  Bit-field placement is implementation-defined,
 * so confirm this holds for every compiler/ABI the kernel is built for.
 */
union verptr {
	unsigned ver:VERSION_BITS;
	struct kq_node *ptr;
};

/**
 * Get the actual pointer value, without the version number.
 *
 * Expands to `align_floor()` applied to the `ptr` member with an alignment
 * of `1 << VERSION_BITS`.  `align_floor()` is defined elsewhere; it is
 * presumably a round-down-to-alignment helper that preserves the pointer
 * type (the callers below dereference its result with `->`).
 *
 * @param verptr An lvalue of type `union verptr`
 */
#define VERPTR(verptr) align_floor((verptr).ptr, 1 << VERSION_BITS)

/**
 * Allocate a new queue with `kmalloc()` and, if the allocation succeeded,
 * initialize it with `kq_init()`.
 *
 * @param flags Allocation flags, passed through to `kmalloc()` unchanged
 * @returns The pointer returned by `kmalloc()`; when that is `nil`,
 *	`kq_init()` is skipped and `nil` is returned
 */
struct kqueue *kq_create(enum mflags flags)
{
	struct kqueue *kq = kmalloc(sizeof(*kq), flags);
	if (kq != nil)
		kq_init(kq);
	return kq;
}

/**
 * Append a node to the end of the queue.
 *
 * The node's `next` pointer is reset to `nil` first.  Then the function
 * retries in a loop until a compare-exchange on the last node's `next`
 * pointer succeeds in linking the new node, and finally makes a single
 * attempt to move `kq->tail` to the new node.
 *
 * @param kq Queue to append to
 * @param _node Node to append; its `next` member is overwritten
 */
void kq_add(struct kqueue *kq, struct kq_node *_node)
{
	/* the new node becomes the last one, so it has no successor */
	patom_init(&_node->next, nil);

	union verptr node, tail, next;
	node.ptr = _node;

	spin_loop {
		tail.ptr = patom_read(&kq->tail);
		/* there is always at least one entry in the queue (the dummy),
		 * so we know for sure that the tail can never be nil */
		next.ptr = patom_read(&VERPTR(tail)->next);

		/* If the tail has changed during the time we read the next pointer,
		 * it means another thread has added an item.  The tail and next
		 * pointers are inconsistent in that case, so we have to try again. */
		if (tail.ptr == patom_read(&kq->tail)) {
			/* Check if tail really points to the last node */
			if (VERPTR(next) == nil) {
				/* tail is up-to-date, try appending the new node;
				 * the value we store carries the old link's
				 * version number plus one */
				node.ver = next.ver + 1u;
				void *tmp = patom_cmp_xchg(&VERPTR(tail)->next, next.ptr, node.ptr);
				/* getting back the value we expected is treated
				 * as a successful exchange, i.e. we are linked in */
				if (tmp == next.ptr)
					break;
			} else {
				/* tail is falling behind, try to make it catch up
				 * (this loops until it points to the actual end) */
				next.ver = tail.ver + 1u;
				patom_cmp_xchg(&kq->tail, tail.ptr, next.ptr);
			}
		}
	}

	/*
	 * When we reach this, the new node has been inserted into the actual
	 * linked list.  What's left is to update the tail pointer, which we
	 * optimistically try once.  If that fails, we take care of the mess
	 * next time an item is added or removed from the queue, because it's
	 * not *critical* that the tail always stays consistent.  It's only
	 * there so we don't need to iterate over the entire list whenever we
	 * insert something.
	 */
	/* `tail` still holds the value read in the final loop iteration */
	node.ver = tail.ver + 1u;
	patom_cmp_xchg(&kq->tail, tail.ptr, node.ptr);
}

/**
 * Remove the first node from the queue.
 *
 * Retries in a loop until either the queue is observed to be empty or a
 * compare-exchange on `kq->head` succeeds in advancing the head by one node.
 * If the tail is found to be lagging behind, one attempt to advance it is
 * made before retrying.
 *
 * NOTE(review): the node that is returned is the same node `kq->head` has
 * just been advanced to.  In the published algorithm that node serves as the
 * new dummy and only its *value* is handed out; confirm that callers of this
 * intrusive variant may safely reuse or re-enqueue the returned node.
 *
 * @param kq Queue to remove from
 * @returns The removed node with the version bits stripped,
 *	or `nil` if the queue was empty
 */
struct kq_node *kq_del(struct kqueue *kq)
{
	union verptr head, tail, next, ret;

	spin_loop {
		head.ptr = patom_read(&kq->head);
		tail.ptr = patom_read(&kq->tail);
		/* there is always at least one entry in the queue (the dummy),
		 * so we know for sure that the head can never be nil */
		next.ptr = patom_read(&VERPTR(head)->next);

		/* If the head has changed during the time we read the other two pointers,
		 * it means another thread has removed an item.  The head, tail, and next
		 * pointers are inconsistent in that case, so we have to try again. */
		if (head.ptr == patom_read(&kq->head)) {
			/* Check if the queue is empty or if the tail is falling behind,
			 * the latter happens if the last kq_add() failed to update it.
			 * NOTE(review): this compares the raw tagged values, so the
			 * version bits of head and tail take part in the comparison;
			 * confirm that kq_init() starts both with the same version. */
			if (head.ptr == tail.ptr) {
				/* Check if the queue is empty */
				if (VERPTR(next) == nil) {
					/* it is, so we are done here */
					ret.ptr = nil;
					break;
				} else {
					/* it is not, so try to update the tail */
					next.ver = tail.ver + 1u;
					patom_cmp_xchg(&kq->tail, tail.ptr, next.ptr);
				}
			} else {
				/* Queue is not empty and the tail is consistent,
				 * that means we can try removing the first node */
				/* remember the node before its version bits are
				 * rewritten for the new head value below */
				ret.ptr = next.ptr;
				next.ver = head.ver + 1u;
				void *tmp = patom_cmp_xchg(&kq->head, head.ptr, next.ptr);
				/* getting back the value we expected is treated
				 * as a successful exchange */
				if (tmp == head.ptr)
					break;
			}
		}
	}

	/* strip the version bits so the caller gets a plain pointer */
	return VERPTR(ret);
}