Atomics & lock-free data structures in C++

  • A modern microprocessor pipeline is many stages deep (roughly 14 on some designs), and both the compiler and the CPU reorder instructions for optimization purposes.
  • Linux 2.6 supports full preemption, i.e. a thread can be suspended at almost any point during its execution.
  • Atomic types are objects whose operations are indivisible: another thread sees the value either before an operation or after it, never a partially updated one.
  • Barriers (fences) are used for ordering the accesses to memory locations.
  • Atomic operations are provided at the hardware level in order to make the operations indivisible.
  • The implementation is highly dependent upon the hardware. x86 has a comparatively strong memory-ordering model; architectures such as ARM and POWER allow more reordering.
  • Atomic operations combined with memory fences prevent the compiler and the CPU from reordering the surrounding instructions across them.
  • Atomic operations are expensive because the compiler and hardware cannot apply all of their usual optimizations around them.
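The points about barriers and reordering can be illustrated with a small release/acquire handoff. This is only a sketch: the names payload, ready, producer, consumer and run_once are made up for illustration. The release store keeps the plain write to payload from moving after the flag update, and the acquire load keeps the read of payload from moving before the flag check.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical names for illustration: `payload` is ordinary data,
// `ready` is the atomic flag that publishes it.
int payload = 0;
std::atomic<bool> ready{false};

void producer() {
  payload = 42;                                  // plain, non-atomic write
  ready.store(true, std::memory_order_release);  // earlier writes may not sink below this store
}

int consumer() {
  // Later reads may not be hoisted above this acquire load.
  while (!ready.load(std::memory_order_acquire)) {
    std::this_thread::yield();
  }
  return payload;  // the write of 42 happens-before this read
}

// Runs both sides once and returns what the consumer observed.
int run_once() {
  payload = 0;
  ready.store(false);
  int seen = -1;
  std::thread c([&seen] { seen = consumer(); });
  std::thread p(producer);
  p.join();
  c.join();
  assert(seen == 42);
  return seen;
}
```

With std::memory_order_relaxed on both sides the flag itself would still be updated atomically, but nothing would order the access to payload relative to it.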

The <atomic> header provides various atomic types. The following is a non-exhaustive list of the type aliases and the specializations they name:

Type alias        Full specialization
atomic_bool       std::atomic<bool>
atomic_char       std::atomic<char>
atomic_schar      std::atomic<signed char>
atomic_uchar      std::atomic<unsigned char>
atomic_int        std::atomic<int>
atomic_uint       std::atomic<unsigned>
atomic_short      std::atomic<short>
atomic_ushort     std::atomic<unsigned short>
atomic_long       std::atomic<long>
atomic_ulong      std::atomic<unsigned long>
atomic_llong      std::atomic<long long>
atomic_ullong     std::atomic<unsigned long long>
atomic_char16_t   std::atomic<char16_t>
atomic_char32_t   std::atomic<char32_t>
atomic_wchar_t    std::atomic<wchar_t>

Operations on Atomic types

These operations take an optional argument for memory order, which can be one of std::memory_order_relaxed, std::memory_order_consume, std::memory_order_acquire, std::memory_order_release, std::memory_order_acq_rel or std::memory_order_seq_cst (the default).

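  • load: this is a read operation on an atomic type.
  • store: this is a write operation on an atomic type.
  • exchange: this is a read-modify-write operation on an atomic type; it unconditionally stores a new value and returns the old one. The compare-and-exchange variants are used as compare_exchange_weak(expected, desired, <optional memory order>) or compare_exchange_strong(expected, desired, <optional memory order>). They store desired only if the current value equals expected. On a successful exchange they return true; otherwise they return false and write the current value into expected.
    • compare_exchange_weak: this may fail spuriously, i.e. return false even though the value equals expected, which lets it map efficiently onto architectures that build compare-and-swap from load-linked/store-conditional instructions. It is therefore advised to use it in a loop. It has the same effect as compare_exchange_strong on the x86 platform.
    • compare_exchange_strong: it fails only when the current value differs from expected, so it never fails spuriously.
  • fetch_ versions of add, sub, and, or, xor (fetch_add, fetch_sub, fetch_and, fetch_or, fetch_xor)
  • overloaded operators like ++, --, +=, -=, &=, |=, ^= (there is no *=)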
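The operations listed above can be tried out in a few lines. The following is a minimal sketch using the default memory order (std::memory_order_seq_cst) almost everywhere; atomic_max and operations_demo are hypothetical helpers written for this example, not part of <atomic>.

```cpp
#include <atomic>
#include <cassert>

// Atomically replaces `value` with max(value, candidate) using a CAS loop.
// `atomic_max` is a hypothetical helper, not something <atomic> provides.
void atomic_max(std::atomic<int>& value, int candidate) {
  int current = value.load(std::memory_order_relaxed);
  // On failure compare_exchange_weak writes the value it found into
  // `current`, so every retry compares against fresh data.
  while (current < candidate &&
         !value.compare_exchange_weak(current, candidate)) {
  }
}

int operations_demo() {
  std::atomic<int> a{0};
  a.store(5);                // write
  int old = a.exchange(7);   // unconditional swap, returns the previous value
  assert(old == 5);

  int expected = 7;
  bool swapped = a.compare_exchange_strong(expected, 10);  // 7 == 7, a becomes 10
  assert(swapped);

  expected = 99;
  swapped = a.compare_exchange_strong(expected, 0);  // fails, expected is set to 10
  assert(!swapped && expected == 10);

  int before = a.fetch_add(3);  // returns 10, a becomes 13
  assert(before == 10);
  a += 2;                       // operator form of fetch_add, a becomes 15

  atomic_max(a, 12);  // no change, 15 is already larger
  atomic_max(a, 20);  // a becomes 20
  return a.load();    // read
}
```

The loop in atomic_max is the usual shape for compare_exchange_weak: a spurious failure simply costs one more iteration.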

Lock Based implementation of a Multi-producer, Multi-consumer Queue.


It helps to look at a lock-based data structure before implementing a lock-free one.

/*
Code for a lock-based queue.
std::queue is wrapped in a struct Queue.
The internal_queue member holds the queued items.
A unit of work is an instance of struct Work.
Queue has pop and push functions.
push takes the Work instance by rvalue reference and
pushes it onto the internal queue.
pop returns a shared pointer to Work. (The mutex already
serializes access, so the ABA problem, which affects
compare-and-swap based lock-free code, cannot occur here.)
pop is a non-blocking function.

If the queue is empty, pop hands back a no-op Work and the thread just yields.
A sleep is added in the consumer in order to simulate slowness in the work.

Disadvantages of this kind of queue:
 * The whole queue is locked during every operation in order to make
sure access is synchronized.
*/

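#include <atomic> // std::atomic for the shared counter
#include <queue> // std::queue
#include <mutex> // for locking
#include <thread>
#include <memory> // smart pointers
#include <utility>
#include <functional> // passing and storing lambda
#include <iostream> // for std::cout
#include <chrono>

// Incremented by several consumer threads at once, so it must be atomic;
// a plain long here would be a data race.
static std::atomic<long> work_number{0};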

struct Work{
  // Wraps a callable; invoking the Work object runs it.
  Work(std::function<void(void)> lambda):f(std::move(lambda)){}
  void operator()(){
    f();
  }
private:
  std::function<void(void)> f;
};

struct Queue{
  Queue() = default;
  // Queue is not copyable or copy constructible.
  Queue(const Queue &) = delete;
  Queue & operator=(const Queue &) = delete;
  std::shared_ptr<Work> pop();
  void push(Work &&);
private:
  std::mutex mtx;
  std::queue<Work> internal_queue;
};

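void Queue::push(Work && work){
  std::lock_guard<std::mutex> lg(mtx);
  // `work` is an lvalue inside this function; std::move avoids a copy.
  internal_queue.push(std::move(work));
}

std::shared_ptr<Work> Queue::pop(){
  {
    std::lock_guard<std::mutex> lg(mtx);
    if(!internal_queue.empty()){
      auto w = std::make_shared<Work>(std::move(internal_queue.front()));
      internal_queue.pop();
      return w;
    }
  }
  // Queue was empty: the lock is released first, so yielding
  // does not keep the other threads out of the queue.
  std::this_thread::yield();
  return std::make_shared<Work>([](){}); // no-op work item
}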

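struct Producer{
  Producer(Queue & q):q(q){}
  // Pushes work items forever. Nothing throttles the producers, so the
  // queue keeps growing while the consumers sleep.
  void produce(){
    for(;;){
      Work w([](){std::cout << "work number : " << ++work_number << " is called.." << "\n";});
      q.push(std::move(w));
    }
  }
private:
  Queue & q;
};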

struct Consumer{
  Consumer(Queue & q):q(q){}
  void consume(){
    for(;;){
      std::shared_ptr<Work> w_ptr = q.pop();
      (*w_ptr)(); // run the work item (a no-op if the queue was empty)
      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // simulate slow work
    }
  }
private:
  Queue & q;
};

int main(){
  Queue q;
  std::thread producer_thread1([&q](){
    Producer p(q);
    p.produce();
  });

  std::thread producer_thread2([&q](){
    Producer p(q);
    p.produce();
  });

  std::thread consumer_thread1([&q](){
    Consumer c(q);
    c.consume();
  });

  std::thread consumer_thread2([&q](){
    Consumer c(q);
    c.consume();
  });

  std::thread consumer_thread3([&q](){
    Consumer c(q);
    c.consume();
  });

  producer_thread1.join();
  producer_thread2.join();
  consumer_thread1.join();
  consumer_thread2.join();
  consumer_thread3.join();
  return 0;
}
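
The pop function above never blocks: an idle consumer keeps polling and receives a no-op work item each time. A common alternative, shown here only as a sketch and not as the listing's own design, is to let consumers sleep on a std::condition_variable until a producer pushes something. BlockingQueue and sum_through_queue are hypothetical names, and the template parameter T stands in for the Work type.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking variant of the queue; T stands in for Work.
template <typename T>
class BlockingQueue {
public:
  void push(T item) {
    {
      std::lock_guard<std::mutex> lg(mtx);
      items.push(std::move(item));
    }
    not_empty.notify_one();  // wake one waiting consumer, outside the lock
  }

  // Blocks until an item is available, then returns it.
  T pop() {
    std::unique_lock<std::mutex> lk(mtx);
    // wait() releases mtx while the thread sleeps and re-acquires it
    // before the predicate is checked again.
    not_empty.wait(lk, [this] { return !items.empty(); });
    T item = std::move(items.front());
    items.pop();
    return item;
  }

private:
  std::mutex mtx;
  std::condition_variable not_empty;
  std::queue<T> items;
};

// Two producers push 1..n each; the calling thread sums everything it pops.
long sum_through_queue(int n) {
  assert(n >= 0);
  BlockingQueue<int> q;
  std::vector<std::thread> producers;
  for (int p = 0; p < 2; ++p) {
    producers.emplace_back([&q, n] {
      for (int i = 1; i <= n; ++i) q.push(i);
    });
  }
  long total = 0;
  for (int i = 0; i < 2 * n; ++i) total += q.pop();
  for (auto& t : producers) t.join();
  return total;
}
```

The whole queue is still guarded by one mutex, so this removes the busy polling but not the serialization.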

Lock Free Data Structures

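In the above implementation the data structure is lock-based, and hence no two threads can access it concurrently. A lock-free data structure instead relies on atomic read-modify-write operations such as compare-and-exchange, so threads can make progress without holding a mutex.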
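
As a rough illustration of the lock-free approach, here is a minimal sketch of a stack whose push and pop are built from compare_exchange_weak loops. It is a stack rather than a queue to keep the example short, and all names (LockFreeStack, retire, count_after_concurrent_push) are hypothetical. It rests on one loud assumption: popped nodes are parked on a second list and only freed in the destructor. That sidesteps the ABA problem and use-after-free, at the cost of memory that grows until the stack is destroyed; a production design would need a real reclamation scheme.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class LockFreeStack {
  struct Node {
    T value;
    Node* next;          // link in the live stack, never changed after publish
    Node* retired_next;  // link in the list of popped nodes
  };
  std::atomic<Node*> head{nullptr};
  std::atomic<Node*> retired{nullptr};

  // Assumption: popped nodes are kept until destruction instead of deleted.
  void retire(Node* node) {
    node->retired_next = retired.load();
    while (!retired.compare_exchange_weak(node->retired_next, node)) {
    }
  }

public:
  LockFreeStack() = default;
  LockFreeStack(const LockFreeStack&) = delete;
  LockFreeStack& operator=(const LockFreeStack&) = delete;

  // Must only run once no other thread is using the stack.
  ~LockFreeStack() {
    for (Node* n = head.load(); n;) { Node* nx = n->next; delete n; n = nx; }
    for (Node* n = retired.load(); n;) { Node* nx = n->retired_next; delete n; n = nx; }
  }

  void push(T value) {
    Node* node = new Node{std::move(value), head.load(), nullptr};
    // If head changed since it was read, the CAS fails and refreshes node->next.
    while (!head.compare_exchange_weak(node->next, node)) {
    }
  }

  // Returns false if the stack was empty.
  bool pop(T& out) {
    Node* old_head = head.load();
    // On failure old_head is reloaded, so old_head->next is re-read each time.
    while (old_head && !head.compare_exchange_weak(old_head, old_head->next)) {
    }
    if (!old_head) return false;
    out = std::move(old_head->value);  // only the thread that won the CAS gets here
    retire(old_head);
    return true;
  }
};

// Four threads push `per_thread` items each; returns how many can be popped.
int count_after_concurrent_push(int per_thread) {
  assert(per_thread >= 0);
  LockFreeStack<int> stack;
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([&stack, per_thread] {
      for (int i = 0; i < per_thread; ++i) stack.push(i);
    });
  }
  for (auto& t : threads) t.join();
  int count = 0;
  int value = 0;
  while (stack.pop(value)) ++count;
  return count;
}
```

No thread ever waits for a lock here: a thread that loses a race simply retries with the freshly observed head.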

Process Synchronization and IPC

Why Synchronization

Linux has supported symmetric multiprocessing on multicore microprocessors since version 2.0. With this came a new dimension of problems, such as race conditions, deadlocks and livelocks. In order to mitigate these issues, various synchronization mechanisms were introduced in Linux. The 2.6 version of Linux added full preemption, which means the scheduler can preempt kernel code at virtually any point and reschedule another task.

  • critical region (critical section): code that accesses and manipulates shared data

Reasons for Synchronization:

  • Interrupts. An interrupt can occur asynchronously at almost any time, interrupting the currently executing code.
  • Softirqs and tasklets. The kernel can raise or schedule a softirq or tasklet at almost any time, interrupting the currently executing code.
  • Kernel preemption. Because the kernel is preemptive, one task in the kernel can preempt another.
  • Sleeping and synchronization with user-space. A task in the kernel can sleep and thus invoke the scheduler, resulting in the running of a new process.
  • Symmetrical multiprocessing. Two or more processors can execute kernel code at exactly the same time.

Ways to Synchronize

  • Shared Memory
  • Sockets
  • Atomics and lock-free programming
  • Pipes/FIFOs
  • Mutexes
    • recursive
    • reader/writer
  • Spinlocks
  • Semaphores
    • Binary and counting
  • Condition variables
  • Message queues

Semaphores

Since all threads run in the same address space, they all have access to the same data and variables. If two threads simultaneously attempt to update a global counter variable, it is possible for their operations to interleave in such a way that the global state is not correctly modified. Although such a case may arise only one time out of thousands, a concurrent program needs to coordinate the activities of multiple threads using something more reliable than just depending on the fact that such interference is rare.

A semaphore is somewhat like an integer variable, but is special in that its operations (increment and decrement) are guaranteed to be atomic—you cannot be halfway through incrementing the semaphore and be interrupted and waylaid by another thread trying to do the same thing. That means you can increment and decrement the semaphore from multiple threads without interference. By convention, when a semaphore is zero it is “locked” or “in use”.

Semaphores vs. mutexes (from wikipedia)
A mutex is essentially the same thing as a binary semaphore and sometimes uses the same basic implementation. The differences between them are in how they are used. While a binary semaphore may be used as a mutex, a mutex is a more specific use-case, in that only the thread that locked the mutex is supposed to unlock it. This constraint makes it possible to implement some additional features in mutexes:

  • Since only the thread that locked the mutex is supposed to unlock it, a mutex may store the id of thread that locked it and verify the same thread unlocks it.
  • Mutexes may provide priority inversion safety. If the mutex knows who locked it and is supposed to unlock it, it is possible to promote the priority of that thread whenever a higher-priority task starts waiting on the mutex.
  • Mutexes may also provide deletion safety, where the thread holding the mutex cannot be accidentally deleted.
  • Alternately, if the thread holding the mutex is deleted (perhaps due to an unrecoverable error), the mutex can be automatically released.
  • A mutex may be recursive: a thread is allowed to lock it multiple times without causing a deadlock.

A semaphore is one of the thread synchronization mechanisms. Unlike a mutex, a counting semaphore allows more than one thread, up to its initial count, to access a pool of similar shared resources at the same time. There are two types of semaphores:

  • binary semaphore
  • counting semaphore.
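
To make the counting behaviour concrete, here is a minimal sketch of a counting semaphore built from a mutex and a condition variable. The Semaphore class and max_concurrency are hypothetical names written for this example; C++20 ships std::counting_semaphore in <semaphore>, which would normally be preferred where available. A binary semaphore is the same thing with an initial count of 1.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical minimal counting semaphore for illustration.
class Semaphore {
public:
  explicit Semaphore(int initial) : count(initial) { assert(initial >= 0); }

  void acquire() {  // decrement; blocks while the count is zero
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [this] { return count > 0; });
    --count;
  }

  void release() {  // increment and wake one waiter
    {
      std::lock_guard<std::mutex> lg(mtx);
      ++count;
    }
    cv.notify_one();
  }

private:
  std::mutex mtx;
  std::condition_variable cv;
  int count;
};

// Starts `workers` threads that share `permits` slots and returns the highest
// number of threads that were inside the guarded region at the same time.
int max_concurrency(int workers, int permits) {
  Semaphore slots(permits);
  std::mutex stats_mtx;
  int inside = 0;
  int peak = 0;
  std::vector<std::thread> threads;
  for (int i = 0; i < workers; ++i) {
    threads.emplace_back([&] {
      slots.acquire();
      {
        std::lock_guard<std::mutex> lg(stats_mtx);
        peak = std::max(peak, ++inside);
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(5));  // pretend to use the resource
      {
        std::lock_guard<std::mutex> lg(stats_mtx);
        --inside;
      }
      slots.release();
    });
  }
  for (auto& t : threads) t.join();
  return peak;
}
```

With permits set to 1 the region behaves like a mutex-protected section, except that any thread may call release, which is the ownership difference described above.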

Pipes and FIFOs

Pipes are used for communicating data between the processes. Output of a process is piped as an input to another process.

cat doctum.txt | wc

The output of the cat command becomes the input of wc (word count).

image credit: https://www.vivaolinux.com.br/dica/Pipes-no-Linux

FIFOs are named pipes. FIFOs can be created as follows:

int mkfifo(const char *path, mode_t mode);
int mkfifoat(int fd, const char *path, mode_t mode);

Once the FIFOs are made, they can be opened using open(), and the normal file I/O functions (e.g., close, read, write, unlink) all work with FIFOs.
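
A minimal sketch of that sequence is shown below. For brevity the reader and the writer are two threads of one process, where normally they would be separate processes; fifo_round_trip is a hypothetical helper and the path is supplied by the caller. Error handling is reduced to the bare minimum.

```cpp
#include <cassert>
#include <fcntl.h>     // open
#include <string>
#include <sys/stat.h>  // mkfifo
#include <thread>
#include <unistd.h>    // read, write, close, unlink

// Sends `msg` through a FIFO created at `path` and returns what the reader
// received. Returns an empty string if the FIFO could not be created.
std::string fifo_round_trip(const std::string& path, const std::string& msg) {
  assert(!path.empty());
  unlink(path.c_str());  // remove a stale FIFO from an earlier run, if any
  if (mkfifo(path.c_str(), 0600) != 0) return "";

  std::thread writer([&path, &msg] {
    int fd = open(path.c_str(), O_WRONLY);  // blocks until a reader opens the FIFO
    if (fd >= 0) {
      ssize_t written = write(fd, msg.data(), msg.size());
      (void)written;  // a fuller version would loop on short writes
      close(fd);      // closing the write end lets the reader see end-of-file
    }
  });

  std::string received;
  int fd = open(path.c_str(), O_RDONLY);  // blocks until the writer opens the FIFO
  if (fd >= 0) {
    char buf[256];
    ssize_t n;
    while ((n = read(fd, buf, sizeof buf)) > 0) {
      received.append(buf, static_cast<std::size_t>(n));
    }
    close(fd);
  }
  writer.join();
  unlink(path.c_str());  // the FIFO is a filesystem entry and must be removed explicitly
  return received;
}
```

Note that open() on a FIFO blocks until the other end is opened as well, which is why the two ends are opened from different threads here.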