Wątki nie pracują równolegle

0

Witam. Napisałem taką konsolówkę co ma obrabiac baze danych z txt tzn z kilku plików txt powiedzmy po parę MB skleić jeden, zrobić lowecase :D otyrać regexpem czy się zgadza z standardami, posortować i wywalić duplikaty...
wszystko pięknie śmiga poza wątkami, nie wiem czemu ale nie odpalają się równolegle tylko jeden po drugim tzn. jeden skończy prace włącza się drugi i nic nie robi i zamyka się tak jak bym go tam nie potrzebnie włożył. Siedzę nad tym 2h i nie mam pomysłu więc jak by ktoś mnie powiedział co ja źle robię

program dbem;
{$APPTYPE CONSOLE}
uses Messages, Windows, RegExpr, SysUtils, Classes;
const
 REGEXPRESSION = '[a-zA-Z]:[\d]'; // marny expr ale do testów zdaje egzamin (Imie Nazwisko:PESEL)
 Threads       = 2;

type TThreadChecker = class(TThread)
   TableID, TableCount, ID:Cardinal;
   Data:String;
   TID:Byte;
  protected
   procedure Execute;
  public
   constructor Create(ThreadID:Byte);
  private
   procedure NextTable;
end;

type TTRegExpr = class(TThread)
   ID:Cardinal;
   Data:String;
   R:TRegExpr;
   TID:Byte;
  protected
   procedure Execute;
  public
   constructor Create(ThreadID:Byte);
  private
   procedure GetData;
   procedure SetData;
end;

var
 DB, ALL : TStringList;
 TBL : array[0..255] of TStringList;
 D,I,X : Cardinal;
 Found : Integer;
 Path : String;
 F : TSearchRec;
 RE: array of TTRegExpr;
 TC: array of TThreadChecker;

//Nie śmiać się ale nie mogłem sobie przypomnieć jak się zapisywało liczbę z zerem wiodącym :D
function PrepareZeros(Value,Count:Integer):String;
var I:Integer;
begin
 Result:='';
 for I := 1 to (Count - Length(IntToStr(Value))) do Result:=Result+'0';
 Result:=Result+IntToStr(Value);
end;

constructor TTRegExpr.Create(ThreadID: Byte);
begin
 inherited Create(True);
 TID:=ThreadID;
 WriteLn('Create ',TID,' Thread');
end;
procedure TTRegExpr.Execute;
begin
 WriteLn('Run ',TID,' Thread');
 FreeOnTerminate := True;
 Synchronize(GetData);
 R:=TRegExpr.Create;
 while ID < D do begin
  Data:=ALL.Strings[ID];
  R.Expression:=REGEXPRESSION;
  R.ModifierG;
  if R.Exec(Data) then begin
   Data:=R.Match[0];
   Synchronize(SetData);
  end;
  Synchronize(GetData);
 end;
 WriteLn('End ',TID,' Thread');
end;
procedure TTRegExpr.GetData;
begin
 ID:=I;INC(I);
end;
procedure TTRegExpr.SetData;
begin
 DB.Add(Data); ALL.Strings[ID]:='';
end;

constructor TThreadChecker.Create(ThreadID: Byte);
begin
 inherited Create(True);
 TID:=ThreadID;
 WriteLn('Create ',TID,' Thread');
end;
procedure TThreadChecker.Execute;
var Exists:Boolean; Z:Cardinal;
begin
 WriteLn('Run ',TID,' Thread');
 FreeOnTerminate := True;

 NextTable;

 while I < 256 do begin
  while ID < TableCount do begin
   Exists:=False;
   for Z := ID+1 to TableCount - 1 do begin
    if TBL[TableID].Strings[ID] = TBL[TableID].Strings[Z] then begin
     Exists:=True;
     Break;
    end;
   end;
   if Exists = False then ALL.Add(TBL[TableID].Strings[ID]);
   INC(ID);
  end;
  Synchronize(NextTable);
 end;

 WriteLn('End ',TID,' Thread');
end;
procedure TThreadChecker.NextTable;
begin
 INC(I);
 if (TBL[I-1].Count = 0) AND (I<256) then NextTable else begin
  ID:=0;
  TableID:=I-1;
  TableCount:=TBL[I-1].Count;
  WriteLn('Prepare table ',I,' of ',255);
 end;
end;

begin
  try
  Path:=ParamStr(1);
  WriteLn(Path);
  ReadLN;
  Writeln('Start');
  DB:=TStringList.Create;
  ALL:=TStringList.Create;
  for I := 0 to 255 do TBL[I]:=TStringList.Create;

  Found := FindFirst(Path+'*', faAnyFile, F);
  while Found = 0 do begin
   if (F.Name<>'.') and (F.Name<>'..') then begin
    DB.LoadFromFile(Path+F.Name);
    WriteLn('Form ',F.Name,' load ',DB.Count,' records');
    for I := 0 to DB.Count - 1 do if POS('@',DB.Strings[I]) > 0 then ALL.Add(LowerCase(DB.Strings[I]));
    DB.Clear;
   end;
   Found := FindNext(F);
  end;

  ALL.SaveToFile(ExtractFilePath(ParamStr(0))+'~tmp0.txt');
  WriteLn('Save temp data with ',ALL.Count,' records');

  I:=0;
  D:=ALL.Count;
  SetLength(RE,Threads);
  for X := 0 to Threads -1 do RE[X]:=TTRegExpr.Create(X);
  for X := 0 to Threads -1 do RE[X].Execute;

  DB.SaveToFile(ExtractFilePath(ParamStr(0))+'~tmp1.txt');
  ALL.Clear;

  WriteLn('Making tables by first chars');
  for I := 0 to DB.Count -1 do TBL[Ord(DB.Strings[I][1])].Add(DB.Strings[I]);
  WriteLn('Sortings records in tables');
  DB.Clear;
  for I := 0 to 255 do if TBL[I].Count > 1 then TBL[I].Sort;
  WriteLn('Saving dumps of tables');
  MkDir(ExtractFilePath(ParamStr(0))+'TMP');
  Path:=ExtractFilePath(ParamStr(0))+'TMP\';
  for I := 0 to 255 do if TBL[I].Count > 0 then begin
   TBL[I].SaveToFile(Path+'tbl_'+PrepareZeros(I,3)+'.txt');
   WriteLn('Table ',Char(I),' has ',TBL[I].Count,' records');
  end;

  WriteLn('Cleaning varibles');
  for I := 0 to 255 do if TBL[I].Count > 1 then TBL[I].Clear;

  Path:=ExtractFilePath(ParamStr(0))+'TMP\';
  Found := FindFirst(Path+'*', faAnyFile, F);
  while Found = 0 do begin
   if (F.Name<>'.') and (F.Name<>'..') then begin
    WriteLn('>Load table ',F.Name);
    DB.LoadFromFile(Path+F.Name);
    for I := 0 to DB.Count - 1 do TBL[Ord(DB.Strings[I][2])].Add(DB.Strings[I]);

    WriteLn(' Checking duplicates');
    SetLength(TC,Threads);
    I:=0;
    D:=0;
    for X := 0 to Threads - 1 do TC[X]:=TThreadChecker.Create(X);
    for X := 0 to Threads - 1 do TC[X].Execute;

    WriteLn(' Cleaning varibles');
    for I := 0 to DB.Count - 1 do TBL[Ord(DB.Strings[I][2])].Clear;
    DB.Clear;
   end;
   Found := FindNext(F);
  end;
  ALL.SaveToFile(ExtractFilePath(ParamStr(0))+'db.txt');
  WriteLn('Save new db to ',ExtractFilePath(ParamStr(0))+'db.txt');
  for I := 0 to 255 do TBL[I].Free;
  DB.Free;
  ALL.Free;
  FindClose(F);
  WriteLn('Finish');
  ReadLn;
  except
    on E:Exception do
      Writeln(E.Classname, ': ', E.Message);
  end;
end.

Sry za temat ale nie wiem jak to inaczej nazwać...

0
TTRegExpr = class(TThread)
   ID:Cardinal;
   Data:String;
   R:TRegExpr;

Ta... TT(?)RegExpr to TThread... Lepiej nazwij np. TThreadRegExp albo podobnie.

  Synchronize(NextTable);
[...]
procedure TThreadChecker.NextTable;
begin
 INC(I);
 if (TBL[I-1].Count = 0) AND (I<256) then NextTable else begin
  ID:=0;
  TableID:=I-1;
  TableCount:=TBL[I-1].Count;
  WriteLn('Prepare table ',I,' of ',255);
 end;
end;

Tak tak, synchronizuj wszystko i dziw się że nie wykonuje się asynchronicznie.

wszystko pięknie śmiga poza wątkami, nie wiem czemu ale nie odpalają się równolegle tylko jeden po drugim tzn. jeden skończy prace włącza się drugi i nic nie robi i zamyka się tak jak bym go tam nie potrzebnie włożył. Siedzę nad tym 2h i nie mam pomysłu więc jak by ktoś mnie powiedział co ja źle robię

Ja tu widzę dwa typy wątków. Nic nie opisujesz co gdzie i jak.

for X := 0 to Threads -1 do RE[X]:=TTRegExpr.Create(X);
  for X := 0 to Threads -1 do RE[X].Execute;
 
  DB.SaveToFile(ExtractFilePath(ParamStr(0))+'~tmp1.txt');

Tak, najpierw stwórz wątki, potem załóż że wszystkie już zakończyły pracę. Używasz zmiennej między wątkowej tak jak normalnej zmiennej, masz gdzieś to że przez to wątki dłużej się synchronizują z główną pętlą niż wykonują obliczenia... Genialne.

for I := 0 to 255 do TBL[I]:=TStringList.Create;

Po co to tworzysz długo przed użyciem? Lubisz marnować pamięć?

  WriteLn('Making tables by first chars');
  for I := 0 to DB.Count -1 do TBL[Ord(DB.Strings[I][1])].Add(DB.Strings[I]);
  WriteLn('Sortings records in tables');
  DB.Clear;
  for I := 0 to 255 do if TBL[I].Count > 1 then TBL[I].Sort;

Wydajniej jest najpierw posortować całą tablicę a potem zapisywać do kolejnych plików.

A tak swoją drogą, to czemu twoje wszystkie wątki robią to samo?

0

for X := 0 to Threads -1 do RE[X]:=TTRegExpr.Create(X);
for X := 0 to Threads -1 do RE[X].Execute;
Najpierw tworzy wątki a później je odpala to logiczne chyba co?
Synchronize(NextTable); - synchronizuje tylko akcje wywołane na metodzie NextTable
for I := 0 to 255 do TBL[I]:=TStringList.Create;
A co za różnica czy przed samym użyciem czy na początku programu to jest mała aplikacja więc nie ma to znaczenia a lepiej wygląda jak jest w kupie razem z innymi metodami Create
Wątki się różnią i to masakrycznie, bo jeden sprawdza czy się wpisy poważają a drugi czy wpis jest poprawny (regexpr) i nie wiem w czym przeszkadza ci nazwa T - Typ T - Thread (wątek) i masz RegExpr i Checker wszystko jasne jak dla mnie.
Nie, nie jest lepiej, jak by to było tam jakieś 10k rekordów to bym to olał, ale jak masz **10 milionów ** rekordów to się zesra a nie posortuje. zauważ że sortując musi przelecieć całą tablice, jak nie wierzysz to sobie wygeneruj plik ~150MB tak po 10 znaków w linii załaduj do TStringList i posortuj zobaczysz efekt swojej bezmyślności.
A mój problem nie dotyczy całości tylko tego że wątki nie pracują równolegle.

0

for X := 0 to Threads -1 do RE[X]:=TTRegExpr.Create(X);
for X := 0 to Threads -1 do RE[X].Execute;
Najpierw tworzy wątki a później je odpala to logiczne chyba co?

A czy ja stwierdziłem że najpierw odpalasz a potem tworzysz? Po prostu po odpaleniu w następnych liniach zakładasz że wątki już zakończyły pracę.

Synchronize(NextTable); - synchronizuje tylko akcje wywołane na metodzie NextTable

Tak, tylko że te NextTable wykonuje z 200 kolejnych wywołań rekursywnych, zgadnij ile to potrwa. I jeszcze fun fact: Wcześniej w tym samym kodzie nie Synchronizujesz wywołania NextTable. Dlaczego jest wiadome tylko tobie.

A co za różnica czy przed samym użyciem czy na początku programu to jest mała aplikacja więc nie ma to znaczenia a lepiej wygląda jak jest w kupie razem z innymi metodami Create

Tak? To ty programujesz żeby kod logicznie powiązany oddalać od siebie? To po co ci procedury?!
Dobre nawyki są dobrymi nawykami i nawet jeżeli tutaj to nie jest duża strata to ja ci mówię jak to się robi dobrze. Równie dobrze możesz używać samych zmiennych globalnych i konstrukcji goto tylko problem w tym że tak jest nieczytelnie. Zamiast się tłumaczyć, słuchaj rad lepszego programisty, będzie ci prościej. Nie każdy od razu rozumiał po co formatować kod.

Wątki się różnią i to masakrycznie, bo jeden sprawdza czy się wpisy poważają a drugi czy wpis jest poprawny (regexpr) i nie wiem w czym przeszkadza ci nazwa T - Typ T - Thread (wątek) i masz RegExpr i Checker wszystko jasne jak dla mnie.

Czy gdzieś stwierdziłem że typy wątków nie różnią się? Po prostu mówisz o problemie z wątkami, ale nie wiem której klasy to dotyczy.
Co do TTRegExp: Ważniejsze od tego że to RegExp jest to że jest to wątek. Więc powinieneś użyć czegoś dłuższego niż samo dodatkowe 'T', zwłaszcza że takiej konwencji używasz dalej.

Nie, nie jest lepiej, jak by to było tam jakieś 10k rekordów to bym to olał, ale jak masz 10 milionów rekordów to się zesra a nie posortuje.zauważ że sortując musi przelecieć całą tablice, jak nie wierzysz to sobie wygeneruj plik ~150MB tak po 10 znaków w linii załaduj do TStringList i posortuj zobaczysz efekt swojej bezmyślności.

Sprawdziłem, i rzeczywiście, ku mojemu zaskoczeniu twoje rozwiązanie wychodzi lepsze (przeceniłem koszty podwójnych referencji). Btw. przy 10 milionach rekordów twój algorytm zesra się tak samo jak Sort, bo TStringList nie wytrzyma ponad 4mln elementów.
zauważ że sortując musi przelecieć całą tablice- zauważ że ty też przelatujesz całą tablicę kubełkując dane. Chodzi raczej o to że wydajniej jest pokubełkować dane (cytuję moją literaturę od algorytmów: "Sortowanie Kubełkowe jest sortowaniem najszybszym, jeśli sortowane elementy można jednostajnie podzielić (na podzbiory) za pomocą szybkiej funkcji haszującej").

A mój problem nie dotyczy całości tylko tego że wątki nie pracują równolegle.

Czyżby dlatego bo Syncują się cały czas? Jeżeli chodzi tobie o to że debug message wyświetlają zamykanie i dopiero wtedy otwieranie następnego, to zapewne chodzi o to że wątki wykonują się tak krótko, że zamykają się zanim następny zdąży dojść do wykonania. Pamiętaj że threadswitch zajmuje dużo czasu wobec czego planista może nie chcieć odpalać nowych wątków jeżeli to nie potrzebne.

3

NIGDZIE nie uruchamiasz wątków! Owszem tworzysz ich x instancji, potem na tych x instancjach wykonujesz metodę Execute - for X := 0 to Threads -1 do RE[X].Execute; ale TO NIE JEST URUCHOMIENIE wątku!! Poczytaj może podstawy na początek.

BTW jak byś to miał porządnie, tj klasa wątku w osobnym unicie to by nawet takie coś nie przeszło bo kompilator by Ci błąd zgłosił

0

"planista może nie chcieć odpalać nowych wątków jeżeli to nie potrzebne." - wtf? skąd planista ma wiedzieć, że przełączenie jest potrzebne albo nie? jest wątek, ma odpowiedni stan i priorytet to będzie uruchomiony i już.

Tak, tylko chodziło mi o to że przełączenie jest kosztowne i może planista przydzielił dłuższy czas starszemu wątkowi (który miał duże wymagania) i ten szybko skończył swoje działania, zanim jeszcze przystąpiono do młodszego wątku.

"TStringList nie wytrzyma ponad 4mln elementów" - czterech milionów, czy miliardów?

Ah, za późno piszę.

BTW jak byś to miał porządnie, tj klasa wątku w osobnym unicie to by nawet takie coś nie przeszło bo kompilator by Ci błąd zgłosił

Nie widzę czemu to by miało być porządnie. Widzę że ktoś wychodzi z założenia że więcej unitów=porządniej.

NIGDZIE nie uruchamiasz wątków! Owszem tworzysz ich x instancji, potem na tych x instancjach wykonujesz metodę Execute - for X := 0 to Threads -1 do RE[X].Execute; ale TO NIE JEST URUCHOMIENIE wątku!! Poczytaj może podstawy na początek.

Eh, jestem debilem :< . No ale tak to jest jak jest późno i się próbuje coś powiedzieć na forum.

0

Fakt, zapomniałem nie Execute tylko Resume dzięki za pomoc :D i zapomniałem o wstrzymaniu dalszych operacji do momentu zakończenia wątków. Wszystko mi działa.

1 użytkowników online, w tym zalogowanych: 0, gości: 1