Today I'm going to solve a very common problem known as the producer-consumer problem. Here we have a shared buffer that producers fill and consumers empty. The producers and consumers are threads that run simultaneously. Two conditions must hold: consumers have to wait while the buffer is empty, until a producer adds an item, and producers must wait while the buffer is full, until a consumer removes an item.
The following example is implemented using pthreads. Each producer puts a random number into the buffer and each consumer takes a number out. A mutex protects the shared buffer, and two counting semaphores (full and empty) track how many slots are filled and how many are free.
ProducerConsumer.cpp
#include <iostream>
#include <vector>
#include <cstdlib>
#include <stdio.h>
#include <string.h>
#include <unistd.h>      // sleep()
#include <pthread.h>
#include <semaphore.h>

#define BUFFER_SIZE 10

void InitializeData();
void *Produce(void *);
void *Consume(void *);
int InsertItem(int);
int RemoveItem(int *);

int iCounter;                 // number of items currently in the buffer
pthread_mutex_t mutex;        // protects buffer and iCounter
sem_t full, empty;            // full: filled slots, empty: free slots
int buffer[BUFFER_SIZE];

int main(int argc , char * argv[])
{
    InitializeData();

    // aa and bb hold the thread numbers passed to each thread
    int *aa = new int [10];
    for(int i = 0 ; i < 10 ; i++)
    {
        aa[i] = i;
        pthread_t t;
        pthread_create(&t , NULL, Produce , &aa[i]);
        printf("Creating Producer %d \n", i);
    }

    int *bb = new int[10];
    for(int i = 0 ; i < 10 ; i++)
    {
        bb[i] = i;
        pthread_t t;
        pthread_create(&t , NULL, Consume , &bb[i]);
        printf("Creating Consumer %d \n", i);
    }

    // Let the threads run for 5 seconds; returning from main then ends
    // the process together with all of its threads.
    sleep(5);
    delete [] aa;
    delete [] bb;
    return 0;
}
//****************************************************************************************************
void InitializeData()
{
    pthread_mutex_init(&mutex , NULL);
    sem_init(&full , 0 ,0);                // no filled slots at the start
    sem_init(&empty , 0 , BUFFER_SIZE);    // every slot is free at the start
    iCounter = 0;
}
//****************************************************************************************************
void * Produce(void * Param)
{
    int item;
    while(1)
    {
        //sleep(1);
        item = rand() % 100;
        sem_wait(&empty);                  // wait for a free slot
        pthread_mutex_lock(&mutex);
        int iMsg = InsertItem(item);
        if(iMsg == -1){
            printf("Error Inserting Item \n");
        }else {
            printf("Produced Item :: %d Thread No :: %d\n", item , *((int *)Param));
        }
        pthread_mutex_unlock(&mutex);
        sem_post(&full);                   // one more filled slot
    }
}
//****************************************************************************************************
void * Consume(void * Param)
{
    int item;
    while(1)
    {
        //sleep(1);
        sem_wait(&full);                   // wait for a filled slot
        pthread_mutex_lock(&mutex);
        int iMsg = RemoveItem(&item);
        if(iMsg == -1){
            printf("Error Removing Item \n");
        }else {
            printf("Consumed Item :: %d Thread No :: %d \n", item ,*((int *)Param));
        }
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);                  // one more free slot
    }
}
//****************************************************************************************************
int InsertItem(int item)
{
    if(iCounter < BUFFER_SIZE)
    {
        buffer[iCounter] = item;
        iCounter++;
        return 1;
    }
    else{
        return -1;
    }
}
//****************************************************************************************************
int RemoveItem(int *item)
{
    // Takes the most recently inserted item (last in, first out)
    if(iCounter > 0)
    {
        *item = buffer[iCounter - 1];
        iCounter--;
        return 1;
    }
    else{
        return -1;
    }
}
That's it folks. Hope to see you soon in another exciting tutorial.
Hey. How can you rewrite the producer-consumer problem to take a number N as a command-line argument, so that it runs N producers and N consumers (all of which still work on the same buffer)?
Regards
It's not so clear what you are asking. What do you mean by N?
Normally there is one producer and one consumer, but I want to generalize the problem so that many producers and many consumers work on the same buffer. I hope I made it clear to you.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> /* usleep */
#include <pthread.h>
#include <semaphore.h>
#define SHARED 0 /* process-sharing if !=0, thread-sharing if =0*/
#define BUF_SIZE 20
#define MAX_MOD 100000
#define NUM_ITER 1000
void *Producer(void *); /* Producer thread */
void *Consumer(void *); /* Consumer thread */
sem_t empty; /* empty: How many empty buffer slots */
sem_t full; /* full: How many full buffer slots */
sem_t b; /* b: binary, used as a mutex */
int g_data[BUF_SIZE]; /* shared finite buffer */
int main(void) {
    pthread_t pid, cid;
    sem_init(&empty, SHARED, BUF_SIZE);
    sem_init(&full, SHARED, 0);
    sem_init(&b, SHARED, 1);
    printf("main started\n");
    pthread_create(&pid, NULL, Producer, NULL);
    pthread_create(&cid, NULL, Consumer, NULL);
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);
    printf("main done\n");
    return 0;
}
void *Producer(void *arg) {
    int semvalem,i=0,j; /* semvalem: semaphore value of empty semaphore */
    while(i < NUM_ITER) {
        usleep(rand()%MAX_MOD); /* pretend to generate an item by a random wait*/
        sem_wait(&empty);
        sem_wait(&b);
        sem_getvalue(&empty, &semvalem);
        g_data[BUF_SIZE-semvalem-1]=1; /* put item in buffer; index stays within 0..BUF_SIZE-1 */
        /* the following two lines just print a bar showing current buffer fill */
        j=BUF_SIZE; printf("(Producer, semaphore empty is %d) \t",semvalem);
        while(j > semvalem) { j--; printf("="); } printf("\n");
        sem_post(&b);
        sem_post(&full);
        i++;
    }
    return 0;
}
void *Consumer(void *arg) {
    int semvalfu,i=0,j; /* semvalfu: semaphore value of full semaphore */
    while(i < NUM_ITER) {
        usleep(rand()%MAX_MOD); /* pretend to consume an item by a random wait*/
        sem_wait(&full);
        sem_wait(&b);
        sem_getvalue(&full, &semvalfu);
        g_data[semvalfu]=0; /* remove item from buffer */
        /* the following two lines just print a bar showing current buffer fill */
        j=0; printf("(Consumer, semaphore full is %d) \t",semvalfu);
        while(j < semvalfu) { j++; printf("="); } printf("\n");
        sem_post(&b);
        sem_post(&empty);
        i++;
    }
    return 0;
}
This is the one I have to rewrite.
I think the answer is straightforward. In main() I created 10 consumer threads and 10 producer threads.
//creates 10 producers
for(int i = 0 ; i < 10 ; i++)
{
    aa[i] = i;
    pthread_t t;
    pthread_create(&t , NULL, Produce , &aa[i]);
    printf("Creating Producer %d \n", i);
}
//creates 10 consumers
for(int i = 0 ; i < 10 ; i++)
{
    bb[i] = i;
    pthread_t t;
    pthread_create(&t , NULL, Consume , &bb[i]);
    printf("Creating Consumer %d \n", i);
}
So change your code accordingly, creating the threads in loops like these instead of using a single producer and a single consumer.
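To connect this with the original question about N, here is a minimal sketch that reads the thread count from the command line and starts N producers and N consumers on one shared buffer. The names `parseThreadCount`, `runWithN`, `produceSome` and `consumeSome` are hypothetical and not from the post, and each thread handles a fixed number of items (an assumption) so that the program can join its threads and finish.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <cassert>
#include <cstdlib>
#include <vector>

const int kBufferSize = 10;
int sharedBuffer[kBufferSize];
int itemCount = 0;          // items currently in sharedBuffer
long consumedTotal = 0;     // how many items all consumers have taken
int itemsPerThread = 0;     // assumption: every thread handles this many items
pthread_mutex_t bufferLock = PTHREAD_MUTEX_INITIALIZER;
sem_t fullSlots, emptySlots;

void* produceSome(void*) {
    for (int i = 0; i < itemsPerThread; ++i) {
        sem_wait(&emptySlots);              // wait for a free slot
        pthread_mutex_lock(&bufferLock);
        sharedBuffer[itemCount++] = i;
        pthread_mutex_unlock(&bufferLock);
        sem_post(&fullSlots);
    }
    return NULL;
}

void* consumeSome(void*) {
    for (int i = 0; i < itemsPerThread; ++i) {
        sem_wait(&fullSlots);               // wait for a filled slot
        pthread_mutex_lock(&bufferLock);
        --itemCount;                        // take the newest item
        ++consumedTotal;
        pthread_mutex_unlock(&bufferLock);
        sem_post(&emptySlots);
    }
    return NULL;
}

// Reads N from argv[1]; returns `fallback` if it is missing or not in 1..1000.
int parseThreadCount(int argc, char* argv[], int fallback) {
    if (argc < 2) return fallback;
    char* end = NULL;
    long n = std::strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || n <= 0 || n > 1000) return fallback;
    return static_cast<int>(n);
}

// Starts n producers and n consumers, waits for all of them,
// and returns the total number of items consumed.
long runWithN(int n, int perThread) {
    assert(n > 0 && perThread >= 0);
    itemCount = 0;
    consumedTotal = 0;
    itemsPerThread = perThread;
    sem_init(&fullSlots, 0, 0);
    sem_init(&emptySlots, 0, kBufferSize);
    std::vector<pthread_t> threads(2 * n);
    for (int i = 0; i < n; ++i) {
        pthread_create(&threads[2 * i], NULL, produceSome, NULL);
        pthread_create(&threads[2 * i + 1], NULL, consumeSome, NULL);
    }
    for (int i = 0; i < 2 * n; ++i) pthread_join(threads[i], NULL);
    sem_destroy(&fullSlots);
    sem_destroy(&emptySlots);
    return consumedTotal;
}
```

From `main(int argc, char* argv[])` this could be called as `runWithN(parseThreadCount(argc, argv, 1), 100)`. Because producers and consumers handle the same number of items in total, every thread eventually finishes.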
When compiling, sleep is shown as undeclared. What should I do?
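On POSIX systems `sleep` is declared in `<unistd.h>`, so this error usually means that header is missing from the includes. A minimal sketch, assuming a Linux or other POSIX toolchain (the `pauseFor` wrapper is a hypothetical name used only for illustration):

```cpp
#include <unistd.h>  // declares sleep() and usleep() on POSIX systems

// Hypothetical helper: pause the calling thread for the given number of seconds.
// sleep() returns 0 when the full time has elapsed, or the number of
// seconds left if a signal interrupted it.
unsigned int pauseFor(unsigned int seconds) {
    return sleep(seconds);
}
```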
Hello, I am compiling on Windows 7 32-bit, but it says pthread_lock_mutex is unidentified, even though I already copied the necessary files to my Visual Studio directory. Is it possible to compile this on Windows 7?
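pthreads is a POSIX API and is not part of Visual Studio, so on Windows it needs a third-party port. One portable alternative, sketched here under the assumption that a C++11 compiler is available, is to use the standard `<thread>`, `<mutex>` and `<condition_variable>` headers instead. The names `BoundedBuffer` and `runDemo` are made up for this sketch and do not appear in the post.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded buffer built only on the C++11 standard library.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the buffer is full.
    void put(int item) {
        std::unique_lock<std::mutex> guard(lock_);
        notFull_.wait(guard, [this] { return items_.size() < capacity_; });
        items_.push(item);
        notEmpty_.notify_one();
    }

    // Blocks while the buffer is empty.
    int take() {
        std::unique_lock<std::mutex> guard(lock_);
        notEmpty_.wait(guard, [this] { return !items_.empty(); });
        int item = items_.front();
        items_.pop();
        notFull_.notify_one();
        return item;
    }

private:
    std::size_t capacity_;
    std::queue<int> items_;
    std::mutex lock_;
    std::condition_variable notFull_, notEmpty_;
};

// Runs `pairs` producers and `pairs` consumers on one buffer; each producer
// puts the numbers 1..perThread. Returns the sum of everything consumed.
long runDemo(int pairs, int perThread) {
    BoundedBuffer buffer(10);
    long total = 0;
    std::mutex totalLock;
    std::vector<std::thread> threads;
    for (int p = 0; p < pairs; ++p) {
        threads.emplace_back([&buffer, perThread] {
            for (int i = 1; i <= perThread; ++i) buffer.put(i);
        });
        threads.emplace_back([&buffer, &total, &totalLock, perThread] {
            for (int i = 0; i < perThread; ++i) {
                int item = buffer.take();
                std::lock_guard<std::mutex> guard(totalLock);
                total += item;
            }
        });
    }
    for (std::thread& t : threads) t.join();
    return total;
}
```

The two condition variables play the role of the `full` and `empty` semaphores in the post: `put` waits for room, `take` waits for an item.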
What if each producer-consumer pair processes different data?
How can we ensure that the data produced by a given producer is consumed by its paired consumer?
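One way to guarantee the pairing is to stop sharing a single buffer and give each producer-consumer pair its own buffer, mutex and semaphores. A rough sketch under that assumption, using the same POSIX calls as the post; `Channel`, `pairedProduce`, `pairedConsume`, `runPairs` and the idea of tagging each item with the pair number are hypothetical and only for illustration:

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <cassert>
#include <vector>

const int kSlots = 4;            // buffer size per pair
const int kItemsPerPair = 100;   // assumption: fixed workload so threads finish

// Everything one pair shares; no other pair touches it.
struct Channel {
    int id;
    int slots[kSlots];
    int count;
    int mismatches;              // items that did not come from this pair
    pthread_mutex_t lock;
    sem_t full, empty;
};

void* pairedProduce(void* param) {
    Channel* ch = static_cast<Channel*>(param);
    for (int i = 0; i < kItemsPerPair; ++i) {
        sem_wait(&ch->empty);
        pthread_mutex_lock(&ch->lock);
        ch->slots[ch->count++] = ch->id;    // tag the item with the pair number
        pthread_mutex_unlock(&ch->lock);
        sem_post(&ch->full);
    }
    return NULL;
}

void* pairedConsume(void* param) {
    Channel* ch = static_cast<Channel*>(param);
    for (int i = 0; i < kItemsPerPair; ++i) {
        sem_wait(&ch->full);
        pthread_mutex_lock(&ch->lock);
        int item = ch->slots[--ch->count];
        if (item != ch->id) ch->mismatches++;
        pthread_mutex_unlock(&ch->lock);
        sem_post(&ch->empty);
    }
    return NULL;
}

// Runs `pairs` independent pairs and returns how many items reached
// the wrong consumer.
int runPairs(int pairs) {
    assert(pairs > 0);
    std::vector<Channel> channels(pairs);
    std::vector<pthread_t> threads(2 * pairs);
    for (int p = 0; p < pairs; ++p) {
        Channel& ch = channels[p];
        ch.id = p;
        ch.count = 0;
        ch.mismatches = 0;
        pthread_mutex_init(&ch.lock, NULL);
        sem_init(&ch.full, 0, 0);
        sem_init(&ch.empty, 0, kSlots);
    }
    for (int p = 0; p < pairs; ++p) {
        pthread_create(&threads[2 * p], NULL, pairedProduce, &channels[p]);
        pthread_create(&threads[2 * p + 1], NULL, pairedConsume, &channels[p]);
    }
    for (int i = 0; i < 2 * pairs; ++i) pthread_join(threads[i], NULL);
    int wrong = 0;
    for (int p = 0; p < pairs; ++p) {
        wrong += channels[p].mismatches;
        sem_destroy(&channels[p].full);
        sem_destroy(&channels[p].empty);
        pthread_mutex_destroy(&channels[p].lock);
    }
    return wrong;
}
```

Since a consumer only ever reads from its own `Channel`, it cannot see another pair's data, at the cost of one buffer per pair.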
Hello, can you please explain the meaning of these lines of code?
"sleep(5);
delete [] aa;
delete [] bb;"
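As far as the listing shows, `sleep(5)` keeps `main` alive for five seconds so the worker threads get time to run, and the two `delete []` calls free the arrays that hold the thread numbers before the program exits. A possible alternative, sketched here with hypothetical names (`worker`, `runAndJoin`), is to keep the thread handles and call `pthread_join`, so the array is freed only after every thread has finished. It assumes workers that terminate, unlike the endless loops in the post.

```cpp
#include <pthread.h>
#include <cassert>
#include <vector>

static pthread_mutex_t sumLock = PTHREAD_MUTEX_INITIALIZER;
static int idSum = 0;   // stands in for real work: adds up the thread numbers

// Hypothetical worker: reads its thread number through the pointer it was given.
void* worker(void* param) {
    int id = *static_cast<int*>(param);
    pthread_mutex_lock(&sumLock);
    idSum += id;
    pthread_mutex_unlock(&sumLock);
    return NULL;
}

// Starts n workers, waits for all of them, then frees the id array.
int runAndJoin(int n) {
    assert(n > 0);
    idSum = 0;
    int* ids = new int[n];
    std::vector<pthread_t> threads(n);   // keep the handles so we can join
    for (int i = 0; i < n; ++i) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < n; ++i) {
        pthread_join(threads[i], NULL);  // blocks until thread i has returned
    }
    delete [] ids;                       // no thread can still be reading ids
    return idSum;
}
```

With joining there is no need to guess how long to sleep, and no thread is left holding a pointer into freed memory.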
Hello, could you solve this problem?
A car is manufactured at each stop on a conveyor belt in a car factory. A car is constructed from the following parts: chassis, tires, seats, engine (assume this includes everything under the hood and the steering wheel), the top cover, and painting. Thus there are 6 tasks in manufacturing a car. However, tires, seats or the engine cannot be added until the chassis is placed on the belt. The car top cannot be added until tires, seats and the engine are put in. Finally, the car cannot be painted until the top is put on. A stop on the conveyor belt in your car company has four technicians assigned to it: Abe, Bob, Charlie, and Dave. Abe is skilled at adding tires and painting, Bob can only put the chassis on the belt, Charlie only knows how to attach the seats, and Dave knows how to add the engine as well as how to add the top. Using threads to represent Abe, Bob, Charlie and Dave, how can this problem be solved using semaphores?
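One possible approach, offered only as a sketch, is to use one semaphore per "this step is done" event and let each technician wait on the events his task depends on. The semaphore names, the `record` log and the `buildCars` function below are all hypothetical. The sketch also assumes the stop holds one car at a time, which the problem statement does not say explicitly.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <cassert>
#include <string>
#include <vector>

// One "chassis is on the belt" signal per technician who waits for it.
static sem_t chassisForAbe, chassisForCharlie, chassisForDave;
static sem_t tiresDone, seatsDone, topDone;
static sem_t beltFree;   // 1 when the stop is ready for the next chassis
static pthread_mutex_t logLock = PTHREAD_MUTEX_INITIALIZER;
static std::vector<std::string> stepLog;
static int carsToBuild = 0;

static void record(const char* step) {
    pthread_mutex_lock(&logLock);
    stepLog.push_back(step);
    pthread_mutex_unlock(&logLock);
}

void* bob(void*) {       // chassis only
    for (int c = 0; c < carsToBuild; ++c) {
        sem_wait(&beltFree);
        record("chassis");
        sem_post(&chassisForAbe);
        sem_post(&chassisForCharlie);
        sem_post(&chassisForDave);
    }
    return NULL;
}

void* abe(void*) {       // tires, then paint once the top is on
    for (int c = 0; c < carsToBuild; ++c) {
        sem_wait(&chassisForAbe);
        record("tires");
        sem_post(&tiresDone);
        sem_wait(&topDone);
        record("paint");
        sem_post(&beltFree);   // car finished, next chassis may come
    }
    return NULL;
}

void* charlie(void*) {   // seats only
    for (int c = 0; c < carsToBuild; ++c) {
        sem_wait(&chassisForCharlie);
        record("seats");
        sem_post(&seatsDone);
    }
    return NULL;
}

void* dave(void*) {      // engine, then top after tires and seats are in
    for (int c = 0; c < carsToBuild; ++c) {
        sem_wait(&chassisForDave);
        record("engine");
        sem_wait(&tiresDone);
        sem_wait(&seatsDone);
        record("top");
        sem_post(&topDone);
    }
    return NULL;
}

// Builds `cars` cars and returns the order in which the steps happened.
std::vector<std::string> buildCars(int cars) {
    assert(cars > 0);
    carsToBuild = cars;
    stepLog.clear();
    sem_t* startAtZero[] = {&chassisForAbe, &chassisForCharlie, &chassisForDave,
                            &tiresDone, &seatsDone, &topDone};
    for (sem_t* s : startAtZero) sem_init(s, 0, 0);
    sem_init(&beltFree, 0, 1);
    void* (*roles[])(void*) = {abe, bob, charlie, dave};
    pthread_t threads[4];
    for (int i = 0; i < 4; ++i) pthread_create(&threads[i], NULL, roles[i], NULL);
    for (int i = 0; i < 4; ++i) pthread_join(threads[i], NULL);
    for (sem_t* s : startAtZero) sem_destroy(s);
    sem_destroy(&beltFree);
    return stepLog;
}
```

Tires, seats and engine can happen in any order after the chassis, so only the first, fifth and sixth step of each car are fixed. Dave needs no semaphore for the engine because he installs it himself before the top.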