Pages

Subscribe:

Ads 468x60px

Wednesday, January 23, 2013

Producer Consumer Problem in C++

Today I'm going to solve a most common problem which is known as the consumer producer problem. In this context we have a shared buffer which the producers produce and the consumers consume. The consumers and producers are threads which will simultaneously produce and consume. There are some conditions to be met where consumers have to wait until producers produce, and another thing is that when the buffer is full producers must halt until the consumers consume.

In the following example was implemented using pthreads. The producer produce a random number to the buffer and consumers consume that number. Mutex locks are used to protect the shared buffer. 
ProducerConsumer.cpp     DOWNLOAD SOURCE

#include <iostream>
#include <pthread.h>
#include <vector>
#include <cstdlib>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>



#define BUFFER_SIZE 10

void InitializeData();
void *Produce(void *);
void *Consume(void *);
int InsertItem(int);
int RemoveItem(int *);

int iCounter;
pthread_mutex_t mutex;
sem_t full, empty;

int buffer[BUFFER_SIZE];



int main(int argc , char * argv[])
{
 InitializeData(); 
 pthread_t ProducerThread , ConsumerThread;
 int *aa = new int [10];
 for(int i = 0 ; i < 10 ; i++)
 {
  aa[i] = i;
  pthread_t t;
  pthread_create(&t , NULL, Produce , &aa[i]);
  printf("Creating Producer %d \n", i);
 }
 int *bb = new int[10];
 for(int i = 0 ; i < 10 ; i++)
 {
  bb[i] = i;
  pthread_t t;
  pthread_create(&t , NULL, Consume , &bb[i]);
  printf("Creating Consumer %d \n", i);
 }
 
 sleep(5);
 delete [] aa;
 delete [] bb;
 

 return 0;
}

//****************************************************************************************************
void InitializeData()
{
 pthread_mutex_init(&mutex , NULL);
 sem_init(&full , 0 ,0);
 sem_init(&empty , 0 , BUFFER_SIZE);
 
 iCounter = 0;
 

}

//****************************************************************************************************
void * Produce(void * Param)
{
 int item;
 
 while(1)
 {
  //sleep(1);
  item = rand() % 100;
  sem_wait(&empty);
  pthread_mutex_lock(&mutex);
  int iMsg = InsertItem(item);
  
  if(iMsg == -1){
   printf("Error Inserting Item \n");
  }else
  {
   printf("Produced Item :: %d  Thread No :: %d\n", item , *((int *)Param));
  }
  pthread_mutex_unlock(&mutex);
  sem_post(&full);
  
 }
}

//****************************************************************************************************
void * Consume(void * Param)
{
 int item;
 
 while(1)
 {
  //sleep(1);
  sem_wait(&full);
  pthread_mutex_lock(&mutex);
  int iMsg = RemoveItem(&item);
  
  if(iMsg == -1){
   printf("Error Removing Item \n");
  }else
  {
   printf("Consumed Item :: %d  Thread No :: %d \n", item ,*((int *)Param));
  }
  pthread_mutex_unlock(&mutex);
  sem_post(&empty);
  
 }
}

//****************************************************************************************************
int InsertItem(int item)
{
 if(iCounter < BUFFER_SIZE)
 {
  buffer[iCounter] =  item;
  iCounter++;
  return 1;
 }
 else{
  return -1;
 }

}

//****************************************************************************************************
int RemoveItem(int *item)
{
 if(iCounter > 0)
 {
  *item = buffer[iCounter - 1];
  iCounter--;
  return 1;
 }
 else{
  return -1;
 }
}

That's it folks. Hope to see you soon in another exciting tutorial.

Friday, January 18, 2013

How to send Complex Data Structures in a Socket Program

Hi all, Hope you guys are doing well. I was busy with my work last few days. Today I'm going to write a post about sending Complex data structures using a socket program. In this post I assume that you are somewhat familiar with the socket programming basics.

In the following example we are creating a Object type in the server program and send that to the client program. This is a simple serialize and deserialize mechanism to send complex data structures through a socket program using only the STL library. If you want more complex and complete functionality you might try the boost library. 

Here we are going to send the following data structure.
 Data.h File
class InnerData{
    public:
    int c;
};

class Data{
    public:
    int a;
    char b[20];
    InnerData id;
  
};

Now we write the Server program which will create a socket, bind that socket to a port and listen for incoming connections. If there is a incoming connection accept the connection, serialize the data send to the connected client.

Server.cpp File
#include <stdio.h>
#include <cstdlib> 
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <arpa/inet.h> 
#include "Data.h"

int main(int argc , char * argv [])
{
    int SockFD , NewSockFD , PortNo, Clilen;
    char Buffer[1024];
 struct sockaddr_in Server_Addr , Cli_Addr;
 int n , pID;

 //create a socket
 SockFD =  socket(AF_INET , SOCK_STREAM , 0);
 if(SockFD < 0)
 {
  perror("Error Creating Socket \n");
  exit(1);
 }
 std::cout << "Socket Created..." << std::endl;

 bzero((char *) &Server_Addr , sizeof(Server_Addr));
 PortNo = atoi(argv[1]);
 Server_Addr.sin_family = AF_INET;
 Server_Addr.sin_addr.s_addr = INADDR_ANY;
 Server_Addr.sin_port = htons(PortNo);

 
 //bind the socket
 if(bind(SockFD , (struct sockaddr *) &Server_Addr , sizeof(Server_Addr)) < 0)
 {
  perror("Error Binding \n");
  exit(1);
 }
 std::cout << "Socket Binded..." << std::endl;
 //listen for incoming clients
 listen(SockFD , 5);
 Clilen =  sizeof(Cli_Addr);
 std::cout << "Waiting For Connections..." << std::endl;
 
 while(1)
 {
  //accept a client 
  NewSockFD = accept(SockFD , (struct sockaddr *) &Cli_Addr ,(socklen_t *) &Clilen);
  printf("New Client %d \n", NewSockFD);
  if(NewSockFD < 0)
  {
   perror("Error accepting \n");
   exit(1);
  }  
   bzero(Buffer , 1024);
   printf("New Client Connected, Process ID :: %d \n",pID);
   //create the Data
   Data *data = new Data();
   data->a = 23232;
   strcpy(data->b , "TEST MESSAGE" );
   data->id.c =1000;
  
   n =  write(NewSockFD , data ,sizeof(*data) );
   if(n < 0)
   {
    perror(" Error Sending Message \n");
    exit(1);
   }
  
 }
 return 0;
}

Now the client accept the data send by the client and deserialize it.

Client.cpp
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h> 
#include "Data.h"


int main(int argc , char * argv [])
{
 int SockFD , PortNo , n;
 struct sockaddr_in Server_Address;
 struct hostent *Server;
 
 char Buffer[1024]; // Buffer to read data

 if(argc < 3)
 {
  printf("Error hostname port required \n");
  exit(0);
 }
 PortNo = atoi(argv[2]);
 // create a socket 
 SockFD =  socket(AF_INET , SOCK_STREAM , 0);

 if(SockFD < 0 )
 {
  perror("Error Creating Socket");
  exit(1);
 }
 
 Server = (struct hostent *) gethostbyname(argv[1]);
 if(Server == NULL)
 {
  printf("Error :: No Such Host \n");
  exit(0);
 }

 bzero((char *) &Server_Address , sizeof(Server_Address));
 Server_Address.sin_family =  AF_INET;
 bcopy((char *) Server->h_addr , (char *) &Server_Address.sin_addr.s_addr , Server->h_length);
 
 Server_Address.sin_port = htons(PortNo);
 
 //connect to the server
 if(connect(SockFD , (struct sockaddr *) &Server_Address , sizeof(Server_Address)) < 0)
 {
  perror("Error Connecting \n");
  exit(1);
 }
 Data * data = new Data();
 n = read(SockFD , Buffer , sizeof(Data));
 data = (Data *) Buffer;
 printf("Read Size %d \n" , n);
 if(n < 0)
 {
  perror("Error receiving data \n");
  exit(1);
 }
 printf("Received :: ID :: %d  MESSAGE :: %s Complex ID:: %d \n", data->a , data->b ,data->id.c );
 bzero(Buffer , 1024);

 return 0;

}

First we compile the Server.cpp file
g++ -o Server Server.cpp

Then we run the server giving the port as an argument.
./Server  7788

Then compile and run the client program using a new terminal.
g++ -o Client Client.cpp

./Client localhost 7788

That's it folks. It's a simple approach. But solve the problem. Remember that this only works since we are using the same computer architecture. If you are using different architectures you have to consider about the endian problem also.. :)