Pages

Subscribe:

Ads 468x60px

Wednesday, January 23, 2013

Producer Consumer Problem in C++

Today I'm going to solve a very common problem known as the producer-consumer problem. Here we have a shared buffer that the producers fill and the consumers drain. The producers and consumers are threads that run simultaneously. Two conditions must be met: consumers have to wait while the buffer is empty, until a producer adds an item, and producers have to wait while the buffer is full, until a consumer removes one.

The following example is implemented using pthreads. Each producer puts a random number into the buffer and the consumers take those numbers out. A mutex protects the shared buffer, and two counting semaphores track its free and occupied slots.
ProducerConsumer.cpp     DOWNLOAD SOURCE

#include <iostream>
#include <vector>
#include <cstdlib>

#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>



// Capacity of the shared buffer, in items.
#define BUFFER_SIZE 10

// Storage shared by every producer and consumer thread.
int buffer[BUFFER_SIZE];

// Number of items currently held in buffer; InsertItem stores at this
// index and RemoveItem reads from the index just below it.
int iCounter;

// Locked around every InsertItem / RemoveItem call.
pthread_mutex_t mutex;

// Counting semaphores set up by InitializeData: full starts at 0 and is
// posted by producers, empty starts at BUFFER_SIZE and is posted by consumers.
sem_t full;
sem_t empty;

// Initialises the mutex, both semaphores and iCounter.
void InitializeData();

// Thread entry points; Param points at the int thread number to print.
void *Produce(void *Param);
void *Consume(void *Param);

// Buffer primitives; both return 1 on success and -1 on failure.
int InsertItem(int item);
int RemoveItem(int *item);



// Program entry point: initialises the shared state, starts ten producer and
// ten consumer threads, lets them run for five seconds and then exits.
// Returning from main ends the process and with it every worker thread.
// Returns 0 on success, 1 if a thread could not be created.
int main(int argc , char * argv[])
{
 (void)argc;
 (void)argv;

 enum { kThreadsPerRole = 10, kRunSeconds = 5 };

 // The workers keep a pointer to their id for as long as they run, and they
 // are still running when main returns. Static storage stays valid until the
 // process is gone, so no worker can read freed memory.
 static int aa[kThreadsPerRole];
 static int bb[kThreadsPerRole];

 InitializeData();

 for(int i = 0 ; i < kThreadsPerRole ; i++)
 {
  aa[i] = i;
  pthread_t t;
  int iErr = pthread_create(&t , NULL, Produce , &aa[i]);
  if(iErr != 0)
  {
   // pthread_create reports failure through its return value, not errno.
   fprintf(stderr, "Error Creating Producer %d : %s\n", i, strerror(iErr));
   return 1;
  }
  // The thread is never joined, so detach it to release its resources.
  pthread_detach(t);
  printf("Creating Producer %d \n", i);
 }

 for(int i = 0 ; i < kThreadsPerRole ; i++)
 {
  bb[i] = i;
  pthread_t t;
  int iErr = pthread_create(&t , NULL, Consume , &bb[i]);
  if(iErr != 0)
  {
   fprintf(stderr, "Error Creating Consumer %d : %s\n", i, strerror(iErr));
   return 1;
  }
  pthread_detach(t);
  printf("Creating Consumer %d \n", i);
 }

 // The workers loop forever, so joining them would never return; run the
 // demonstration for a fixed time instead.
 sleep(kRunSeconds);

 return 0;
}

//****************************************************************************************************
// Prepares the shared state used by every producer and consumer:
//   - iCounter is reset to 0,
//   - mutex is initialised with default attributes,
//   - full starts at 0 and empty starts at BUFFER_SIZE, both shared between
//     the threads of this process only (pshared == 0).
// If any primitive cannot be initialised the program prints the reason and
// exits, because the worker threads cannot operate without them.
void InitializeData()
{
 iCounter = 0;

 // pthread_mutex_init returns the error number directly.
 int iErr = pthread_mutex_init(&mutex , NULL);
 if(iErr != 0)
 {
  fprintf(stderr, "pthread_mutex_init failed: %s\n", strerror(iErr));
  exit(EXIT_FAILURE);
 }

 // sem_init returns -1 and sets errno on failure.
 if(sem_init(&full , 0 , 0) == -1)
 {
  perror("sem_init(full)");
  exit(EXIT_FAILURE);
 }

 if(sem_init(&empty , 0 , BUFFER_SIZE) == -1)
 {
  perror("sem_init(empty)");
  exit(EXIT_FAILURE);
 }
}

//****************************************************************************************************
// Producer thread body. Param points at an int holding the thread number that
// is printed with every produced item.
// Loops forever: picks a random value in 0..99, waits on the empty semaphore,
// stores the value with InsertItem under the mutex and then posts full.
// Returns NULL only if waiting on the semaphore fails for a reason other than
// being interrupted by a signal.
void * Produce(void * Param)
{
 // Read the thread number once so the loop never dereferences Param again.
 const int iThreadNo = *((int *)Param);

 while(1)
 {
  //sleep(1);
  int item = rand() % 100;

  if(sem_wait(&empty) == -1)
  {
   if(errno == EINTR)
   {
    // Interrupted before a slot was reserved; nothing was taken, so retry.
    continue;
   }
   perror("sem_wait(empty)");
   return NULL;
  }

  pthread_mutex_lock(&mutex);
  int iMsg = InsertItem(item);

  if(iMsg == -1){
   printf("Error Inserting Item \n");
  }else
  {
   printf("Produced Item :: %d  Thread No :: %d\n", item , iThreadNo);
  }
  pthread_mutex_unlock(&mutex);

  // Announce a filled slot only when one was really filled. After a failed
  // insert the reserved slot is handed back, so the two semaphores keep
  // matching the buffer contents.
  sem_post(iMsg == -1 ? &empty : &full);
 }

 return NULL;
}

//****************************************************************************************************
// Consumer thread body. Param points at an int holding the thread number that
// is printed with every consumed item.
// Loops forever: waits on the full semaphore, takes a value with RemoveItem
// under the mutex and then posts empty.
// Returns NULL only if waiting on the semaphore fails for a reason other than
// being interrupted by a signal.
void * Consume(void * Param)
{
 // Read the thread number once so the loop never dereferences Param again.
 const int iThreadNo = *((int *)Param);
 int item;

 while(1)
 {
  //sleep(1);
  if(sem_wait(&full) == -1)
  {
   if(errno == EINTR)
   {
    // Interrupted before an item was reserved; nothing was taken, so retry.
    continue;
   }
   perror("sem_wait(full)");
   return NULL;
  }

  pthread_mutex_lock(&mutex);
  int iMsg = RemoveItem(&item);

  if(iMsg == -1){
   printf("Error Removing Item \n");
  }else
  {
   printf("Consumed Item :: %d  Thread No :: %d \n", item , iThreadNo);
  }
  pthread_mutex_unlock(&mutex);

  // Announce a free slot only when an item was really removed. After a
  // failed removal the reserved item is handed back, so the two semaphores
  // keep matching the buffer contents.
  sem_post(iMsg == -1 ? &full : &empty);
 }

 return NULL;
}

//****************************************************************************************************
// Appends item to the shared buffer at index iCounter and bumps iCounter.
// Returns 1 when the item was stored, -1 when the buffer already holds
// BUFFER_SIZE items. Performs no locking itself; the callers in this file
// hold the mutex around the call.
int InsertItem(int item)
{
 // No free slot left: report failure without touching the buffer.
 if(iCounter >= BUFFER_SIZE)
 {
  return -1;
 }

 buffer[iCounter++] = item;
 return 1;
}

//****************************************************************************************************
// Takes the most recently stored value out of the shared buffer, writes it to
// *item and decrements iCounter.
// Returns 1 on success, -1 when the buffer is empty (*item is then left
// untouched). Performs no locking itself; the callers in this file hold the
// mutex around the call.
int RemoveItem(int *item)
{
 // Nothing stored: report failure without touching *item.
 if(iCounter <= 0)
 {
  return -1;
 }

 *item = buffer[--iCounter];
 return 1;
}

That's it folks. Hope to see you soon in another exciting tutorial.

6 comments:

  1. Hey. How can you rewrite the producer-consumer problem to take a number N as a command-line argument, so that it creates N producers and N consumers (all of which still work on the same buffer)?

    Regards

    ReplyDelete
  2. It's not so clear what you are asking. What do you mean by N?..

    ReplyDelete
  3. Normally there is one producer and one consumer, but I want to generalize the problem so that many producers and many consumers work on the same buffer. I hope that makes it clear.

    ReplyDelete
  4. #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h> /* usleep */
    #include <pthread.h>
    #include <semaphore.h>
    /* pshared argument for sem_init: process-sharing if != 0, thread-sharing if == 0 */
    #define SHARED 0
    /* number of slots in the shared buffer */
    #define BUF_SIZE 20
    /* modulus applied to rand() to get the usleep() delay */
    #define MAX_MOD 100000
    /* number of loop iterations each thread performs */
    #define NUM_ITER 1000

    /* Producer thread */
    void *Producer(void *arg);
    /* Consumer thread */
    void *Consumer(void *arg);

    /* empty: how many empty buffer slots */
    sem_t empty;
    /* full: how many full buffer slots */
    sem_t full;
    /* b: binary semaphore, used as a mutex */
    sem_t b;

    /* shared finite buffer */
    int g_data[BUF_SIZE];
    int main(void) {
    pthread_t pid, cid;
    sem_init(&empty, SHARED, BUF_SIZE);
    sem_init(&full, SHARED, 0);
    sem_init(&b, SHARED, 1);
    printf("main started\n");
    pthread_create(&pid, NULL, Producer, NULL);
    pthread_create(&cid, NULL, Consumer, NULL);
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);
    printf("main done\n");
    return 0;
    }
    /*
     * Producer thread: NUM_ITER times, sleeps for a random interval, takes an
     * empty slot, marks one buffer entry as filled while holding b, prints a
     * bar showing the fill level and then signals a full slot.
     * arg is unused. Always returns NULL.
     */
    void *Producer(void *arg) {
        int semvalem, i = 0, j; /* semvalem: semaphore value of empty semaphore */
        (void)arg;
        while (i < NUM_ITER) {
            usleep(rand()%MAX_MOD); /* pretend to generate an item by a random wait */
            sem_wait(&empty);
            sem_wait(&b);
            sem_getvalue(&empty, &semvalem);
            /*
             * After the wait above, empty holds a value from 0 to BUF_SIZE-1,
             * so this index runs from BUF_SIZE-1 down to 0 and stays inside
             * g_data. The first item lands in slot 0, the same slot Consumer
             * clears first. The earlier form BUF_SIZE-(semvalem-1) produced
             * indices 2 to BUF_SIZE+1 and wrote past the end of the array.
             */
            g_data[BUF_SIZE-semvalem-1]=1; /* put item in buffer */
            /* the following two lines just print a bar showing current buffer fill */
            j=BUF_SIZE; printf("(Producer, semaphore empty is %d) \t",semvalem);
            while(j > semvalem) { j--; printf("="); } printf("\n");
            sem_post(&b);
            sem_post(&full);
            i++;
        }
        return NULL;
    }
    /*
     * Consumer thread: NUM_ITER times, sleeps for a random interval, takes a
     * full slot, clears one buffer entry while holding b, prints a bar showing
     * the fill level and then signals an empty slot.
     * arg is unused. Always returns a null pointer.
     */
    void *Consumer(void *arg) {
        int semvalfu; /* semvalfu: semaphore value of full semaphore */
        int i;
        int j;

        for (i = 0; i < NUM_ITER; i++) {
            /* pretend to consume an item by a random wait */
            usleep(rand()%MAX_MOD);

            sem_wait(&full);
            sem_wait(&b);

            sem_getvalue(&full, &semvalfu);
            /* remove item from buffer */
            g_data[semvalfu]=0;

            /* print one '=' per remaining full slot as a fill-level bar */
            printf("(Consumer, semaphore full is %d) \t",semvalfu);
            for (j = 0; j < semvalfu; j++) {
                printf("=");
            }
            printf("\n");

            sem_post(&b);
            sem_post(&empty);
        }
        return 0;
    }

    This is the one i have to rewrite.

    ReplyDelete
  5. I think the answer is straightforward. In main() I created 10 consumer threads and 10 producer threads.

    //creates 10 producers
    for(int i = 0 ; i < 10 ; i++)
    {
    aa[i] = i;
    pthread_t t;
    pthread_create(&t , NULL, Produce , &aa[i]);
    printf("Creating Producer %d \n", i);
    }
    //creates 10 consumers
    for(int i = 0 ; i < 10 ; i++)
    {
    bb[i] = i;
    pthread_t t;
    pthread_create(&t , NULL, Consume , &bb[i]);
    printf("Creating Consumer %d \n", i);
    }

    So change your code accordingly, rather than using a single producer and a single consumer.

    ReplyDelete
  6. When compiling, sleep is reported as undeclared. What should I do?

    ReplyDelete