Today I'm going to solve a most common problem which is known as the consumer producer problem. In this context we have a shared buffer which the producers produce and the consumers consume. The consumers and producers are threads which will simultaneously produce and consume. There are some conditions to be met where consumers have to wait until producers produce, and another thing is that when the buffer is full producers must halt until the consumers consume.
In the following example was implemented using pthreads. The producer produce a random number to the buffer and consumers consume that number. Mutex locks are used to protect the shared buffer.
ProducerConsumer.cpp DOWNLOAD SOURCEIn the following example was implemented using pthreads. The producer produce a random number to the buffer and consumers consume that number. Mutex locks are used to protect the shared buffer.
#include <iostream> #include <pthread.h> #include <vector> #include <cstdlib> #include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <semaphore.h> #include <string.h> #define BUFFER_SIZE 10 void InitializeData(); void *Produce(void *); void *Consume(void *); int InsertItem(int); int RemoveItem(int *); int iCounter; pthread_mutex_t mutex; sem_t full, empty; int buffer[BUFFER_SIZE]; int main(int argc , char * argv[]) { InitializeData(); pthread_t ProducerThread , ConsumerThread; int *aa = new int [10]; for(int i = 0 ; i < 10 ; i++) { aa[i] = i; pthread_t t; pthread_create(&t , NULL, Produce , &aa[i]); printf("Creating Producer %d \n", i); } int *bb = new int[10]; for(int i = 0 ; i < 10 ; i++) { bb[i] = i; pthread_t t; pthread_create(&t , NULL, Consume , &bb[i]); printf("Creating Consumer %d \n", i); } sleep(5); delete [] aa; delete [] bb; return 0; } //**************************************************************************************************** void InitializeData() { pthread_mutex_init(&mutex , NULL); sem_init(&full , 0 ,0); sem_init(&empty , 0 , BUFFER_SIZE); iCounter = 0; } //**************************************************************************************************** void * Produce(void * Param) { int item; while(1) { //sleep(1); item = rand() % 100; sem_wait(&empty); pthread_mutex_lock(&mutex); int iMsg = InsertItem(item); if(iMsg == -1){ printf("Error Inserting Item \n"); }else { printf("Produced Item :: %d Thread No :: %d\n", item , *((int *)Param)); } pthread_mutex_unlock(&mutex); sem_post(&full); } } //**************************************************************************************************** void * Consume(void * Param) { int item; while(1) { //sleep(1); sem_wait(&full); pthread_mutex_lock(&mutex); int iMsg = RemoveItem(&item); if(iMsg == -1){ printf("Error Removing Item \n"); }else { printf("Consumed Item :: %d Thread No :: %d \n", item ,*((int *)Param)); } pthread_mutex_unlock(&mutex); sem_post(&empty); } } //**************************************************************************************************** int InsertItem(int item) { if(iCounter < BUFFER_SIZE) { buffer[iCounter] = item; iCounter++; return 1; } else{ return -1; } } //**************************************************************************************************** int RemoveItem(int *item) { if(iCounter > 0) { *item = buffer[iCounter - 1]; iCounter--; return 1; } else{ return -1; } }
That's it folks. Hope to see you soon in another exciting tutorial.