[boost::thread + winsock2.h] czyli serwer wielowątkowy :)

0

Postanowiłem za jednym zamachem skumać wątki i sockety. Te drugie są zdecydowanie łatwiejsze, i wydaje mi się, że już sobie radzę. No ale co mi po samotnych socketach, jak trzeba zacząć się łączyć i pisać pierwszy komunikator (jak to zwykle w takich przypadkach). W związku z tym zaatakowałem boost::thread tylko że to jest trochę kosmos. No ale nie poddawałem się, i w stopniu podstawowym opanowałem <boost/thread/thread.hpp>
<boost/bind.hpp>
<boost/thread/mutex.hpp>
czyli wszystko co potrzebne aby stworzyć serwer wielowątkowy [diabel]
Oto część kodu, którą udało mi się stworzyć, niestety nie działa do końca jak się spodziewałem:

funkcja obsługująca przychodzących klientów

void hello(char nr)
    {
         char rcv_msg[100];
         char *numer;
         int rec;
         string adr="Witaj, polaczyles sie z ";
         adr.append(inet_ntoa(moj_adres.sin_addr));
         adr.append("\nJestes ");
         adr+=nr;
         adr.append(" podlaczonym uzytkownikiem.");
         boost::mutex::scoped_lock lock(o_mutex);
         if (send(polaczone[(nr-'0')-1],adr.c_str(),74,0)==-1){
         perror("send");
         cout<<WSAGetLastError();
         lock.unlock();}
         lock.unlock();
         do{
            boost::mutex::scoped_lock lock(i_mutex);
            rec=recv(polaczone[(nr-'0')-1],rcv_msg,99,0);
            lock.unlock();
            if(rec==-1){
            perror("recv");
            cout<<WSAGetLastError();
            ile--;
            break;
            }
            else if (rec==0){
            cout<<"\nKlient nr."<<nr<<" zerwal polaczenie.";
            ile--;
            vector<boost::thread (*)>::iterator i=watki.begin();
            advance(i,nr-'0');
            watki.erase(i);
            vector<SOCKET>::iterator j=polaczone.begin();
            advance(j,nr-'0');
            polaczone.erase(j);
            break;
            }
            boost::mutex::scoped_lock lock1(o_mutex);
            if (send(polaczone[(nr-'0')-1],rcv_msg,99,0)==-1){
            perror("send");
            cout<<WSAGetLastError();
            lock1.unlock();
            break;
            }
            lock1.unlock();
            } while(rcv_msg!="exit");  
    };

funkcja obsługująca przychodzące połączenia i startująca nowe wątki dla nich

void akceptuj()
{
    while(1)//główna pętla accept()
    {
        polaczone.push_back(accept(nasluchujacy, (struct sockaddr *)&ich_adres,
                                                           &rozmiar));
        if (polaczone[polaczone.size()-1] == -1) {
                perror("accept");
                cout<<WSAGetLastError();
                continue;
            }
        cout<<"\nGot connection from: "<<inet_ntoa(ich_adres.sin_addr);
        cout<<"\nWatek startuje z numerem: "<<polaczone.size();
        ile=polaczone.size();
        numer=ile+'0';
        watki.push_back(threads.create_thread(boost::bind(&hello,numer)));
    }
}//akceptuj

w funkcji main (pomijając wszystkie pierdoły związane z uruchamianiem socketów) odpalam wątek główny akceptujący przychodzące połączenia:

//odpalanie socketów itp itd...
cout<<"\nSerwer uruchomiony, oczekuje na polaczenie";    
boost::thread glowny(&akceptuj);
//dalej zamykam wątki i sockety

No i teraz problem. Serwer odpala się, czeka sobie na połącznia, gdy przychodzą wyświetla wiadomość że nastąpiło nowe połączenie, wypisuje od kogo (ip) oraz numer wątku, do którego zostało skierowane. Jest to na razie początek, więc serwer po prostu odsyła klientom ich wiadomości. Wszystko jest cacy dopóki jest połączenie jest jedno ;] Jak przychodzi drugie to mutexy dają o sobie znać (a raczej moja mierna wiedza na ich temat [wstyd] ). Niby nic się nie krzaczy, ale wiadomości są dostarczane do klientów sekwencyjnie. tzn gdy są podłączone 2 komputery, to na początku funkcja recv jest zarezerwowana dla tego z niższym numerem wątku, i tak na zmianę, jak 1 wyśle i otrzyma wiadomość, to potem drugi może to zrobić (a 1 musi czekać). Jak widać w kodzie, stworzyłem sobie 2 mutexy, wydawało mi się że starczy ale najwyraźniej się myliłem, albo może zupełnie źle je zastosowałem [???] Dodam jeszcze, że przy rozłączaniu jest ten sam problem, czyli rozłączenie następuje dopiero jak nadejdzie kolej komputera który chce się rozłączyć, wcześniej komenda jakby nie dociera do serwera (albo może dociera ale funkcja send() jest zajęta dla innego wątku). W każdym razie jak już się rozłączą wszystkie maszyny, to tylko druga jest rozłączana poprawnie, pierwsza (ta z niższym nr. wątku) dostaje błąd recv() :| Co ciekawe po tym rozłączeniu serwer też się wyłącza, chociaż pętla while(1) powinna przecież działać cały czas [!!!]
Mam nadzieję że komuś będzie się chciało to czytać i jakoś mi pomożecie, nie wrzucałem całego kodu bo dużo tego mam, ale jak będzie trzeba to wrzucę.

0

czekaj, czekaj, dla recv pisze w dokumentacji tak:

If no incoming data is available at the socket, the recv call waits for data to arrive unless the socket is nonblocking

a ty masz taki "paskudny" kawałek:

boost::mutex::scoped_lock lock(i_mutex);    // blokujesz wszystkim innym czytanie
rec=recv(polaczone[(nr-'0')-1],rcv_msg,99,0); // i trzymasz blok, dopóki coś nie dojdzie od usera x...
lock.unlock(); //bożee, dzięki bogu słuchany user x dał głos, pozwalasz czytać innym, ciekawe, kto teraz się dopcha, i znowu zrobi reszcie bloka? ;)

czy mi się tylko wydaje, czy i_mutex jest współdzielony przez wszystkich? Ależ po co? Przecież każdy ma swojego socketa, z cudzych nie czyta, chyba wszyscy mogą czytać ze swoich socketów niezależnie od innych, znaczy równocześnie... znaczy wywal tego mutexa

przy zapisie oczywiście mutex już jest potrzebny, ale też nie jeden dla wszystkich IMO. Tylko każdy user ma swojego mutexa. Podczas pisania go lockuje, a później zwalnia. Mutex ten będzie ofensywnie wykorzystywany, podczas rozsyłania wypowiedzi do wszystkich, bo jeśli user x mówi "Hi all", to będzie pętla:

for(i=0; i<ile;i++) {
   lock zapisu u usera "i"
   zapis do usera "i"
   unlock dla usera "i"
   }

nie wiem, czy taki design gwarantuje, a nawet sądzę na 99%, że NIE gwarantuje tego, że: wszyscy wszystko dostaną w identycznej kolejności, jeśli dwóch userów odezwie się w identycznym momencie. Ale to chyba nie jest problem, że np grześ będzie miał loga:

10:39:41.355 arek > Hi
10:39:41.355 juuuzio > Yo

a jakiś inny pieś:

10:39:41.355 juuuzio > Yo
10:39:41.355 arek > Hi
0

stworzyłem sobie 2 mutexy, wydawało mi się że starczy ale najwyraźniej się myliłem, albo może zupełnie źle je zastosowałem

Niewątpliwie ;) Zakładam, że z i_mutex i o_mutex korzystają wszystkie wątki. Problem polega na tym, że mutexy blokują wszystkie wątki, gdy któryś z nich wysyła lub odbiera (przy założeniu, że recv i send są blokujące).

	boost::mutex::scoped_lock lock(i_mutex);
	rec=recv(polaczone[(nr-'0')-1],rcv_msg,99,0);
	lock.unlock();

	[...]


	boost::mutex::scoped_lock lock1(o_mutex);
		
	if (send(polaczone[(nr-'0')-1],rcv_msg,99,0)==-1)
	{
		perror("send");
		cout<<WSAGetLastError();
		lock1.unlock(); //<--- po co????
		break;
	}
		
	lock1.unlock();  //<--- po co????
} while(rcv_msg!="exit"); 

Dodatkowo synchronizujesz w funkcji hello dostęp do poloczone, ale w akceptuj już nie. Z kolei dostęp do watki w ogóle nie jest zsynchronizowany.

polaczone[(nr-'0')-1]

To jest dość ryzykowne. Użyj mapy lub seta. I zamiast trzymać połączenie i wątek w dwóch oddzielnych kontenerach, zrób jeden.

0

O to to, 0x666 dobrze mówi, wódki mu! Ty globalnego mutexa zrobić zrób, ale takiego, który by czuwał nad wektorem, albo lepiej mapą, tak jak mówi 0x666 . Znaczy lockuj go podczas wstawiania/usuwania userów. Żebyś nie próbował wywalać albo wstawiać kogoś, gdy inny właśnie przegląda listę, bo klops będzie.

0

Dzięki wam za odpowiedzi :-) Dziś już nie będę tego robił bo jestem nie w formie [browar] ale jutro na pewno skorzystam ze wszystkich podpowiedzi i poprawię kod.

0

No dobra, powracam z poprawionym kodem:

void hello(char nr)
    {
         char rcv_msg[100];
         char *numer;
         int rec;
         string adr="Witaj, polaczyles sie z ";
         adr.append(inet_ntoa(moj_adres.sin_addr));
         adr.append("\nJestes ");
         adr+=nr;
         adr.append(" podlaczonym uzytkownikiem.");
         if (send(polaczone[(nr-'0')-1],adr.c_str(),74,0)==-1){
         perror("send");
         cout<<WSAGetLastError();
         }
         do{
            rec=recv(polaczone[(nr-'0')-1],rcv_msg,99,0);
            if(rec==-1){
            perror("recv");
            cout<<WSAGetLastError();
            ile--;
            break;
            }
            else if (rec==0){
            cout<<"\nKlient nr."<<nr<<" zerwal polaczenie.";
            ile--;
            boost::mutex::scoped_lock lock(watki_mutex);
            vector<boost::thread (*)>::iterator i=watki.begin();
            advance(i,(nr-'0')-1);
            watki.erase(i);
            lock.unlock();
            boost::mutex::scoped_lock lock1(polaczone_mutex);
            vector<SOCKET>::iterator j=polaczone.begin();
            advance(j,(nr-'0')-1);
            polaczone.erase(j);
            lock1.unlock();
            break;
            }
            if (send(polaczone[(nr-'0')-1],rcv_msg,99,0)==-1){
            perror("send");
            cout<<WSAGetLastError();
            break;
            }
            } while(rcv_msg!="exit");  
    };
void akceptuj()
{
    while(1)//główna pętla accept()
    {
        boost::mutex::scoped_lock lock(polaczone_mutex);
        polaczone.push_back(accept(nasluchujacy, (struct sockaddr *)&ich_adres,
                                                           &rozmiar));
        if (polaczone[polaczone.size()-1] == -1) {
                perror("accept");
                cout<<WSAGetLastError();
                continue;
            }
        lock.unlock();
        cout<<"\nGot connection from: "<<inet_ntoa(ich_adres.sin_addr);
        cout<<"\nWatek startuje z numerem: "<<polaczone.size();
        ile=polaczone.size();
        numer=ile+'0';
        boost::mutex::scoped_lock lock1(watki_mutex);
        watki.push_back(threads.create_thread(boost::bind(&hello,numer)));
        lock1.unlock();
    }
}//akceptuj

Wszystko działa [!!!] Niestety występuje problem, którego spodziewałem się od samego początku. Mianowicie gdy najpierw zamknę połączenie nr.1 to wątek i połączenie będące do tej pory na pozycjach 2 w swoich kontenerach dostają indeksy 1 - i wiadomo co - serwer się wysypuje bo odwołuję się do nieistniejącego elementu. Chyba rzeczywiście poszukam jakiegoś innego kontenerka z STL'a. Nie jestem ekspertem w tych sprawach, ale coś mi świta, że jakbym użył iteratorów zamiast zwykłych indeksów do odwoływania się do elementów wektora to one chyba nie tracą swojej ważności przy zmianie wielkości wektora [???] Tak czy inaczej idę się dokształcić z STL'a i sprawdzę to co proponowaliście wcześniej, czyli set i mapę. No chyba że ktoś ma jeszcze prostsze rozwiązanie? Dzięki za odpowiedzi.

P.S. Ranides poruszyłeś wcześniej kwestię przekazywania informacji między wątkami, zaproponowałeś pętlę. Ja jak pisałem ten serwer, to wymyśliłem sobie, żeby stworzyć globalny bufor, który będzie jednocześnie logiem całego serwera. Wtedy każdy wątek by do niego zapisywał i sobie go czytał za każdym razem, i wyświetlał u swojego klienta. Nie wiem czy to jest najlepszy pomysł, może lepiej zrobić tak jak ty radzisz, w każdym razie jest to kolejny temat do dyskusji, co myślicie?

P.S.1 Właśnie się dokształciłem z STL'a i niestety:

A vector's iterators are invalidated when its memory is reallocated.

ale za to:

Set has the important property that inserting a new element into a set does not invalidate iterators that point to existing elements. Erasing an element from a set also does not invalidate any iterators, except, of course, for iterators that actually point to the element that is being erased.

Czyli jest wyjście z tej sytuacji :-)

0
void hello(char nr)
    {
...
         adr.append("\nJestes ");
         adr+=nr;
         adr.append(" podlaczonym uzytkownikiem.");
...

to raczej nie zadziala, chyba ze poprzez nr przekazujesz pojedynczy znak od '0' do '9' max?

 boost::mutex::scoped_lock lock(o_mutex);
         if (send(polaczone[(nr-'0')-1],adr.c_str(),74,0)==-1){
             perror("send");
             cout<<WSAGetLastError();
             lock.unlock();
         }
         lock.unlock();

1' wnetrze IF'a po wykonaniu sie leci dalej. a wiec unlocka wykonasz dwa razy. cos tu nie gra:)
2' taka metoda wysylania jest bledna. SEND nie gwarantuje ze wysle wszystko. SEND zwraca liczbe wyslanych bajtow i ta liczba moze byc mniejsza od tego co kazales wyslac - w takim przypadku nalezy recznie wznowic wysylanie bufora od nowego miejsca..
3' tak jak poprzednicy mowia, synchronizacja podczas wysylania nie jest potrzebna wcale. przeciez danym userem zajmuje sie jedynie jeden watek. nalezy jedynie odczyt z tablicy socketow zsyncowac z dodawaniem/usuwaniem userow

boost::mutex::scoped_lock lock(i_mutex);
            rec=recv(polaczone[(nr-'0')-1],rcv_msg,99,0);
            lock.unlock();
            if(rec==-1){
            perror("recv");
            cout<<WSAGetLastError();
            ile--;
            break;
            }

dokladnie te same uwagi 2' i 3' co powyzej: zla obsluga RECV (dziala tak samo jak SEND!) oraz nie taka sync co powinna

else if (rec==0){
            cout<<"\nKlient nr."<<nr<<" zerwal polaczenie.";
            ile--;
            vector<boost::thread (*)>::iterator i=watki.begin();
            advance(i,nr-'0');
            watki.erase(i);
            vector<SOCKET>::iterator j=polaczone.begin();
            advance(j,nr-'0');
            polaczone.erase(j);
            break;
            }

ho ho ho, merry christmas..
1' brak synchronizacji wspomnianej wczesniej
2' wywalamy watek advance(i, NR-'0')? czy mi sie wydaje, czy watek pamieta swoj numer?
mamy watki: '0' '1' '2' '3'
watek 1 sie konczy i sie sam usuwa
mamy watki: '0' '2' '3'
a teraz sie konczy watek numer '2'.
ktory watek zostanie usuniety z listy? '2'-'0'=2, czyli '3'. oooops!
3' sockety ktore wyrzucasz wypadaloby zamknac
4' watki ktore wyrzucasz wypadalo by z'join'owac oraz z'delete'owac lub usuwac z threadgrupa

boost::mutex::scoped_lock lock1(o_mutex);
            if (send(polaczone[(nr-'0')-1],rcv_msg,99,0)==-1){
            perror("send");
            cout<<WSAGetLastError();
            lock1.unlock();
            break;
            }
            lock1.unlock();

te same uwagi co do wysylania i synchronizowania

char rcv_msg[100];
...
} while(rcv_msg!="exit");  

boze widzisz a nie grzmisz!!!

        polaczone.push_back(accept(nasluchujacy, (struct sockaddr *)&ich_adres,
                                                           &rozmiar));
...
        watki.push_back(threads.create_thread(boost::bind(&hello,numer)));

1' jak juz wspomniano, operacja push_back powinna byc synchronizowana z watkami, tak aby przypadkiem podczas jej wykonywania nie probowaly czytac z vectora

        if (polaczone[polaczone.size()-1] == -1) {
                perror("accept");
                cout<<WSAGetLastError();
                continue;
            }

przydaloby sie ten "zly" socket usunac z vectora a nie tylko ciagle nowe smieci dopisywac

        ile=polaczone.size();
        numer=ile+'0';
        watki.push_back(threads.create_thread(boost::bind(&hello,numer)));

ile+'0' zadziala tylko gdy size() <= 9. biorac pod uwage, ze polaczone ciagle sie wypelnia (patrz wyzej) - powodzenia ;)

//edit:
w miedzyczasie opublikowales nowy kod.. to co napisalem tyczy sie STAREGO, ale z tego co widze, 99% nowego rowniez sie tyczy :)

0

Patrz na mojego posta, tego zaraz przed Twoim, troszkę tam poprawiłem. Co do zamykania wątków i ogólnie tego co się dzieje na koniec to na razie jeszcze się nie chcę tym zajmować, bo chcę najpierw żeby to działało tak jak trzeba :-) Ale oczywiście na to też przyjdzie czas. Teraz najważniejsza sprawa to kontener na wątki i sockety, oraz ich obsługa w razie zamknięcia połączenia przez klienta. No i to co wspomniałeś - przekazywanie numeru wątku do jego wnętrza działa tylko do 9 ale właśnie już myślę, że jak wprowadzę nowy kontener to będę przekazywał do wątku jego iterator zamiast char'a i to powinno załatwić sprawę (przynajmniej tak mi się wydaje). A i dobrze, że poruszyłeś kwestię:

char rcv_msg[100];
do{
...
} while(rcv_msg!="exit");  

bo wstyd się przyznać, ale długo nad tym myślałem i nie mogłem wymyślić jak to porządnie zrobić. Powyższe rozwiązanie działa [wstyd]

Przy okazji jak już mam się zbierać za joinowanie wątków i usuwanie ich z threadgroup - da się joinować wątek z jego wnętrza?

0

od konca..

Przy okazji jak już mam się zbierać za joinowanie wątków i usuwanie ich z threadgroup - da się joinować wątek z jego wnętrza?

boost::thread ma metode join. masz wskaznik na watek - mozesz ja wywolac

Powyższe rozwiązanie działa

nie ma prawa dzialac. rownosc nigdy nie zajdzie, watek sie nigdy nie zauwazy napisu 'exit'. uzyj strcmp()

przekazywał do wątku jego iterator zamiast char'a i to powinno załatwić sprawę

w takim razie proponuje sie zastanowic, co sie dzieje z iteratorami w momencie gdy wywolasz na vector metode resize albo erase..

Patrz na mojego posta, tego zaraz przed Twoim, troszkę tam poprawiłem

tak, widzialem.. nawet 'editke' dopisalem do swojego postu. ale nie mam czasu od nowa analizowac :|


i naprawde zajmij sie uwaznie kwestia send/recv, bo mozesz sie kiedys nagle zaczac dziwic ze sie wszystko dziwnie zachowuje:)

0

Dodam jeszcze:

polaczone.push_back(accept(nasluchujacy, (struct sockaddr *)&ich_adres,&rozmiar));
if (polaczone[polaczone.size()-1] == -1) 
{
  [...]
}

W razie niepowodzenia polaczone trzyma niepotrzebnie wartość SOCKET. Można przecież tak:

SOCKED acc_socked=accept(nasluchujacy, (struct sockaddr *)&ich_adres,&rozmiar);
if (acc_socked == -1) 
{
  [...]
}
else
{
    boost::mutex::scoped_lock lock(polaczone_mutex); //<--- blokuje tylko na czas dodania
    polaczone.push_back(acc_socked);
}//<--- unlock

To:

polaczone[(nr-'0')-1]

w przypadku vectora musi być zsynchronizowane.

Zamiast watki_mutex i polaczone_mutex zrób jeden wspólny, bo nie może być sytuacji kiedy element z polaczone został usunięty, a z watki nie.

Na razie tyle :P

0

Prawdę mówicie koledzy, długa droga jeszcze przed moim serwerem. Co do wątku pamiętającego swój iterator to owszem, iterator kontenera vector szlag trafi, ale tak jak pisałem wcześniej, przeniosę to do seta albo mapy, i wtedy iteratory przetrwają. Kolejny ważny fakt to że dostęp do polaczone[] i watki[] powinien być zsynchronizowany, co tylko potwierdza fakt, że źle zrobiłem dzieląc to na dwa kontenery. Nowy kod zamieszczę prawdopodobnie jutro, bo dziś już trochę nie mam siły dzióbać. Dziękuję wam za odpowiedzi i czekam na dalsze baty jutro [browar]

0

No dobra, siedzę sobie i robię dalej. Wiem już jak powkładać połączenie i wątek do mapy. Jest pewne ale - tak jak wcześniej wspomniałem, do wątku chcę przesyłać iterator mapy, tak żeby mieć wszystkie dane pod ręką gdy będę ich potrzebował wewnątrz wątku. Niestety nie wiem jak pokonać pewien problem - w momencie startu wątku muszę mieć już gotowy ten iterator, abym mógł go przesłać. No a wiadomo że iteratora nie będę miał póki mapa jest pusta. Więc się zapętliłem [???] Chyba, że może najpierw powinienem wrzucić do mapy pusty rekord a potem sięgając z zewnątrz uzupełnić pola rekordu z zewnątrz? Poradźcie.

0

Zamiast iteratora przekaż klucz. Dopiero w wątku wyciągnij odpowiedni iterator.

0

jak bedzie w watku wyciagac iterator z mapy, to bedzie musial lockowac cala mape, bo w trakcie chodzenia po mapie i szukania wezla, moze sie okazac ze ktos mu node'a tuz sprzed nosa usunie i bedzie kaboom..

moze by tak:

  • dodaj pusty wpis z pustym thread* do mapy
  • tworz watek jako sleeping, podajac iterator do tego wpisu
  • majac juz thread* wpisz go na to puste miejsce
  • obudz watek
0

I tak właśnie zrobię :-) Jeszcze tylko się zastanawiam nad taką rzeczą. Bo jako klucz w mapie przyjąłem sobie inta, który będzie jednocześnie numerem połączenia. Natomiast jako część z danymi stworzyłem sobie strukturę zawierającą używany socket i wątek. Myślicie że dobrze to rozplanowałem, czy może lepiej da się to zrobić? Bo najlepiej to by było jakbym mógł sobie do mapy wrzucić dwa obiekty jako obiekt ale tak się nie da, szukałem nawet w innych bibliotekach takich kontenerów ale nie znalazłem (może słabo szukałem?). Co myślicie?

0

jak bedzie w watku wyciagac iterator z mapy, to bedzie musial lockowac cala mape

Owszem, raz.

void hello(key_type key)
{
    boost::mutex::scoped_lock lock(g_mutex);    
    map<key_type,value_type>::iterator it=g_conn_map.find(key);
    lock.unlock();    

    //... reszta stuffu
}
0

Tak czy inaczej muszę najpierw uruchomić wątek a potem dopiero mogę go dodać do mapy :-/ Tak sobie przeglądam dokumentację boosta i mam pytanie apropos uśpionych wątków. Czy chodzi o boost/thread/condition.hpp i metodę wait? Czy tak mam to osiągnąć? Sorry za takie pytania ale to moja pierwsza zabawa z wątkami.

0

Tak czy inaczej muszę najpierw uruchomić wątek a potem dopiero mogę go dodać do mapy

Nie wiem którą wersję wybrałeś. Jeśli moją, to nie ma problemu, bo wątek będzie zablokowany do momentu, gdy go odblokujesz w wątku servera.

boost::mutex::scoped_lock lock(g_mutex);    
//(A) tworzysz połączenie
//(B) tworzysz wątek      
//(C) dodajesz do mapy 
lock.unlock()             //<--- (D)

//.........

void hello(key_type key)
{
    boost::mutex::scoped_lock lock(g_mutex);   //<---- czeka od B do D
    map<key_type,value_type>::iterator it=g_conn_map.find(key);
    lock.unlock();   

    //... reszta stuffu
}
0

Z tego co mi wcześniej koledzy mówili to jakbym tak zrobił to bym wszystko zablokował :| Chodzi mi o punkt A w Twoim kodzie, podczas tworzenia połączenia używam funcji accept, która zablokuje mi mutexa do czasu nadejścia kolejnego połączenia. Popraw mnie jeśli się mylę, ale zmieniłbym Twój kod na taki:

//(A) tworzysz połączenie
boost::mutex::scoped_lock lock(g_mutex);   
//(B) tworzysz wątek     
//(C) dodajesz do mapy
lock.unlock()             //<--- (D)

//.........

void hello(key_type key)
{
    boost::mutex::scoped_lock lock(g_mutex);   //<---- czeka od B do D
    map<key_type,value_type>::iterator it=g_conn_map.find(key);
    lock.unlock();   

    //... reszta stuffu
}

Czyli w sumie zamiana dwóch linijek ale wydaje mi się że konieczna aby działało jak trzeba.

0

W punkcie A chodziło mi głównie o załatwienie wszelkich spraw związanych z nawiązanym połączeniem (inicjalizacja struktur itd.). W sumie, ten punkt ma najmniejsze znaczenie.

[...] ale zmieniłbym Twój kod na taki:

No i OK.

0

No dobra, dochodzę już powoli do końca (tak mi się wydaje). Połączenia już działają, teraz muszę zakodować sprzątanie po zakończonych połączeniach. Stworzyłem taki kawałek kodu:
Tu przechowuję połączenia i wątki:

struct polaczenie
{
       boost::thread *watek;
       SOCKET sock;
};

Poniżej kod porządkujący zasoby po zakończeniu połączenia. Jest on zawarty w funkcji wątku, także jakby co to .join() wywołuję od wewnątrz. Chyba tak można co nie?:

cout<<"\nKlient nr."<<nr<<" zerwal polaczenie.";
            boost::mutex::scoped_lock lock(io_mutex);//będziemy operować na mapie więc lock
            closesocket((*it).second.sock);//na początek zamykamy socket
            threads.remove_thread((*it).second.watek);//usuwamy wątek z threadgroup
            (*it).second.watek.join();//tu jest problem, nie chce się skompilować:`join' has not been declared 
            mapa.erase(it);//na koniec kasujemy wszystko z mapy
            lock.unlock();//uwalniamy mutexa
            break;//i kończymy funkcję wątku

Kurde, tak nak napisał wcześniej quetzalcoatl chciałem użyć funkcji join(), ale nie wiem czemu dostaję błąd przy kompilacji :|

Edit: cholera już wiem przecież strukturze mam wskaźnik a nie obiekt [diabel] i dlatego nie mogę wywołać join(). Zamieniam na

(*(*it).second.watek).join();

i idę testować [green]

Edit1: no i jest lipa bo nie mogę jednak join()'ować od środka :| A z kolei jakbym najpierw wywalił z mapy to tracę wskaźnik na wątek i po ptakach. Coś czuję że się motam ale mam nadzieję że coś doradzicie :-)

0

Nie znam boosta, więc podam alternatywne rozwiązanie. Wykorzystaj pętle z akceptuj. Zrób oddzielną listę dla zakończonych wątków, być może zrób też oddzielny mutex. Zamiast usuwać obiekt (boost::thread) w funkcji wątka, wrzuć go do tej listy. Pętla w akceptuj co iterację będzie sprawdzać czy w liście są jakieś wątki do usunięcia. Jeśli są, usuwa wszystko.

PS. tego co wyczytałem w dokumentacji, przed usunięciem wywołaj metodę join.

0

Podoba mi się ten pomysł, z tym że tak sobie pomyślałem, że może zamiast do listy będę wrzucał wątki do oddzielnej threadgroup'y dla której pętla accept będzie za każdym razem wywoływać metodę join_all(). Dzięki za pomysł [browar]

0

No dobra, kolejne schody. Nie mogę zrozumieć dlaczego to nie działa. Postanowiłem zrobić tak - Tworzę sobie nowy proces obsługujący joinowanie niepotrzebnych wątków:

void akceptuj()
{
    boost::thread joinuj(&watki_precz);//tutaj ten nowy proces, poza pętlą while żeby był niezależny 
                                                       //od nadchodzących połączeń
    while(1)//główna pętla accept()
    {
        temp.sock=accept(nasluchujacy, (struct sockaddr *)&ich_adres,&rozmiar);
        if (temp.sock==-1){
                           perror("accept");
                           cout<<WSAGetLastError();
                           continue;
                           }
        cout<<"\nGot connection from: "<<inet_ntoa(ich_adres.sin_addr);
        ile=0;
        boost::mutex::scoped_lock lock(io_mutex);
        while (mapa.find(++ile)!=mapa.end()){
              cout<<"\nnumer "<<ile<<" istnieje!";}
        cout<<"\nWatek startuje z numerem: "<<ile;
        temp.watek=threads.create_thread(boost::bind(&hello,ile));
        mapa.insert(pair<int,polaczenie>(ile,temp));
        lock.unlock();   
    }
}//akceptuj

jego funkcja wygląda następująco:

void watki_precz()
{
     while(1)
     {     
           boost::mutex::scoped_lock lock(martwe_mutex);
           martwe.join_all();
           lock.unlock();
     };
};

Gdy klient rozłącza się:

cout<<"\nKlient nr."<<nr<<" zerwal polaczenie.";
            boost::mutex::scoped_lock lock(io_mutex);
            closesocket((*it).second.sock);//zamykam socket
            threads.remove_thread((*it).second.watek);//wyrzucam wątek z grupy aktualnych wątków
                                                          //tu chyba też powinna być synchronizacja ^
            boost::mutex::scoped_lock lock1(martwe_mutex);
            martwe.add_thread((*it).second.watek);//dodaję wątek do grupy martwych
            lock1.unlock();
            mapa.erase(it);//usuwam z mapy
            lock.unlock();
            break;

Tak to wygląda, niestety gdy rozłączam któregoś klienta bardzo źle się dzieje, tzn. błędy recv i send. Jak próbuję się podłączyć po uprzednim rozłączeniu, to wyskakuje błąd accept [glowa] Zauważyłem jeszcze taką dziwną rzecz, że mniej błędów wyskakuje jak nie uruchamiam tego dodatkowego wątku, tzn. na chwilę go zrobię jako komentarz, to wtedy serwer dłużej działa zanim padnie na skutek odłączenia któregoś z klientów. Co to może być?

0

Ja się wtrącę. Skoro już korzystasz z boost to zainteresuj się tym http://asio.sourceforge.net/ :) Dziwię się, że to jeszcze nie jest część boosta :|

0

Heh, trochę pojechałeś :P Ale po kolei.

threads.create_thread(boost::bind(&hello,ile));

Jak rozumiem, threads to thread_group? Jeśli tak, to po co, przecież masz mapę?

while(1)
{     
    boost::mutex::scoped_lock lock(martwe_mutex);
    martwe.join_all();
    lock.unlock();
};

To zapewne zarzyna CPU. Trzeba dać trochę oddechu ;)

boost::xtime xt;

while(1)
{     
       boost::xtime_get(&xt, boost::TIME_UTC);
       xt.sec += 1;
       boost::thread::sleep(xt); //<--- zakładam, że to działa jak Sleep w windowsie, jeśli nie - użyj yield

       boost::mutex::scoped_lock lock(martwe_mutex);
       martwe.join_all();
};
threads.remove_thread((*it).second.watek);
[...]
martwe.add_thread((*it).second.watek);

Czy przypadkiem thread_group nie staje się właścicielem wątku i przy usuwaniu niszczy go???

0

Według mnie thread_group jest tylko kontenerem, i jak wyrzucam z niego wątek to ten wątek dalej istnieje, tylko nie należy już do grupy. Zaplanowałem sobie to tak, że w grupie threads będę miał aktualne wątki, natomiast w grupie martwe - te które idą do kasacji. Oprócz tego mapa jest mi potrzebna, bo tam umieszczam wskaźnik na wątek. Gdyby jej nie było to straciłbym ten wskaźnik i wątek szlajałby się aż bym nie zjoinował całej threadgrupy thread, a to mam zamiar zrobić tylko w przypadku planowanego wyłączenia serwera. Funkcja watki_precz() rzeczywiście musi nieźle żreć zasoby, ale to nadal nie tłumaczy faktu dlaczego serwer się sypie :-/ Podejrzewam, że błąd kryje się gdzieś w tych linijkach kodu odpowiedzialnych za odłączenie klienta :|

0

Według mnie thread_group jest tylko kontenerem, i jak wyrzucam z niego wątek to ten wątek dalej istnieje

Spojrzałem w źródła i tak jest (usuwa dopiero grupa jest niszczona).

0

Wybadałem trochę sytuację, zająłem się najprostszym przypadkiem gdy jeden klient się podłącza i rozłącza. Problem powoduje ten nowy wątek do obsługi niepotrzebnych wątków z grupy martwe :-( Jak go wyrzucam, to mogę się łączyć i rozłączać ile razy chcę, jednak gdy go uruchomię to mogę się połączyć tylko 2 razy, za trzecim razem serwer zatrzymuje się tutaj:

cout<<"\nGot connection from: "<<inet_ntoa(ich_adres.sin_addr);//to się wyświetla
        ile=0;
        boost::mutex::scoped_lock lock(io_mutex);//gdzieś tutaj
        while (mapa.find(++ile)!=mapa.end())//się zacina
        cout<<"\nWatek startuje z numerem: "<<ile;//to się już nie wyświetla przy 3 próbie podłączenia

Nie wiem co jest grane :|

0

Pokaż funkcję hello.

1 użytkowników online, w tym zalogowanych: 0, gości: 1