c++ multithreading code

Screen Shot 2018-04-12 at 11.33.23 PM
By I, Cburnett, CC BY-SA 3.0, https://commons.wikimedia.org/w/index.php?curid=2233446

Simple example of defining work as () operator overload on a struct.

#include <thread>
#include <iostream>

struct work{
 void operator ()() const{
 std::cout << "this is test from work" << std::endl;
 }
};

int main(){
   work w;
   std::thread t([&](){
     w();
   });
   t.join(); // wait in the main thread for finishing the thread t.
   return 0;
}

std::thread objects are not copyable but they are movable.

std::thread t1([]{std::cout << std::this_thread::get_id() << std::endl;});
std::thread t2 = t1; // NOT going to work since constructor 
for thread class is private
std::thread t2 = std::move(t1); // this work!

Detach thread: background/long running jobs which does not to report the status back to parent threads can be detached.

t.detach(); // once detached, can not be joined.

RAII applied to thread class

 

#include <thread>
#include <iostream>

struct RThread{
   RThread(){
     std::thread t([](){std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;});
     t_= std::move(t);
   }

   // Rule of 5
  ~RThread(){
     if(t_.joinable()){
       t_.join();
    }
   }

  // delete the copy constructor and copy assignment operator
  RThread(RThread const &) = delete;
  RThread& operator=(RThread const &) = delete;

  // use default move operator and move assignment operator
  RThread(RThread const &&) = default;
  RThread & operator=(RThread const && ) = default;

  private:
   std::thread t_;
};

int main(){
 RThread();
 return 0;
}

Threads pool and Atomic shared variable

#include <thread>
#include <iostream>
#include <vector>
#include <atomic>

std::atomic<int> total(0);

int main(){
   std::vector<std::thread> workers;
   for(int i = 0; i <= 20; ++i){
     workers.emplace_back(
       std::thread([](){
         for(int j = 0; j <= 20; ++j){
           // can also use ++total same effect
           // Memory order does not matter on x86
           total.fetch_add(1, std::memory_order_relaxed);
         }
       }));
     }

    std::for_each(
       workers.begin(),
       workers.end(),
       [](auto & w){
         if(w.joinable()){
           w.join();
         }
       }
     );

  std::cout << total << std::endl;
  return 0;
}

Using non-atomic variables with mutexes to prevent data races

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>

static int total = 0;
static int worker_count = 20;
std::mutex mtx;

int main(){
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  for(int i = 0; i < worker_count; ++i){
    workers.emplace_back(std::thread([](){
    for(int j = 0; j < worker_count; ++j){ 
      std::lock_guard<std::mutex> lkg(mtx);
      ++total;
     }
   })
   );
 }

for(auto & worker: workers){
   if (worker.joinable()){
   worker.join();
  }
 }

std::cout << total << std::endl;
 return 0;
}

DCLP – Double Checked Linked Pattern

As per the paper by Scott Meyers and Andrei Alexandrescu, the DCLP pattern implementation is given below.

In order to get instance of a singleton class, there are two checks, first check without the lock, then the lock is acquired and then again checked if the instance exists.

Singleton* Singleton::instance() { 
if (pInstance == 0) { // 1st test 
    Lock lock; // acquire the lock 
    if (pInstance == 0) { // 2nd test 
      pInstance = new Singleton; 
    } 
  } 
  return pInstance; 
}

In order to simplify this situation, c++ standard came with call_once and once_flag.

std::once_flag pInstance_initialized; // Flag
std::call_once(pInstance_initialized, {pInstance =new Singleton;});

Here is an example code called once as per the once flag:

#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
#include <atomic>

int worker_count = 20;

int main(){
  std::vector<std::thread> workers;
  workers.reserve(worker_count);
  std::once_flag is_called;
  workers.reserve(worker_count);
  for(int i = 0; i < worker_count; ++i){
    workers.emplace_back(std::thread([&](){
      std::call_once(is_called, [](){
      std::cout << "once_flag is set in the thread id " << std::this_thread::get_id() << std::endl;
    });
   })
   );
  }

  for(auto & worker: workers){
   if (worker.joinable()){
     worker.join();
   }
  }

  return 0;
}

Condition Variables: used for intercommunication between the threads. Condition Variables work with mutex as a way of synchronization between threads.

Following is a classic example of printing number series like 0102030405… using 3 threads. One thread prints 0, another prints all odd numbers and 3rd thread prints even numbers. Condition variables are used to solve this problem.

#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>

std::condition_variable cv;
std::mutex mtx;
int n = 1;
int max = 100;
bool digit_printed = false;

int main(){
  std::thread t_zero([](){
  while(n < max){
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [](){
      return digit_printed == true;
    });
    std::cout << 0<< "\n";
    digit_printed = false;
    cv.notify_all();
   }
 });

std::thread t_odd([](){
  while(n < max){
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [](){
      return digit_printed == false && n%2 == 1;
    });
    std::cout << n++ << "\n";
    digit_printed = true;
    cv.notify_all();
   }
 });

std::thread t_even([](){
  while(n < max){
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, [](){
      return digit_printed == false && n%2 == 0;
    });
    std::cout << n++ << "\n";
    digit_printed = true;
    cv.notify_all();
   }
 });
 
 t_zero.join();
 t_odd.join();
 t_even.join();
 return 0;
}

 

c++ Multithreading Theory and Concepts

Concurrency vs parallelism

Concurrency is when two or more tasks can start, run, and complete in overlapping time periods. It doesn’t necessarily mean they’ll ever both be running at the same instant. For example, multitasking on a single-core machine.

Parallelism is when tasks literally run at the same time, e.g., on a multicore processor.

following image explains the difference correctly.

1__4B2PKsJn9pUz3jbTnBnYw
image credit: https://medium.com/@deepshig/concurrency-vs-parallelism-4a99abe9efb8

Why Threading

  • Forking processes has lot of overhead like creating the data structures for processes.
  • Interprocess communication is also complex. Because of overhead, its not appropriate for latency sensitive applications.
  • Threading makes parallelism easy with simplified coding API.
  • Threading makes data sharing simple.

Threading in Linux

Linux represents threads as lightweight processes. POSIX threads library provides API for thread management using the header pthread.h. A simple program to create threads is shown as

#include <pthread.h>
#include <iostream>

void * work(void * void_ptr)
{
  std::cout << "in the work function" << std::endl;
  return NULL;
}

int main(){
  pthread_t thread_id;
  pthread_create(&thread_id, NULL, work, NULL);
  pthread_join(thread_id, NULL);
  return 0;
}

Threading using Modern c++

#include <thread>
#include <iostream>

void work(){
 std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

int main(){
 std::thread t(work);
 t.join();
 return 0;
}

Modern c++ 11 introduced following in regards to multi-threading

  • std::thread and related classes
  • memory model: this defines concepts such as memory location, threads, synchronization primitives and most importantly the memory order.
    • memory order: memory order means how access to memory is managed for atomic and non-atomic variables and operations. The memory order guarantees the memory read or write operations will be done in the order specified. There are following types of memory orders.
      • memory_order_relaxed
      • memory_order_consume
      • memory_order_acquire
      • memory_order_release
      • memory_order_acq_rel
      • memory_order_seq_cst: this is default memory order if no memory order is specified,

Why memory ordering

The program written in c/c++ gets compiled to assembly. The compiler reorders the instructions are all the times for optimization. In a single threaded program, if  load or store instruction reordered for a variable, there is generally no problem because the effects of reordering is observed at the end of program. But in a multi-threaded program if load or store instructions for a shared variable get reordered, then the threads may observe the shared variable in inconsistent state.

Screen Shot 2018-04-08 at 11.43.49 PM
The instruction reordering as per Herb Sutter’s talk: https://www.youtube.com/watch?v=KeLBd2EJLOU

Here is an excellent talk on memory reordering:

Reordering can be achieved using fences. Fences can be Compiler fences or hardware fences. Memory order types in c++ memory model:

  • memory_order_relaxed: instructions can be ordered in sequence, there is no strict ordering applied.
  • memory_order_acquire: A load operation guarantees that the instructions after acquire operations happen after the load happens i.e. the operations will NOT be moved before current load happens.
  • memory_order_release: The store operation guarantees that the instructions happening before release operations happen before store happens, i.e. the instructions will not be moved after the store.
  • memory_order_acq_rel: This combines effects of both memory_order_acquire and memory_order_release. This provides the read/write ordering relative to atomic variables.
  • memory_order_seq_cst: This is most strictest of all the memory ordering.  This provides read/write ordering relative to all variables including non-acomic variables too.

Here are couple more interesting talks on this subject

 

Hardware Fences/Barriers in x86

x86 ISA provides strictest memory ordering guarantees.

There are 3 types of fences:

  • SFENCE — Serializes all store (write) operations that occurred prior to the SFENCE instruction in the program instruction stream, but does not affect load operations.

  • LFENCE — Serializes all load (read) operations that occurred prior to the LFENCE instruction in the program instruction stream, but does not affect store operations.2

  • MFENCE — Serializes all store and load operations that occurred prior to the MFENCE instruction in the program instruction stream.

The memory ordering in x86

  • Reads are not reordered with other reads.
  • Writes are not reordered with older reads
  • Reads may be reordered with older writes to different locations but not with older writes to the same location.

  • Reads or writes cannot be reordered with I/O instructions, locked instructions, or serializing instructions.

  •  Reads cannot pass earlier LFENCE and MFENCE instructions.

  •  Writes and executions of CLFLUSH and CLFLUSHOPT cannot pass earlier LFENCE, SFENCE, and MFENCE instructions.

  • LFENCE instructions cannot pass earlier reads. 

  • SFENCE instructions cannot pass earlier writes or executions of CLFLUSH and CLFLUSHOPT.

  • MFENCE instructions cannot pass earlier reads, writes, or executions of CLFLUSH and CLFLUSHOPT.

In a multiple-processor system, the following ordering principles apply:

  • Individual processors use the same ordering principles as in a single-processor system.
  • Writes by a single processor are observed in the same order by all processors.
  • Writes from an individual processor are NOT ordered with respect to the writes from other processors.
  • Memory ordering obeys causality (memory ordering respects transitive visibility).
  • Any two stores are seen in a consistent order by processors other than those performing the stores
  • Locked instructions have a total order.

 

Issues in Multithreading

  • Data sharing can be complicated
  • Race conditions can be difficult to debug
  • Dead locks
  • Live locks

Modern c++ std::vector algorithms

dhruv-deshmukh-266273-unsplash

Find unique numbers in the vector if there is ONLY one element that is unique

#include <vector>
#include <accumulate>

std::vector <int> v {1,2,1,1,1,1,1,1,1,1,1};

int u = std::accumulate(v.begin(), v.end(), (uint32_t)0, [](uint32_t a, uint32_t b) { return a ^ b; });
// The reason this works is because xoring the number with itself is 0 and xroing number with 0 is itself.
>> 2

Various other algorithms based on vector

  • segregate positive and negative numbers in the vector
  • find max in array
  • find minimum index in a rotated array
  • plain vanilla binary search
  • find maximum in array increases first then decreases
  • search in bitonic array i.e. increasing and then decreasing array.
#include <iostream>
#include <vector>
#include <random>
#include <functional>
#include <numeric>
#include <algorithm>

using it = std::vector<int>::iterator;
// segregate positive and negative numbers in the vector
// the negative numbers should be at the start and positive
// numbers should be at the end
void segregate(it start, it finish){
  while(true){
    for(;*start < 0 && start < finish; start++){}
    for(;*finish > 0 && start < finish; finish--){}
    if(start < finish){
      std::iter_swap(start, finish);  
    }else{
      break;
    }
  }
}

// find max in an array
int find_max(it start, it finish, int max_num){
  if(std::next(start) == finish) return std::max(*start, *finish);
  it mid = start;
  std::advance(mid, std::distance(start,finish)/2);
  int max_left = find_max(start, mid, *start);
  int max_right = find_max(mid, finish, *mid);
  return std::max(max_left, max_right);
}

// find max sum subarray of size = k
int max_sum_k(it start, it finish, int k){
  int max_so_far = 0, sum_of_k = 0;
  for(;start < finish && start+k < finish;start++){
    sum_of_k = std::accumulate(start,start+k, 0,std::plus<int>());
    max_so_far = std::max(sum_of_k, max_so_far);
  }
  return max_so_far;
}

// find minimum index in a rotated array
int min_idx_rotated_ary(it start, it finish){
  it orig_start = start;
  if(*start < *std::prev(finish)) return 0;
  while(start < finish){
    it mid = start;
    std::advance(mid, std::distance(start, finish)/2);
    if (mid < finish && *mid > *std::next(mid)) {
      return std::distance(orig_start, mid + 1);
    }else if (*start <= *mid){
      start = std::next(mid);
    }else{
      finish =  std::prev(mid);
    }
  }
  return 0;
}

// plain vanilla binary search
int binary_search(it start, it finish, int num){
  int idx = -1;
  it orig_start = start;
  if(start > finish) return -1;

  while(start <= finish){
    it mid = start;
    std::advance(mid, std::distance(start, finish)/2);
    if(*mid == num){
      idx = std::distance(orig_start, mid);
      break;
    }else if(num < *mid){
      finish = std::prev(mid);
    }else{
      start = std::next(mid);
    }
  }
  return idx;
}

// search an element using binary search in a rotated array
int bin_search_sorted_rotated_ary(it start, it finish, int num){
  it orig_start = start;
  int idx = -1;
  while(start < finish){
    it mid = start;
    std::advance(mid, std::distance(start, finish)/2);
    
    if(*mid == num){
      idx = std::distance(orig_start, mid);
    }
   
    if(*start <= *mid){
      if(*start <= num && num <= *mid){
        finish = std::prev(mid);
      }else{
        start = std::next(mid);
      }
    }else{
       if(*mid <= num && num <= *finish){
        start = std::next(mid);
      }else{
        finish = std::prev(mid);
      }
    }
  }
  return idx;
}

// recursively find maximum in array increases first then decreases
int max_increasing_decreasing_ary(it start, it finish){
  if(start == finish){
    return *start;
  }

  if(std::next(start) == finish){
    return std::max(*start, *finish);
  }
  
  it mid = start;
  std::advance(mid, std::distance(start, finish)/2);

  if(*mid > *std::prev(mid) && *mid > *std::next(mid)){
    return *mid;
  }
  
  if(*mid > *std::next(mid) && *mid < *std::prev(mid)){
    return max_increasing_decreasing_ary(start, std::prev(mid));
   }else{
   return max_increasing_decreasing_ary(std::next(mid), finish);
   }
}

// {2,4,6,8,10,12,14,40,11,7,5,3,1}
// search = 40
int search_bitonic_ary(it start, it finish, int num){
  int max = max_increasing_decreasing_ary(start, finish);
  ptrdiff_t pos = std::distance(start, std::find(start, finish, max));
  int idx = -1;
  it orig_start = start;
  if(max == -1){
    return -1;
  }

  idx = binary_search(start, start+pos, num);
  if(idx == -1){
    start = start+pos;
    while(start <= finish){
      it mid = start;
      std::advance(mid, std::distance(start, finish)/2);
      if(*mid == num){
        idx = std::distance(orig_start, mid);
        break;
      }else if(num > *mid){
        finish = std::prev(mid);
      }else{
        start = std::next(mid);
      }
    }
  }
  return idx;
}

int main(){
  std::vector<int> numbers {-1, 3, 8, -4, 5, -6, 7, -20, 30, -40};
  segregate(numbers.begin(), numbers.end());
  for(const int & i: numbers) std::cout << i <<  ' ';
  std::cout << '\n';


  std::random_device rd;  //Will be used to obtain a seed for the random number engine
  std::mt19937 gen(rd()); //Standard mersenne_twister_engine seeded with rd()
  std::uniform_int_distribution<> dis(1, 200);

  std::vector<int> nums_find_max {8, 4, 2, 1};
  for (int n=0; n<20; ++n){
      int num = dis(gen);
      std::cout << num << ' ';
      nums_find_max[n] = num;
  }

  std::cout << '\n';
  std::cout << "Max number is : " << find_max(nums_find_max.begin(), 
                                              nums_find_max.end(), 
                                              *nums_find_max.begin()) << '\n';


  std::vector<int> v_max_k{11, -8, 16, -7, 24, -2, 3};
  std::cout << max_sum_k(v_max_k.begin(), v_max_k.end(), 3) << '\n';  // 33

  std::vector<int> rotated{65, 70, 80, 90, 10, 20, 30, 40, 50, 60};
  std::cout << min_idx_rotated_ary(rotated.begin(), rotated.end()) << '\n';

  std::vector<int> bin_search{10, 20, 30, 40, 50, 60, 65, 70, 80, 90};
  std::cout << binary_search(bin_search.begin(), bin_search.end(), 70) << '\n';
  std::cout << binary_search(bin_search.begin(), bin_search.end(), 100) << '\n';

  std::cout << bin_search_sorted_rotated_ary(rotated.begin(), rotated.end(), 80) << '\n';
  std::cout << bin_search_sorted_rotated_ary(rotated.begin(), rotated.end(), 65) << '\n';
  std::cout << bin_search_sorted_rotated_ary(rotated.begin(), rotated.end(), 30) << '\n';

  std::vector<int> increasing_decreasing_ary{2,4,6,8,10,12,14,40,11,7,5,3,1};
  std::cout << "max in increasing and then decreasing ary :" << max_increasing_decreasing_ary(increasing_decreasing_ary.begin(),
    increasing_decreasing_ary.end()) << '\n';

  std::cout << "index of 40 in bitonic_ary "<<search_bitonic_ary(increasing_decreasing_ary.begin(), 
                                  increasing_decreasing_ary.end(),
                                  40) << '\n';


  std::cout << "index of 7 in bitonic_ary " << search_bitonic_ary(increasing_decreasing_ary.begin(), 
                                  increasing_decreasing_ary.end(),
                                  7) << '\n';

}

Print matrix diagonally

#include <iostream>
#include <vector>

int main(){
  std::vector<std::vector<int>> v(8, std::vector<int>(8, 0));
  for(int i = 0; i < 8; i++){
    for(int j = 0; j < 8; j++){
      v[i][j]= (i+1)*(j+1);
    }
  }


// printing the matrix
  for(int i = 0; i < 8; i++){
    for(int j = 0; j < 8; j++){
      std::cout << v[i][j] << '\t';
    }
    std::cout << '\n';
  }

  int row, col;
  int rowCount = v.size();
  int columnCount = v[0].size();

  for(int k = 0; k < rowCount; k++){
    for(row = k, col = 0; row >= 0 && col <= columnCount; row --, col++){
      std::cout << v[row][col] << '\t';
    }
    std::cout << '\n';
  }

  for(int k = 0; k < rowCount; k++){
    for(row = rowCount -1, col = k; row >= 0 && col <= columnCount; row --, col++){
      std::cout << v[row][col] << '\t';
    }
    std::cout << '\n';
  }

return 0;
}

Matrix:

1	2	3	4	5	6	7	8
2	4	6	8	10	12	14	16
3	6	9	12	15	18	21	24
4	8	12	16	20	24	28	32
5	10	15	20	25	30	35	40
6	12	18	24	30	36	42	48
7	14	21	28	35	42	49	56
8	16	24	32	40	48	56	64

Expected Output:

1
2	2
3	4	3
4	6	6	4
5	8	9	8	5
6	10	12	12	10	6
7	12	15	16	15	12	7
8	14	18	20	20	18	14	8
8	14	18	20	20	18	14	8
16	21	24	25	24	21	16	0
24	28	30	30	28	24	3
32	35	36	35	32	4
40	42	42	40	5
48	49	48	6
56	56	7
64	8

C++ Operator Overloading

I recently came across a tricky situation in operator overloading. The idea is overload ++ operator for a class. Since ++ works as both prefix and postfix.

int x = 0;
 auto y = ++x; // This is prefix 
 std::cout << "x: "<< x++ << "\n"; // this is postfix and value of 
// x is changed after the execution of this statement x = 1
 std::cout << "y: " << y << "\n"; // y = 1

When it comes to overloading of ++ operator, things becomes little tricky. Following is the way to overload ++ operator for both prefix and postfix.

struct X{
  int i = 0;
  // The prefix is coded as ++x
  X & operator++(){
    ++i;
    return *this;
  }
  
  // this is postfix as x++
  X operator ++(int){
    X y(*this); 
    ++y; 
    return y;
  }
};

the prefix solution is straight forward but the postfix returns a copy of instance of X and hence its first copied, prefix increment is done and return the instance.

When overloading another operator << following code can be used.

struct X{
  friend std::iostream& operator<<(std::iostream & os, const X & x){
    os << "x.i: "<< x.i << "\n";
    return os;
  }
};

 

 

 

Anatomy of an Assembly Program

Here is a very simple c/c++ program and its assembly program compiled using flag -S.
There are no headers included.

int main(){
  return 0;
}

Following is the output generated.

This output shows an anatomy of an

; represents sections in the program like .bss section, .text section etc.
 .file "simple.cpp"
 .text
 .globl main
 .type main, @function
; entry to main function
main:
; push the current value of each should be pushed onto the stack to be restored at the end
 pushq %rbp
; move the contents of stack pointer to rbp register.
 movq %rsp, %rbp
; push the exit code 0 to the eax register.
 movl $0, %eax
; pop the contents of rbp register.
 popq %rbp
; return from the program.
 ret