Modern C++: Coding a Thread Pool

The first attempt at coding the thread pool uses a simple condition variable and a mutex. Once the multi-producer, multi-consumer thread pool is working as expected, it can be converted into a lock-free buffer.

The ThreadPool class is a struct with the following member variables.

  1. struct Work with the following member variables:
    1. _work to store the lambda/function pointer
    2. _args to store the array of arguments
    3. is_processed, a boolean flag
  2. a std::array of fixed size to hold the work items
  3. a static const variable max_size giving the size of the work array
  4. head_idx and tail_idx to track the most recently written and the most recently processed slot
  5. a condition variable cv
  6. a mutex mtx

There are two main methods: enqueue, which adds an item, and process, which pops an item and works on it. Since it is a circular queue, the next index is calculated by adding one to the current index and taking the remainder modulo max_size, i.e.

next_idx = (current_index + 1 ) % max_size
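
As a minimal sketch of that index arithmetic (the helper name next_index and the ring size of 4 are illustrative assumptions, not part of the pool below):

```cpp
#include <cstddef>

// Hypothetical helper: advance an index around a ring of max_size slots.
constexpr std::size_t next_index(std::size_t current_index, std::size_t max_size){
  return (current_index + 1) % max_size;
}

// With max_size = 4 the indices cycle 0, 1, 2, 3, 0, 1, ...
static_assert(next_index(0, 4) == 1, "advances by one");
static_assert(next_index(3, 4) == 0, "wraps back to the first slot");
```

The modulo keeps the index inside the array without any branch, which is why the same expression is used for both head_idx and tail_idx.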

Here is an example of a locked SPSC (single-producer, single-consumer) queue. The queue is bounded and blocking: if either the producer or the consumer is slow, the other side blocks on the condition variable and mutex until a slot becomes available.

#include <iostream>
#include <thread>
#include <array>
#include <functional>
#include <vector>
#include <random>
#include <chrono>
#include <mutex>
#include <condition_variable>

using func_type = std::function<int(int,int)>;
using args_ary = std::array<int,2>;

struct ThreadPool{
  // Enqueue function to get the function and arguments to the function.
  // consumer can invoke function with args passed on to it. 
  bool enqueue(func_type fn, args_ary args){
    size_t next_idx = (head_idx + 1) % max_size;

    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk,[&](){
      return work[next_idx].processed();
    });

    Work w{fn, args};
    work[next_idx] = w;
    head_idx = next_idx;
    cv.notify_all();
    return true;
  }

  // process or pop function to pop the latest record and process it.
  bool process(){
    size_t next_idx = (tail_idx + 1) % max_size;
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk,[&](){
      return !work[next_idx].processed();
    });

    Work w = work[next_idx];
    work[next_idx].set_processed();

    if(!w.invalid()){
      int output = w.process();
      std::cout << output << '\n';
    }
    tail_idx = next_idx;
    cv.notify_all();
    return true;
  }

private:
  struct Work{
    private:
      func_type _work;
      args_ary _args;
      bool is_processed = false;

    public:
      Work():is_processed(true){};
      Work(func_type work, args_ary args):_work{work},_args{args},is_processed(false){};
      bool processed(){
        return is_processed;
      }
      int process(){
        return _work(_args[0], _args[1]);
      }
      bool invalid(){
        return _work == nullptr;
      }
      void set_processed(){
        is_processed = true;
      }
   };

   static const size_t max_size = 1000;
   std::array<Work,max_size> work;
   size_t head_idx = 0, tail_idx = 0;
   std::condition_variable cv;
  std::mutex mtx;
};

int random_num(){
  std::mt19937 rng;
  rng.seed(std::random_device()());
  std::uniform_int_distribution<std::mt19937::result_type> dist(1,10000); // distribution in range [1,10000]
  return dist(rng);
}

int main(){
  ThreadPool tp;

  std::thread producer1([&](){
    int count = 0;
    while(true){
      std::function<int(int,int)> fn;
      if (count % 3 == 0){
        fn = [](int a, int b){ std::cout << "Adding \t\t" << a << '\t'<< b << '\t'; return a+b;};
      }else if(count % 3 == 1){
        fn = [](int a, int b){ std::cout << "Subtracting \t" << a << '\t'<< b << '\t'; return a-b;};
      }else{
        fn = [](int a, int b){std::cout << "Multiplying \t" << a << '\t'<< b << '\t'; return a*b;};
      }
      tp.enqueue(fn, args_ary{random_num(),random_num()});
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      ++count;
     }
  });

  std::thread consumer1([&](){
    while(true){
      tp.process();
    }
  });

  if(producer1.joinable()){
    producer1.join();
  }

  if(consumer1.joinable()){
    consumer1.join();
  }
  return 0;
}

Here is a complete example of a multi-producer, multi-consumer (MPMC) queue.

#include <iostream>
#include <thread>
#include <array>
#include <functional>
#include <vector>
#include <random>
#include <chrono>
#include <mutex>
#include <condition_variable>

using func_type = std::function<int(int,int)>;
using args_ary = std::array<int,2>;

struct ThreadPool{
  ThreadPool(){
    for(int i = 0; i < max_size; ++i){
      work[i] = Work();
    }
  }

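  bool enqueue(func_type fn, args_ary args){
    std::unique_lock<std::mutex> lk(mtx);
    // Wait until the slot after head_idx is free. The index is computed
    // under the lock so that two producers never claim the same slot.
    cv.wait(lk,[&](){
      return work[(head_idx + 1) % max_size].processed();
    });

    size_t next_idx = (head_idx + 1) % max_size;
    work[next_idx].set_work(fn);
    work[next_idx].set_args(args);
    work[next_idx].reset();
    head_idx = next_idx;
    cv.notify_all();
    return true;
  }

  // remainder and total_threads are unused; they are kept so that the
  // calls in main() stay unchanged.
  bool process(int remainder, int total_threads){
    std::unique_lock<std::mutex> lk(mtx);
    // Wait until the slot after tail_idx holds unprocessed work. The index is
    // computed under the lock so that two consumers never take the same slot.
    cv.wait(lk,[&](){
      return work[(tail_idx + 1) % max_size].unprocessed();
    });

    size_t next_idx = (tail_idx + 1) % max_size;
    Work w = work[next_idx];
    work[next_idx].set_processed();
    tail_idx = next_idx;

    // The work runs while the lock is held, which keeps each output line
    // intact but serializes the consumers.
    if(!w.invalid()){
      int output = w.process();
      std::cout << output << '\n';
    }
    cv.notify_all();
    return true;
  }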

  private:
    struct Work{
    private:
      func_type _work;
      args_ary _args;
      bool is_processed = false;

    public:
      Work():is_processed(true){}; 
      int process(){
        return _work(_args[0], _args[1]);
      }
    
    bool invalid(){
      return _work == nullptr;
    }
    
    void set_processed(){
      is_processed = true;
    }

    void set_work(func_type work){
      _work = work;
    }

    void set_args(args_ary args){
      _args = args;
    }
    void reset(){
      is_processed = false;
    }

    bool unprocessed(){
      return is_processed == false;
    }

    bool processed(){
      return is_processed == true;
    }
};

  static const size_t max_size = 1000;
  std::array<Work,max_size> work;
  size_t head_idx = 0, tail_idx = 0;
  std::condition_variable cv;
  std::mutex mtx;
};

int random_num(){
  std::mt19937 rng;
  rng.seed(std::random_device()());
  std::uniform_int_distribution<std::mt19937::result_type> dist(1,10000); // distribution in range [1,10000]
  return dist(rng);
}

int main(){
  ThreadPool tp;
  auto adder = [](int a, int b){ std::cout << "Adding \t\t" << a << '\t'<< b << '\t'; return a+b;};
  auto subtractor = [](int a, int b){ std::cout << "Subtracting \t" << a << '\t'<< b << '\t'; return a-b;};
  auto multiplicator = [](int a, int b){std::cout << "Multiplying \t" << a << '\t'<< b << '\t'; return a*b;};

  std::thread producer1([&](){
    while(true){
      tp.enqueue(adder, args_ary{random_num(),random_num()});
      //std::this_thread::sleep_for(std::chrono::milliseconds(10));
    } 
  });

  std::thread producer2([&](){
    while(true){
      tp.enqueue(subtractor, args_ary{random_num(),random_num()}); 
      //std::this_thread::sleep_for(std::chrono::milliseconds(10));
  } 
});

std::thread producer3([&](){
  while(true){
    tp.enqueue(multiplicator, args_ary{random_num(),random_num()}); 
      //std::this_thread::sleep_for(std::chrono::milliseconds(10));
  } 
});

std::thread consumer1([&](){
  while(true){
    tp.process(0, 3);
  }
});

std::thread consumer2([&](){
  while(true){
    tp.process(1, 3);
  }
});

std::thread consumer3([&](){
  while(true){
    tp.process(2, 3);
  }
});


  if(producer1.joinable()){
    producer1.join();
  }

  if(producer2.joinable()){
    producer2.join();
  }

  if(producer3.joinable()){
    producer3.join();
  }

  if(consumer1.joinable()){
    consumer1.join();
  }

  if(consumer2.joinable()){
    consumer2.join();
  }
  if(consumer3.joinable()){
    consumer3.join();
  }

  return 0;
}

Output

Adding      9316 8539 17855
Subtracting 3529 1274 2255
Multiplying 7821 2404 18801684
Adding      9455 8300 17755
Subtracting 9882 5696 4186
Multiplying 202 9795 1978590
Adding      5794 7979 13773
Subtracting 4331 576 3755
Multiplying 963 4548 4379724
Adding      7238 9714 16952
Subtracting 9868 5185 4683
Multiplying 9139 4941 45155799
Adding      6842 7031 13873
Subtracting 3043 2312 731

Lock-free code

A single-producer, single-consumer lock-free queue can be written as follows.

#include <iostream>
#include <thread>
#include <array>
#include <functional>
#include <vector>
#include <random>
#include <chrono>
#include <atomic>

using func_type = std::function<int(int,int)>;
using args_ary = std::array<int,2>;

struct ThreadPool{
  // Constructor
  ThreadPool(){
    for(int i = 0; i < max_size; ++i){
      work[i] = Work();
    }
  }

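  // Returns false when the queue is full; the caller decides whether to retry.
  bool enqueue(func_type fn, args_ary args){
    // Only the producer writes head_idx, so a relaxed load is enough here.
    size_t current_head_idx = head_idx.load(std::memory_order_relaxed);
    size_t next_idx = (current_head_idx + 1) % max_size;

    // Acquire pairs with the consumer's release store to tail_idx, so the
    // consumer is known to be done with every slot it has released.
    if(next_idx == tail_idx.load(std::memory_order_acquire)){
      return false; // full (one slot is always left empty)
    }
    work[next_idx].set_work(fn);
    work[next_idx].set_args(args);
    work[next_idx].reset();
    // Release publishes the slot contents together with the new head index.
    head_idx.store(next_idx, std::memory_order_release);
    return true;
  }

  // Returns false when the queue is empty.
  bool process(){
    // Only the consumer writes tail_idx, so a relaxed load is enough here.
    size_t current_tail_idx = tail_idx.load(std::memory_order_relaxed);
    // Acquire pairs with the producer's release store to head_idx.
    if(current_tail_idx == head_idx.load(std::memory_order_acquire)){
      return false; // empty
    }
    size_t next_idx = (current_tail_idx + 1) % max_size;
    int output = work[next_idx].process();
    std::cout << output << '\n';
    work[next_idx].set_processed();
    // Release hands the slot back to the producer.
    tail_idx.store(next_idx, std::memory_order_release);
    return true;
  }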

  private:
    struct Work{
      private:
        func_type _work;
        args_ary _args;
        bool is_processed = false;

      public:
        Work():is_processed(true){};
        Work(func_type work, args_ary args):_work{work},_args{args},is_processed(false){};
        bool processed(){
          return is_processed == true;
        }

       int process(){
         return _work(_args[0], _args[1]);
       }

       bool invalid(){
         return _work == nullptr;
       }

       void set_processed(){ 
         is_processed = true;
       }

       void set_work(func_type work){
         _work = work;
       }

       void set_args(args_ary args){
         _args = args;
       }
       
       void reset(){
         is_processed = false;
       }
    };

    static const size_t max_size = 1000;
    std::array<Work,max_size> work;
    std::atomic<size_t> head_idx{0}, tail_idx{0};
};

int random_num(){
  std::mt19937 rng;
  rng.seed(std::random_device()());
  std::uniform_int_distribution<std::mt19937::result_type> dist(1,10000); // distribution in range [1,10000]
  return dist(rng);
}

int main(){
  ThreadPool tp;

  std::thread producer1([&](){
    int count = 0;
    while(true){
      std::function<int(int,int)> fn;
      if (count % 3 == 0){
        fn = [](int a, int b){ 
               std::cout << "Adding \t\t" << a << '\t'<< b << '\t'; 
               return a+b;
              };
      }else if(count % 3 == 1){
        fn = [](int a, int b){ 
               std::cout << "Subtracting \t" << a << '\t'<< b << '\t'; 
               return a-b;};
      }else{
        fn = [](int a, int b){
              std::cout << "Multiplying \t" << a << '\t'<< b << '\t'; 
              return a*b;};
        }
       tp.enqueue(fn, args_ary{random_num(),random_num()});
       ++count;
    }
});

  std::thread consumer1([&](){
    while(true){
     tp.process();
    }
  });

  if(producer1.joinable()){
    producer1.join();
  }

  if(consumer1.joinable()){
    consumer1.join();
  }
  return 0;
}

Atomics & lock-free data structures c++

  • Modern microprocessor pipelines are deep (14 or more stages on some designs), and instructions are routinely reordered along the way for optimization purposes.
  • Linux has supported kernel preemption since 2.6, i.e. a thread can be suspended in the middle of an operation.
  • Atomic types wrap memory locations on which each operation is indivisible: no other thread can observe an atomic operation half done.
  • Barriers are used for ordering the accesses to memory locations.
  • Atomic operations are provided at the hardware level in order to make the operations indivisible.
  • The implementation is highly dependent upon the hardware. x86 has one of the strictest memory-ordering models among mainstream architectures.
  • Atomic operations make an access indivisible; memory fences additionally prevent the surrounding instructions from being reordered across it.
  • Atomic operations are expensive because the compiler and the hardware cannot apply all of their usual optimizations around them.

The <atomic> header provides various atomic types. The following is a non-exhaustive list of atomic types:

atomic_bool std::atomic<bool>
atomic_char std::atomic<char>
atomic_schar std::atomic<signed char>
atomic_uchar std::atomic<unsigned char>
atomic_int std::atomic<int>
atomic_uint std::atomic<unsigned>
atomic_short std::atomic<short>
atomic_ushort std::atomic<unsigned short>
atomic_long std::atomic<long>
atomic_ulong std::atomic<unsigned long>
atomic_llong std::atomic<long long>
atomic_ullong std::atomic<unsigned long long>
atomic_char16_t std::atomic<char16_t>
atomic_char32_t std::atomic<char32_t>
atomic_wchar_t std::atomic<wchar_t>

Operations on Atomic types

These operations take an optional argument for the memory order, which can be one of std::memory_order_relaxed, std::memory_order_consume, std::memory_order_acquire, std::memory_order_release, std::memory_order_acq_rel, or std::memory_order_seq_cst.

  • load: this is a read operation on an atomic type.
  • store: this is a write operation on an atomic type.
  • exchange: this is a read-modify-write operation on an atomic type that stores a new value and returns the old one.
  • compare-and-exchange: these read-modify-write operations are called as compare_exchange_weak(expected, desired, <optional memory order>) or compare_exchange_strong(expected, desired, <optional memory order>). If the atomic holds expected, desired is stored and true is returned; otherwise expected is updated with the current value and false is returned.
    • compare_exchange_weak: this may fail spuriously, i.e. return false even when the value equals expected, on architectures that implement it with load-linked/store-conditional instructions. It is meant to be used in a loop. It has the same effect as compare_exchange_strong on the x86 platform.
    • compare_exchange_strong: this fails only when the value actually differs from expected.
  • fetch_ versions of add, sub, and, or, xor
  • overloaded operators such as ++, --, +=, -=, &=, |=, ^= (there is no *=)
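
To illustrate the compare-and-exchange loop described above, here is a minimal sketch of an atomic multiplication, an operation that std::atomic does not provide directly. The helper name atomic_multiply and the chosen memory orders are assumptions made for illustration.

```cpp
#include <atomic>

// Hypothetical helper: atomically multiply `value` by `factor`.
// std::atomic<int> has no operator*=, so the update is built from a CAS loop.
int atomic_multiply(std::atomic<int>& value, int factor){
  int expected = value.load(std::memory_order_relaxed);
  // On failure compare_exchange_weak writes the current value into
  // `expected`, so the loop retries with fresh data. This also covers
  // spurious failures.
  while(!value.compare_exchange_weak(expected, expected * factor,
                                     std::memory_order_acq_rel,
                                     std::memory_order_relaxed)){
  }
  return expected * factor; // the value this call stored
}
```

The weak form is used because the call already sits in a retry loop, so an occasional spurious failure only costs one extra iteration.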

Lock Based implementation of a Multi-producer, Multi-consumer Queue.


It is important to see the lock based data structures before implementing lock-free data structures.

/*
Code for a lock-based queue.
std::queue is wrapped in a struct Queue.
The internal_queue variable maintains the queue.
The work to do is an instance of struct Work.
Queue has pop and push functions.
The push function takes the work instance by rvalue reference and
pushes it onto the internal queue.
The pop function returns a shared pointer to Work, so the caller
owns the item once it has been removed from the queue.
The pop function is non-blocking.

If the queue is empty, pop returns a no-op Work and the thread yields.
A sleep is added in the consumer to simulate slow work.

Disadvantages of this kind of queue:
 * The whole queue is locked during every operation in order to make
sure access is synchronized.
*/

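#include <queue> // std::queue
#include <mutex> // for locking
#include <thread>
#include <memory> // smart pointers
#include <utility>
#include <functional> // passing and storing lambda
#include <iostream> // for std::cout
#include <chrono>
#include <atomic> // std::atomic for the shared counter

// Incremented from several consumer threads, so it must be atomic.
static std::atomic<long> work_number{0};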

struct Work{
 // take lambda as a work and call lambda to process.
  Work(std::function<void(void)> lambda):f(lambda){};
  void operator()(){
    f(); 
   }
private:
  std::function<void(void)> f; 
};

struct Queue{
  Queue() = default;
  // Queue is not copyable or copy constructible.
  Queue(const Queue &) = delete;
  Queue & operator=(const Queue &) = delete;
  std::shared_ptr<Work> pop();
  void push(Work &&);
private:
  std::mutex mtx;
  std::queue<Work> internal_queue;
};

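 void Queue::push(Work && work){
  std::lock_guard<std::mutex> lg(mtx);
  // work is an lvalue inside this function; std::move avoids a copy.
  internal_queue.push(std::move(work));
 }

 std::shared_ptr<Work> Queue::pop(){
  Work w([](){}); // no-op work, returned when the queue is empty
  bool found = false;
  {
    std::lock_guard<std::mutex> lg(mtx);
    if(!internal_queue.empty()){
      w = std::move(internal_queue.front());
      internal_queue.pop();
      found = true;
    }
  }
  if(!found){
    // Yield only after the lock has been released, so other threads can work.
    std::this_thread::yield();
  }
  return std::make_shared<Work>(std::move(w));
}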

struct Producer{
 Producer(Queue & q):q(q){};
 void produce(){
   for(;;){
     Work w([&](){std::cout << "work number : " << ++work_number << " is called.." << "\n";});
     q.push(std::move(w));
   }
 }
private:
 Queue & q;
};

struct Consumer{
  Consumer(Queue & q):q(q){};
   void consume(){
   for(;;){
     std::shared_ptr<Work> w_ptr = q.pop();
     Work * w = w_ptr.get();
     (*w)();
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
   }
 }
private:
 Queue & q;
};

int main(){
 Queue q;
 std::thread producer_thread1([&q](){
   Producer p(q);
   p.produce();
 });

std::thread producer_thread2([&q](){
  Producer p(q);
  p.produce();
 });

std::thread consumer_thread1([&q](){
 Consumer c(q);
 c.consume();
 });

std::thread consumer_thread2([&q](){
 Consumer c(q);
 c.consume();
 });

std::thread consumer_thread3([&q](){
 Consumer c(q);
 c.consume();
 });

 producer_thread1.join();
 producer_thread2.join();
 consumer_thread1.join();
 consumer_thread2.join();
 consumer_thread3.join();
 return 0;
}

Lock Free Data Structures

In the above implementation, the data structure is lock-based and hence no two threads can access it concurrently.

c++ multithreading code

[Figure: image by I, Cburnett, CC BY-SA 3.0, https://commons.wikimedia.org/w/index.php?curid=2233446]

Simple example of defining work as () operator overload on a struct.

#include <thread>
#include <iostream>

struct work{
 void operator ()() const{
 std::cout << "this is test from work" << std::endl;
 }
};

int main(){
   work w;
   std::thread t([&](){
     w();
   });
   t.join(); // wait in the main thread for finishing the thread t.
   return 0;
}

std::thread objects are not copyable but they are movable.

std::thread t1([]{std::cout << std::this_thread::get_id() << std::endl;});
// std::thread t2 = t1; // does NOT compile: the copy constructor of std::thread is deleted
std::thread t2 = std::move(t1); // this works: ownership of the thread moves to t2

Detach thread: background or long-running jobs that do not need to report their status back to the parent thread can be detached.

t.detach(); // once detached, can not be joined.

RAII applied to thread class

 

#include <thread>
#include <iostream>

struct RThread{
   RThread(){
     std::thread t([](){std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;});
     t_= std::move(t);
   }

   // Rule of 5
  ~RThread(){
     if(t_.joinable()){
       t_.join();
    }
   }

  // delete the copy constructor and copy assignment operator
  RThread(RThread const &) = delete;
  RThread& operator=(RThread const &) = delete;

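  // use the default move constructor and move assignment operator
  // (move operations must take a non-const rvalue reference).
  // Note: move-assigning onto an RThread whose thread is still joinable
  // calls std::terminate.
  RThread(RThread &&) = default;
  RThread & operator=(RThread &&) = default;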

  private:
   std::thread t_;
};

int main(){
 RThread();
 return 0;
}

Threads pool and Atomic shared variable

#include <thread>
#include <iostream>
#include <vector>
#include <atomic>
#include <algorithm> // std::for_each

std::atomic<int> total(0);

int main(){
   std::vector<std::thread> workers;
   for(int i = 0; i <= 20; ++i){
     workers.emplace_back(
       std::thread([](){
         for(int j = 0; j <= 20; ++j){
           // ++total would also work (it uses memory_order_seq_cst).
           // Relaxed ordering is enough here: only the final count matters,
           // and join() makes it visible to main.
           total.fetch_add(1, std::memory_order_relaxed);
         }
       }));
     }

    std::for_each(
       workers.begin(),
       workers.end(),
       [](auto & w){
         if(w.joinable()){
           w.join();
         }
       }
     );

  std::cout << total << std::endl;
  return 0;
}

Using non-atomic variables with mutexes to prevent data races

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>

static int total = 0;
static int worker_count = 20;
std::mutex mtx;

int main(){
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  for(int i = 0; i < worker_count; ++i){
    workers.emplace_back(std::thread([](){
    for(int j = 0; j < worker_count; ++j){ 
      std::lock_guard<std::mutex> lkg(mtx);
      ++total;
     }
   })
   );
 }

for(auto & worker: workers){
   if (worker.joinable()){
   worker.join();
  }
 }

std::cout << total << std::endl;
 return 0;
}

DCLP – Double-Checked Locking Pattern

As per the paper by Scott Meyers and Andrei Alexandrescu, the DCLP pattern implementation is given below.

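To get the instance of a singleton class, there are two checks: the first check is made without the lock; if no instance is found, the lock is acquired and the check is repeated to see whether the instance exists. The paper shows that this form is not reliable without additional memory barriers, because the write to pInstance can become visible before the Singleton is fully constructed.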

Singleton* Singleton::instance() { 
if (pInstance == 0) { // 1st test 
    Lock lock; // acquire the lock 
    if (pInstance == 0) { // 2nd test 
      pInstance = new Singleton; 
    } 
  } 
  return pInstance; 
}

To simplify this situation, C++11 introduced std::call_once and std::once_flag.

std::once_flag pInstance_initialized; // Flag
std::call_once(pInstance_initialized, [](){ pInstance = new Singleton; });

Here is an example in which the code is called only once, as controlled by the once flag:

#include <thread>
#include <iostream>
#include <vector>
#include <mutex> // std::once_flag, std::call_once

int worker_count = 20;

int main(){
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  std::once_flag is_called;
  for(int i = 0; i < worker_count; ++i){
    workers.emplace_back(std::thread([&](){
      std::call_once(is_called, [](){
      std::cout << "once_flag is set in the thread id " << std::this_thread::get_id() << std::endl;
    });
   })
   );
  }

  for(auto & worker: workers){
   if (worker.joinable()){
     worker.join();
   }
  }

  return 0;
}
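
Applied to the singleton from the DCLP discussion, a minimal sketch might look as follows. The class layout is an assumption for illustration; only the names pInstance and pInstance_initialized are taken from the snippet above.

```cpp
#include <mutex>

// Illustrative singleton guarded by std::call_once.
class Singleton{
public:
  static Singleton* instance(){
    // The lambda runs exactly once, even if many threads call instance()
    // concurrently; other callers block until it has finished.
    std::call_once(pInstance_initialized, [](){ pInstance = new Singleton; });
    return pInstance;
  }
  Singleton(const Singleton&) = delete;
  Singleton& operator=(const Singleton&) = delete;
private:
  Singleton() = default;
  static Singleton* pInstance;
  static std::once_flag pInstance_initialized;
};

Singleton* Singleton::pInstance = nullptr;
std::once_flag Singleton::pInstance_initialized;
```

Every call to Singleton::instance() returns the same pointer, and no explicit lock or double check is needed in user code.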

Condition variables are used for communication between threads. A condition variable works together with a mutex to synchronize threads.

The following is a classic example of printing a number series like 0102030405… using 3 threads. One thread prints 0, another prints the odd numbers and the third prints the even numbers. Condition variables are used to solve this problem.

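#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <atomic>

std::condition_variable cv;
std::mutex mtx;
std::atomic<int> n{1}; // atomic because the loop conditions read it without the lock
int max = 100;
bool digit_printed = true; // starts as true so that the 0 is printed first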

int main(){
  std::thread t_zero([](){
  while(n < max){
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [](){
      return digit_printed == true;
    });
    std::cout << 0<< "\n";
    digit_printed = false;
    cv.notify_all();
   }
 });

std::thread t_odd([](){
  while(n < max){
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [](){
      return digit_printed == false && n%2 == 1;
    });
    std::cout << n++ << "\n";
    digit_printed = true;
    cv.notify_all();
   }
 });

std::thread t_even([](){
  while(n < max){
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [](){
      return digit_printed == false && n%2 == 0;
    });
    std::cout << n++ << "\n";
    digit_printed = true;
    cv.notify_all();
   }
 });
 
 t_zero.join();
 t_odd.join();
 t_even.join();
 return 0;
}

 

c++ Multithreading Theory and Concepts

Concurrency vs parallelism

Concurrency is when two or more tasks can start, run, and complete in overlapping time periods. It doesn’t necessarily mean they’ll ever both be running at the same instant. For example, multitasking on a single-core machine.

Parallelism is when tasks literally run at the same time, e.g., on a multicore processor.

The following image explains the difference.

[Figure: concurrency vs. parallelism. Image credit: https://medium.com/@deepshig/concurrency-vs-parallelism-4a99abe9efb8]

Why Threading

  • Forking processes has a lot of overhead, such as creating the kernel data structures for each process.
  • Interprocess communication is also complex. Because of its overhead, it is not appropriate for latency-sensitive applications.
  • Threading makes parallelism easy with a simplified coding API.
  • Threading makes data sharing simple.

Threading in Linux

Linux represents threads as lightweight processes. The POSIX threads library provides an API for thread management through the header pthread.h. A simple program that creates a thread is shown below.

#include <pthread.h>
#include <iostream>

void * work(void * void_ptr)
{
  std::cout << "in the work function" << std::endl;
  return NULL;
}

int main(){
  pthread_t thread_id;
  pthread_create(&thread_id, NULL, work, NULL);
  pthread_join(thread_id, NULL);
  return 0;
}

Threading using Modern c++

#include <thread>
#include <iostream>

void work(){
 std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

int main(){
 std::thread t(work);
 t.join();
 return 0;
}

C++11 introduced the following with regard to multi-threading:

  • std::thread and related classes
  • memory model: this defines concepts such as memory location, threads, synchronization primitives and, most importantly, the memory order.
    • memory order: the memory order specifies how memory accesses, atomic and non-atomic, may be ordered around an atomic operation. It determines which reorderings the compiler and the hardware are allowed to perform and what other threads are guaranteed to observe. The following memory orders are available:
      • memory_order_relaxed
      • memory_order_consume
      • memory_order_acquire
      • memory_order_release
      • memory_order_acq_rel
      • memory_order_seq_cst: this is the default memory order if none is specified.

Why memory ordering

A program written in C/C++ gets compiled to assembly. The compiler reorders instructions all the time for optimization, and so does the CPU at run time. In a single-threaded program, reordering the load or store instructions for a variable is generally not a problem, because the reordering must preserve the observable behavior of that thread. But in a multi-threaded program, if load or store instructions for a shared variable get reordered, other threads may observe the shared variable in an inconsistent state.

[Figure: instruction reordering, as per Herb Sutter’s talk: https://www.youtube.com/watch?v=KeLBd2EJLOU]


Reordering can be prevented using fences. Fences can be compiler fences or hardware fences. Memory order types in the C++ memory model:

  • memory_order_relaxed: only the atomicity of the operation itself is guaranteed; no ordering is imposed on the surrounding memory operations.
  • memory_order_acquire: a load with acquire ordering guarantees that reads and writes after it in program order are NOT moved before the load.
  • memory_order_release: a store with release ordering guarantees that reads and writes before it in program order are not moved after the store.
  • memory_order_acq_rel: this combines the effects of memory_order_acquire and memory_order_release and is meant for read-modify-write operations.
  • memory_order_seq_cst: this is the strictest of all the memory orders. In addition to acquire/release semantics, all memory_order_seq_cst operations appear in a single total order that every thread agrees on.
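
The release/acquire pairing can be sketched with a small message-passing example. The function name publish_and_read and the value 42 are assumptions chosen for illustration.

```cpp
#include <atomic>
#include <thread>

// Hypothetical demo: publish a plain int through an atomic flag.
int publish_and_read(){
  int payload = 0;                 // non-atomic data
  std::atomic<bool> ready{false};  // synchronization flag
  int seen = -1;

  std::thread producer([&](){
    payload = 42;                                  // (1) write the data
    ready.store(true, std::memory_order_release);  // (2) publish it
  });
  std::thread consumer([&](){
    while(!ready.load(std::memory_order_acquire)){ // (3) wait for publication
      std::this_thread::yield();
    }
    seen = payload;                                // (4) sees the write from (1)
  });
  producer.join();
  consumer.join();
  return seen;
}
```

Because the release store (2) cannot be reordered before (1), and the acquire load (3) cannot be reordered after (4), the consumer reads the published value. With memory_order_relaxed on both sides that guarantee would be lost.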


 

Hardware Fences/Barriers in x86

The x86 ISA provides some of the strictest memory-ordering guarantees among mainstream architectures.

There are 3 types of fences:

  • SFENCE: Serializes all store (write) operations that occurred prior to the SFENCE instruction in the program instruction stream, but does not affect load operations.
  • LFENCE: Serializes all load (read) operations that occurred prior to the LFENCE instruction in the program instruction stream, but does not affect store operations.
  • MFENCE: Serializes all store and load operations that occurred prior to the MFENCE instruction in the program instruction stream.

The memory ordering in x86

  • Reads are not reordered with other reads.
  • Writes are not reordered with older reads.
  • Reads may be reordered with older writes to different locations but not with older writes to the same location.
  • Reads or writes cannot be reordered with I/O instructions, locked instructions, or serializing instructions.
  • Reads cannot pass earlier LFENCE and MFENCE instructions.
  • Writes and executions of CLFLUSH and CLFLUSHOPT cannot pass earlier LFENCE, SFENCE, and MFENCE instructions.
  • LFENCE instructions cannot pass earlier reads.
  • SFENCE instructions cannot pass earlier writes or executions of CLFLUSH and CLFLUSHOPT.
  • MFENCE instructions cannot pass earlier reads, writes, or executions of CLFLUSH and CLFLUSHOPT.

In a multiple-processor system, the following ordering principles apply:

  • Individual processors use the same ordering principles as in a single-processor system.
  • Writes by a single processor are observed in the same order by all processors.
  • Writes from an individual processor are NOT ordered with respect to the writes from other processors.
  • Memory ordering obeys causality (memory ordering respects transitive visibility).
  • Any two stores are seen in a consistent order by processors other than those performing the stores
  • Locked instructions have a total order.

 

Issues in Multithreading

  • Data sharing can be complicated
  • Race conditions can be difficult to debug
  • Deadlocks
  • Livelocks
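
Deadlocks typically arise when two threads take the same locks in opposite order. One common remedy is to acquire both mutexes in a single step with std::lock. The sketch below assumes a hypothetical Account type and transfer function purely for illustration.

```cpp
#include <mutex>
#include <thread>

// Hypothetical type: each account is protected by its own mutex.
struct Account{
  std::mutex mtx;
  int balance = 100;
};

void transfer(Account& from, Account& to, int amount){
  // std::lock acquires both mutexes with a deadlock-avoidance algorithm,
  // regardless of the order in which other threads name them.
  std::lock(from.mtx, to.mtx);
  std::lock_guard<std::mutex> lk1(from.mtx, std::adopt_lock);
  std::lock_guard<std::mutex> lk2(to.mtx, std::adopt_lock);
  from.balance -= amount;
  to.balance += amount;
}

// Two threads transfer in opposite directions; returns the combined balance.
int run_transfers(){
  Account a, b;
  std::thread t1([&](){ for(int i = 0; i < 1000; ++i){ transfer(a, b, 1); } });
  std::thread t2([&](){ for(int i = 0; i < 1000; ++i){ transfer(b, a, 1); } });
  t1.join();
  t2.join();
  return a.balance + b.balance;
}
```

Locking from.mtx and then to.mtx with two separate lock_guard objects would let t1 and t2 each hold one mutex while waiting for the other; std::lock avoids that.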

Common Problems in Parallel Programming

Dining Philosophers Problem

This is a classic problem with data sharing and signaling.

Five (male) philosophers spend their lives thinking and eating. The philosophers share a common circular table surrounded by five chairs, each belonging to one philosopher. In the centre of the table there is a bowl of spaghetti, and the table is laid with five forks, as shown in figure. When a philosopher thinks, he does not interact with other philosophers. From time to time, a philosopher gets hungry. In order to eat he must try to pick up the two forks that are closest (and are shared with his left and right neighbors), but may only pick up one fork at a time. He cannot pick up a fork already held by a neighbor. When a hungry philosopher has both his forks at the same time, he eats without releasing them, and when he has finished eating, he puts down both forks and starts thinking again.

Solutions to this kind of problem are as follows:

  1. Use a resource hierarchy: assign numbers to the forks (resources). A philosopher must always request the lower-numbered fork first and then the higher-numbered one.
  2. Use a central entity for assigning permissions: use an arbitrator (waiter) as a mutex that grants philosophers permission to pick up the forks (resources).
  3. Distributed message passing between the philosophers without a central authority.
    1. For every pair of philosophers contending for a resource, create a fork and give it to the philosopher with the lower ID (n for agent Pn). Each fork can either be dirty or clean. Initially, all forks are dirty.
    2. When a philosopher wants to use a set of resources (i.e. eat), said philosopher must obtain the forks from their contending neighbors. For all such forks the philosopher does not have, they send a request message.
    3. When a philosopher with a fork receives a request message, they keep the fork if it is clean, but give it up when it is dirty. If the philosopher sends the fork over, they clean the fork before doing so.
    4. After a philosopher is done eating, all their forks become dirty. If another philosopher had previously requested one of the forks, the philosopher that has just finished eating cleans the fork and sends it.
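
The resource hierarchy solution can be sketched as follows, with one std::mutex per fork. The function name dine and the per-philosopher meal counters are assumptions for illustration; thinking and eating are reduced to a counter increment.

```cpp
#include <algorithm>
#include <array>
#include <mutex>
#include <thread>
#include <vector>

// Each philosopher eats `meals` times; returns the total number of meals eaten.
int dine(int meals){
  constexpr int kPhilosophers = 5;
  std::array<std::mutex, kPhilosophers> forks;
  std::array<int, kPhilosophers> eaten{};  // one counter per philosopher

  std::vector<std::thread> philosophers;
  for(int id = 0; id < kPhilosophers; ++id){
    philosophers.emplace_back([&, id](){
      int left = id;
      int right = (id + 1) % kPhilosophers;
      // Resource hierarchy: always pick up the lower-numbered fork first.
      int first = std::min(left, right);
      int second = std::max(left, right);
      for(int m = 0; m < meals; ++m){
        std::lock_guard<std::mutex> lk1(forks[first]);
        std::lock_guard<std::mutex> lk2(forks[second]);
        ++eaten[id];  // "eat" while holding both forks
      }
    });
  }
  for(auto& p : philosophers){ p.join(); }

  int total = 0;
  for(int e : eaten){ total += e; }
  return total;
}
```

Because the last philosopher reaches for fork 0 before fork 4, the circular wait that causes the deadlock cannot form.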

Sleeping Barber Problem

This is a classic multiple-producer, single-consumer problem: the customers produce work and the single barber consumes it.

The analogy is based upon a hypothetical barber shop with one barber. The barber has one barber chair and a waiting room with a number of chairs in it. When the barber finishes cutting a customer’s hair, they dismiss the customer and go to the waiting room to see if there are other customers waiting. If there are, they bring one of them back to the chair and cut their hair. If there are no other customers waiting, they return to their chair and sleep in it.

Each customer, when they arrive, looks to see what the barber is doing. If the barber is sleeping, the customer wakes them up and sits in the chair. If the barber is cutting hair, the customer goes to the waiting room. If there is a free chair in the waiting room, the customer sits in it and waits their turn. If there is no free chair, the customer leaves.
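
A simplified sketch of the barber shop with a mutex and a condition variable is shown below. It is an illustration under stated assumptions: the function name run_barber_shop is hypothetical, the barber chair is not modeled separately from the waiting room, and a haircut is reduced to a counter increment.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Returns {customers served, customers turned away}.
std::pair<int,int> run_barber_shop(int customers, int chairs){
  std::mutex mtx;
  std::condition_variable cv;
  int waiting = 0;      // customers sitting in the waiting room
  int served = 0;
  int turned_away = 0;
  int arrived = 0;      // customers that have arrived so far

  std::thread barber([&](){
    std::unique_lock<std::mutex> lk(mtx);
    for(;;){
      // The barber sleeps until a customer waits or everyone has arrived.
      cv.wait(lk, [&](){ return waiting > 0 || arrived == customers; });
      if(waiting == 0){ break; }  // nobody left to serve: close the shop
      --waiting;                   // bring the next customer to the chair
      ++served;                    // "cut hair"
    }
  });

  std::vector<std::thread> threads;
  for(int i = 0; i < customers; ++i){
    threads.emplace_back([&](){
      std::lock_guard<std::mutex> lk(mtx);
      ++arrived;
      if(waiting < chairs){ ++waiting; }  // sit down and wait for a turn
      else{ ++turned_away; }              // no free chair: leave
      cv.notify_one();                    // wake the barber if asleep
    });
  }
  for(auto& t : threads){ t.join(); }
  barber.join();
  return {served, turned_away};
}
```

Every customer is either served or turned away, so the two counts always add up to the number of customers; how they split depends on thread scheduling.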