Mam problem z synchronizjacją wątków z wykorzystaniem monitorów. wygląda na to że jakiś sygnał jest gubiony. Chodzi tu o budzenie konsumentów. Wydaje mi się że jest tak, że sygnał obudzenie jest wysyłany i Od razu proces który to zrobił jest wywłaszczany i nie "idzie spać". Przez co drugi konsument nie jest w stanie go obudzić jeśli będzie taka konieczność ;/ nie wiem ani dlaczego tak jest, ani jak to poprawić ;/
#include <queue>
#include <iostream>
#include <stdio.h>
#include <time.h>
#include "monitor.h"
class Buffer : protected Monitor
{
public:
/**
* Zwraca liczbe nieparzysta jesli takowa
* znajduje sie na czole kolejki, w przeciwnym razie
* idzie spac i czeka az obudzi go drugi konsument
*/
int getOdd()
{
enter();
int num;
if( buff.size() == 0 ){
std::cout << "[Odd] Choinka! Kolejka jest pusta !" << std::endl;
wait(buff_empty);
}
if ( buff.front() % 2 == 0){
std::cout << "[Odd] Oj liczba parzysta, musze czekac..." << std::endl;
signal(buff_evens);
wait(buff_odds);
}
num = buff.front();
buff.pop();
odd_cnt--;
std::cout << "[Odd] Pobralem liczbe nieparzysta, w kolejce pozostalo " << buff.size() << " liczb" << std::endl;
signal(produce_odds);
signal(buff_full);
leave();
return num;
}
/**
* Zwraca liczbe parzysta jesli takowa znajduje
* na czole kolejki
*/
int getEven()
{
enter();
int num;
if( buff.size() == 0){
std::cout << "[Even] W misia, dlaczego kolejka jest pusta ?!?!?" << std::endl;
wait(buff_empty);
}
if( buff.front() % 2 != 0){
std::cout << "[Even] Oj, nieparzysta, zdrzemne sie... " << buff.front() << std::endl;
signal(buff_odds);
wait(buff_evens);
}
num = buff.front();
buff.pop();
std::cout << "[Even] Zjadlem parzysta, w kolejce zostalo " << buff.size() << " liczb" << std::endl;
signal(buff_full);
leave();
return num;
}
/**
* Dodaje liczbe do bufora
*/
void put(int i)
{
enter();
if( i % 2 != 0 )
{
if( buff.size() == 10 ){
std::cout << "[Prod]Chcialem dodac nieparzysta, ale kolejka pena" << std::endl;
wait(buff_full);
}
if(odd_cnt == 3){
std::cout << "[Prod]Chcialem dodac nieparzysta, ale za duzo ich poki co" << std::endl;
wait(produce_odds);
}
buff.push(i);
std::cout << "[Prod]Dodalem nieparzysta liczbe: " << i << std::endl;
odd_cnt++;
signal(buff_empty);
}
else
{
if( buff.size() == 10 ){
std::cout << "[Prod]Chciaem dodac parzysta, ale bufor peny" << std::endl;
wait(buff_full);
}
buff.push(i);
std::cout << "[Prod]Dodalem parzysta liczbe: " << i << std::endl;
signal(buff_empty);
}
leave();
}
private:
std::queue<int> buff;
int odd_cnt;
Condition buff_full, buff_empty, buff_odds, buff_evens;
Condition produce_odds;
};
Buffer bufor;
// produkuje nparzyste
void* produA(void*)
{
int i = 1;
while(true) {
bufor.put(i);
i += 2;
sleep(rand()%4);
}
}
void* produB(void*)
{
int i = 0;
while(true){
bufor.put(i);
i += 2;
sleep(rand()%8);
}
}
void* consuA(void*)
{
while(true) {
bufor.getOdd();
sleep(rand()%2);
}
}
void* consuB(void*)
{
while(true) {
bufor.getEven();
sleep(rand()%5);
}
}
int main()
{
pthread_t prodA, prodB, consC, consD;
srand(time(0));
pthread_create(&consC, NULL, consuA, NULL);
pthread_create(&consD, NULL, consuB, NULL);
pthread_create(&prodA, NULL, produA, NULL);
pthread_create(&prodB, NULL, produB, NULL);
pthread_join(prodA, NULL);
pthread_join(prodB, NULL);
pthread_join(consC, NULL);
pthread_join(consD, NULL);
return 0;
}
tak wygląda implementacja semafora i monitora:
#ifndef __monitor_h
#define __monitor_h
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
class Semaphore
{
public:
Semaphore( int value )
{
#ifdef _WIN32
sem = CreateSemaphore( NULL, value, 1, NULL );
#else
if( sem_init( & sem, 0, value ) != 0 )
throw "sem_init: failed";
#endif
}
~Semaphore()
{
#ifdef _WIN32
CloseHandle( sem );
#else
sem_destroy( & sem );
#endif
}
void p()
{
#ifdef _WIN32
WaitForSingleObject( sem, INFINITE );
#else
if( sem_wait( & sem ) != 0 )
throw "sem_wait: failed";
#endif
}
void v()
{
#ifdef _WIN32
ReleaseSemaphore( sem, 1, NULL );
#else
if( sem_post( & sem ) != 0 )
throw "sem_post: failed";
#endif
}
private:
#ifdef _WIN32
HANDLE sem;
#else
sem_t sem;
#endif
};
class Condition
{
friend class Monitor;
public:
Condition() : w( 0 )
{
waitingCount = 0;
}
void wait()
{
w.p();
}
bool signal()
{
if( waitingCount )
{
-- waitingCount;
w.v();
return true;
}//if
else
return false;
}
private:
Semaphore w;
int waitingCount; //liczba oczekujacych watkow
};
class Monitor
{
public:
Monitor() : s( 1 ) {}
void enter()
{
s.p();
}
void leave()
{
s.v();
}
void wait( Condition & cond )
{
++ cond.waitingCount;
leave();
cond.wait();
}
void signal( Condition & cond )
{
if( cond.signal() )
enter();
}
private:
Semaphore s;
};
#endif
Treść zadania jaki powyższy kod ma rozwiązywać to:
Mamy bufor FIFO na liczby całkowite. Proces A generuje kolejne liczby
parzyste, jeżeli w buforze jest mniej niż 10 liczb. Proces B generuje
kolejne liczby nieparzyste, jeżeli w buforze jest mniej niż 3 liczby
nieparzyste. Proces C zjada liczby parzyste, proces D zjada liczby
nieparzyste. Zrealizuj ww. funkcjonalność przy pomocy semaforów.