C++ multitreading działa wolniej

0

czesc,
mam problem z wydajnoscia. Testuje sobie petle

for (int i =0; i<100000;++i) {
sha256(i);
}

sha256 liczy hash z i.
Pomyslalem ze moze zrobie tak ze stworze sobie dwa watki i kazdy bedzie robil to samo tyle ze watek jeden z zakresu 0-50000 a drugi 50001-100000. Okazuje sie ze duzo wolniej idzie na dwoch watkach niz na jednym w pelnym zakresie. Jak to mozna zoptymalizowac?

0

używasz standardowy #include <thread> pod windows?

0

Okazuje sie ze duzo wolniej idzie na dwoch watkach niz na jednym w pelnym zakresie

Coś źle robisz.

0

Na jakim procesorze to uruchamiasz?

0

Pokaż kod.

0
_13th_Dragon napisał(a):

używasz standardowy #include <thread> pod windows?

tak, cygwin

5

To trochę wróżenie z fusów nie mając kodu i nie mogąc sprofilować. Trzeba wziąć pod uwagę implementację biblioteki; overhead związany z tworzeniem wątków na danym systemie operacyjnym; scheduler w systemie, który z uwagi na koszt cache coherency może uznać, że lepiej wykonać wszystkie wątki na tym samym corze; ewentualną synchronizację, która może doprowadzić do serialziacji wątków; et cetera.

1

Bo to jest zależne od procesora, od implementacji sha256 i od systemu operacyjnego. Szybciej będą działać funkcje na procesorach, gdzie każdy z wątków pójdzie na osobnym rdzeniu/procesorze, a funkcje nie będą współdzielić danych.

0

Standardowa biblioteka w okolicach <thread> jest istotnie skopana.
Chodzi o to że czas synchronizacji po zakończeniu wątku znacznie przewyższa zysk na podziale zadania na wątki.
Więc ja wciąż używam pthread.

http://4programmers.net/Forum/C_i_C++/236808-stdthread_oraz_cuda_na_kiju_z_nim_zwiazane?p=1051482#id1051482
http://4programmers.net/Forum/C_i_C++/236808-stdthread_oraz_cuda_na_kiju_z_nim_zwiazane?p=1051482#id1051482

0
_13th_Dragon napisał(a):

Standardowa biblioteka w okolicach <thread> jest istotnie skopana.
Chodzi o to że czas synchronizacji po zakończeniu wątku znacznie przewyższa zysk na podziale zadania na wątki.
Więc ja wciąż używam pthread.

http://4programmers.net/Forum/C_i_C++/236808-stdthread_oraz_cuda_na_kiju_z_nim_zwiazane?p=1051482#id1051482
http://4programmers.net/Forum/C_i_C++/236808-stdthread_oraz_cuda_na_kiju_z_nim_zwiazane?p=1051482#id1051482

Zaraz przetestuje natomiast dotychczas mój kod wyglądał tak:

#include <cstdlib>
#include <iostream>
#include <thread>
#include "sha256.h"

using namespace std;


void executeTask(int start, int stop) {
    clock_t tStart = clock();
    
    for (int i = start; i < stop; ++i) {
        string hash = sha256(sha256(std::to_string(i)));
    }
    
    printf("Time taken: %.2fs\n", (double)(clock() - tStart)/CLOCKS_PER_SEC);
}

/*
 * 
 */
int main(int argc, char** argv) {
    
    std::cout << "Number of threads = " 
            <<  std::thread::hardware_concurrency() << std::endl;
    
    thread t1(executeTask, 0, 1000000);
    thread t2(executeTask, 1000000, 2000000);
    
    t1.join();   
    t2.join(); 

    return 0;
}
Number of threads = 4
Time taken: 20.49s
Time taken: 20.65s

Natomiast gdybym odpalił tylko jeden wątek w pełnym zakresie:
thread t1(executeTask, 0, 2000000);
to wynik jest:

Number of threads = 4
Time taken: 19.26s

czyli w zasadzie szybciej, a zakładałem że w na dwóch wątkach powinno być znacznie szybciej niż te 19 sek.

Sama operacja która dzieje się w executeTask miała za zadanie dać jakieś minimalne obciążenie aby nie wykonało się to w sekunde :).

Co do sha256 to zastosowałem to http://www.zedwood.com/article/cpp-sha256-function

0
Czarny Pomidor napisał(a):

Zaraz przetestuje natomiast dotychczas mój kod wyglądał tak: ...

Właśnie o to chodzi że to nie działa jak należy, no chyba że masz na myśli rozwiązania przez pthread

0
_13th_Dragon napisał(a):
Czarny Pomidor napisał(a):

Zaraz przetestuje natomiast dotychczas mój kod wyglądał tak: ...

Właśnie o to chodzi że to nie działa jak należy, no chyba że masz na myśli rozwiązania przez pthread

Że zaraz przetestuje mam na myśli test pthread :).

0

sha256 by się przydało zobaczyć, może ma jakiegoś globalnego muteksa? Ewentualnie cały czas jest zjadany przez alokacje stringów, ale nie chce mi się wierzyć.

1

Chwila chwila. Ty po prostu źle mierzysz czas.

On the other hand, if the current process is multithreaded and more than one execution core is available, clock time may advance faster than wall clock.

http://en.cppreference.com/w/c/chrono/clock

void executeTask(int start, int stop) {
//	clock_t tStart = clock();
	auto startTime = chrono::steady_clock::now();

	for (int i = start; i < stop; ++i) {

		string hash = sha256(sha256(std::to_string(i)));
	}

	auto finishTime = chrono::steady_clock::now();

	printf("Time taken: %.2fs\n",
		   chrono::duration_cast<chrono::milliseconds>(finishTime-startTime).count()/1000.);
}

Spróbuj teraz.

0

Ok, do tego ze zle mierzylem czas to już włąsnie doszedłem, natomiast tak prezentuje się program na pthreadach.

#include <cstdlib>
#include <iostream>
#include <thread>
#include <pthread.h>
#include "sha256.h"
#include <math.h>

using namespace std;

#define NUM_THREADS 2
#define MIN 0
#define MAX 4000000

struct job_data{
   int start;
   int stop;
};


void *executeTask(void *threadarg) {
       struct job_data *my_data;
    
       my_data = (struct job_data *) threadarg;
    
       cout << "start: " << my_data->start << " stop: " << my_data->stop << endl;
    
    for (int i = my_data->start; i < my_data->stop; ++i) {
        string hash = sha256(sha256(std::to_string(i)));
    }
    
    pthread_exit(NULL);
}


int div_ceil(int numerator, int denominator)
{
        std::div_t res = std::div(numerator, denominator);
        return res.rem ? (res.quot + 1) : res.quot;
}

/*
 * 
 */
int main(int argc, char** argv) {
    
    std::cout << "Number of threads = " 
            <<  std::thread::hardware_concurrency() << std::endl;
    
   pthread_t threads[NUM_THREADS];
   struct job_data td[NUM_THREADS];
   int rc;
   int i;
   
   int step = div_ceil(MAX, NUM_THREADS);

   for( i=0; i < NUM_THREADS; i++ ){
      cout <<"main() : creating thread, " << i << endl;
      td[i].start = (i * step);
      td[i].stop = (i + 1) * step;
      
      rc = pthread_create(&threads[i], NULL,
                          executeTask, (void *)&td[i]);
      
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

(na podstawie jakiegoś przykładu z netu).
I teraz tak, gdy odpalam jeden watek program wykonuje sie 19 sek.
Na 2 watkach 11 s, natomiast na 3 i 4 również 11 sekund czyli nie ma już więcej progresu.

co do sha256 tak jak napisałem w pierwszym poście tutaj jest jego źródło:
http://www.zedwood.com/article/cpp-sha256-function

0

Ponieważ masz dwa procesory to przy nieobciążonym komputerze 2 wątki dają możliwe maksimum (pomijając drobnicę).
Natomiast przy obciążonym procesorze najlepszy wynik da to co zwraca: std::thread::hardware_concurrency()
Oczywiście o ile chodzi o pthread lub natywny wątek systemu, natomiast std::thread - popieprzono czarnym pieprzem.

2

Jak już wspomniał @kq, clock mierzy czas procesora, czyli sumaryczny czas spędzony przez wszystkie rdzenie na obliczeniach. Jeżeli program nie jest wykonywany, np czeka na standardowe wejście lub po prostu śpi, to czasu procesora nie przybywa. Przykład: http://ideone.com/010Zz9

Mierzenie czasu wykonania w Ubuntu:

/projekty/SortingAlgorithmsToolbox$ time ./activator clean test
(blabla tutaj wynik wykonania polecenia)

real	0m31.966s
user	1m44.728s
sys	0m3.131s

Real - czas rzeczywisty
User - czas procesora spędzony w userspace
Sys - czas procesora spędzony w kernelu

Mam czterowątkowy procesor i jak widać czas procesora jest ponad 3x wyższy niż czas rzeczywisty, a więc wykonanie polecenia skaluje się dobrze na wiele rdzeni.

1

Polecam przetestowanie:


#ifdef _WIN32
#include <Windows.h> // Sleep
#endif

#include <cstring>
#include <cstdlib>

#include <string>
#include <mutex>
#include <thread>
#include <iostream>
#include <chrono>

std::string sha256(std::string input);

// -- main.cpp --

const int num_threads = 10;
const int range = 10000;
const int range_for_thread = range / num_threads;

static int finished = 0;
static std::mutex finished_mutex;

void async_thread()
{
	static std::mutex output_mutex;
	static int range_start = 0;

	int my_range = range_start;
	range_start += range_for_thread;

	for (int i = my_range; i < (my_range + range_for_thread); ++i)
	{
		std::string sum = sha256(std::to_string(i));
		output_mutex.lock();
		printf("sum for %i is %s\n", i, sum.c_str());
		output_mutex.unlock();
	}

	finished_mutex.lock();
	finished++;
	finished_mutex.unlock();
}

template<typename TimeT = std::chrono::milliseconds>
struct measure
{
	template<typename F, typename ...Args>
	static typename TimeT::rep execution(F&& func, Args&&... args)
	{
		auto start = std::chrono::system_clock::now();
		std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
		auto duration = std::chrono::duration_cast< TimeT>
			(std::chrono::system_clock::now() - start);
		return duration.count();
	}
};

void run_async()
{
	std::thread threads[num_threads];
	for (int i = 0; i < num_threads; ++i)
	{
		threads[i] = std::thread(async_thread);
	}
	for (int i = 0; i < num_threads; ++i)
	{
		threads[i].detach();
	}

	bool working = true;
	while (working)
	{
		finished_mutex.lock();
		if (finished == num_threads)
		{
			working = false;
		}
		finished_mutex.unlock();
#ifdef _WIN32
		Sleep(30);
#endif
	}
}

void run_sync()
{
	for (int i = 0; i < range; ++i)
	{
		std::string sum = sha256(std::to_string(i));
		printf("sum for %i is %s\n", i, sum.c_str());
	}
}

int main()
{
	int stop = 0;
	std::cin >> stop;
	std::cout << measure<>::execution(run_async) << "ms" << std::endl;
	std::cout << "type any number to continue.. " << std::endl;
	std::cin >> stop;
	std::cout << measure<>::execution(run_sync) << "ms" << std::endl;
	std::cin >> stop;

	return 0;
}

// -- sha256.h --

class SHA256
{
protected:
	typedef unsigned char uint8;
	typedef unsigned int uint32;
	typedef unsigned long long uint64;

	const static uint32 sha256_k[];
	static const unsigned int SHA224_256_BLOCK_SIZE = (512 / 8);
public:
	void init();
	void update(const unsigned char *message, unsigned int len);
	void final(unsigned char *digest);
	static const unsigned int DIGEST_SIZE = (256 / 8);

protected:
	void transform(const unsigned char *message, unsigned int block_nb);
	unsigned int m_tot_len;
	unsigned int m_len;
	unsigned char m_block[2 * SHA224_256_BLOCK_SIZE];
	uint32 m_h[8];
};

#define SHA2_SHFR(x, n)    (x >> n)
#define SHA2_ROTR(x, n)   ((x >> n) | (x << ((sizeof(x) << 3) - n)))
#define SHA2_ROTL(x, n)   ((x << n) | (x >> ((sizeof(x) << 3) - n)))
#define SHA2_CH(x, y, z)  ((x & y) ^ (~x & z))
#define SHA2_MAJ(x, y, z) ((x & y) ^ (x & z) ^ (y & z))
#define SHA256_F1(x) (SHA2_ROTR(x,  2) ^ SHA2_ROTR(x, 13) ^ SHA2_ROTR(x, 22))
#define SHA256_F2(x) (SHA2_ROTR(x,  6) ^ SHA2_ROTR(x, 11) ^ SHA2_ROTR(x, 25))
#define SHA256_F3(x) (SHA2_ROTR(x,  7) ^ SHA2_ROTR(x, 18) ^ SHA2_SHFR(x,  3))
#define SHA256_F4(x) (SHA2_ROTR(x, 17) ^ SHA2_ROTR(x, 19) ^ SHA2_SHFR(x, 10))
#define SHA2_UNPACK32(x, str)                 \
{                                             \
    *((str) + 3) = (uint8) ((x)      );       \
    *((str) + 2) = (uint8) ((x) >>  8);       \
    *((str) + 1) = (uint8) ((x) >> 16);       \
    *((str) + 0) = (uint8) ((x) >> 24);       \
}
#define SHA2_PACK32(str, x)                   \
{                                             \
    *(x) =   ((uint32) *((str) + 3)      )    \
           | ((uint32) *((str) + 2) <<  8)    \
           | ((uint32) *((str) + 1) << 16)    \
           | ((uint32) *((str) + 0) << 24);   \
}

// -- sha256.cpp --
const unsigned int SHA256::sha256_k[64] = //UL = uint32
{ 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5,
0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3,
0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc,
0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7,
0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13,
0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3,
0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5,
0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208,
0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2 };

void SHA256::transform(const unsigned char *message, unsigned int block_nb)
{
	uint32 w[64];
	uint32 wv[8];
	uint32 t1, t2;
	const unsigned char *sub_block;
	int i;
	int j;
	for (i = 0; i < (int)block_nb; i++) {
		sub_block = message + (i << 6);
		for (j = 0; j < 16; j++) {
			SHA2_PACK32(&sub_block[j << 2], &w[j]);
		}
		for (j = 16; j < 64; j++) {
			w[j] = SHA256_F4(w[j - 2]) + w[j - 7] + SHA256_F3(w[j - 15]) + w[j - 16];
		}
		for (j = 0; j < 8; j++) {
			wv[j] = m_h[j];
		}
		for (j = 0; j < 64; j++) {
			t1 = wv[7] + SHA256_F2(wv[4]) + SHA2_CH(wv[4], wv[5], wv[6])
				+ sha256_k[j] + w[j];
			t2 = SHA256_F1(wv[0]) + SHA2_MAJ(wv[0], wv[1], wv[2]);
			wv[7] = wv[6];
			wv[6] = wv[5];
			wv[5] = wv[4];
			wv[4] = wv[3] + t1;
			wv[3] = wv[2];
			wv[2] = wv[1];
			wv[1] = wv[0];
			wv[0] = t1 + t2;
		}
		for (j = 0; j < 8; j++) {
			m_h[j] += wv[j];
		}
	}
}

void SHA256::init()
{
	m_h[0] = 0x6a09e667;
	m_h[1] = 0xbb67ae85;
	m_h[2] = 0x3c6ef372;
	m_h[3] = 0xa54ff53a;
	m_h[4] = 0x510e527f;
	m_h[5] = 0x9b05688c;
	m_h[6] = 0x1f83d9ab;
	m_h[7] = 0x5be0cd19;
	m_len = 0;
	m_tot_len = 0;
}

void SHA256::update(const unsigned char *message, unsigned int len)
{
	unsigned int block_nb;
	unsigned int new_len, rem_len, tmp_len;
	const unsigned char *shifted_message;
	tmp_len = SHA224_256_BLOCK_SIZE - m_len;
	rem_len = len < tmp_len ? len : tmp_len;
	memcpy(&m_block[m_len], message, rem_len);
	if (m_len + len < SHA224_256_BLOCK_SIZE) {
		m_len += len;
		return;
	}
	new_len = len - rem_len;
	block_nb = new_len / SHA224_256_BLOCK_SIZE;
	shifted_message = message + rem_len;
	transform(m_block, 1);
	transform(shifted_message, block_nb);
	rem_len = new_len % SHA224_256_BLOCK_SIZE;
	memcpy(m_block, &shifted_message[block_nb << 6], rem_len);
	m_len = rem_len;
	m_tot_len += (block_nb + 1) << 6;
}

void SHA256::final(unsigned char *digest)
{
	unsigned int block_nb;
	unsigned int pm_len;
	unsigned int len_b;
	int i;
	block_nb = (1 + ((SHA224_256_BLOCK_SIZE - 9)
		< (m_len % SHA224_256_BLOCK_SIZE)));
	len_b = (m_tot_len + m_len) << 3;
	pm_len = block_nb << 6;
	memset(m_block + m_len, 0, pm_len - m_len);
	m_block[m_len] = 0x80;
	SHA2_UNPACK32(len_b, m_block + pm_len - 4);
	transform(m_block, block_nb);
	for (i = 0; i < 8; i++) {
		SHA2_UNPACK32(m_h[i], &digest[i << 2]);
	}
}

std::string sha256(std::string input)
{
	unsigned char digest[SHA256::DIGEST_SIZE];
	memset(digest, 0, SHA256::DIGEST_SIZE);

	SHA256 ctx = SHA256();
	ctx.init();
	ctx.update((unsigned char*)input.c_str(), input.length());
	ctx.final(digest);

	char buf[2 * SHA256::DIGEST_SIZE + 1];
	buf[2 * SHA256::DIGEST_SIZE] = 0;
	for (int i = 0; i < SHA256::DIGEST_SIZE; i++)
		sprintf(buf + i * 2, "%02x", digest[i]);
	return std::string(buf);
}

// EOF
 

Asynchronicznie jest dłużej niż synchronicznie. Nawet bez mutexa na output (output_mutex) wychodzi dłużej.

http://ideone.com/PQtHP8

0

Dla małych zadań narzut na tworzenie wątków może przewyższyć zysk z wielowątkowości. Ale dla odpowiednio dużych zadań już jest OK. Przykład:

import java.nio.charset.Charset
import java.security.MessageDigest

object Main {
  val charset = Charset.forName("UTF-16")

  def main(args: Array[String]): Unit = {
    println("Warmup")
    parallel(12345678)
    val total = 12345678
    println("Sequential")
    timed(sequential(total))
    println("Parallel")
    timed(parallel(total))
  }

  def timed(block: => Unit): Unit = {
    val startTime = System.currentTimeMillis()
    block
    val endTime = System.currentTimeMillis()
    println(s"Total time: ${endTime - startTime} ms")
  }

  def sequential(totalNum: Int): Unit = {
    (1 to totalNum).foreach(computeHash)
  }

  def parallel(totalNum: Int): Unit = {
    (1 to totalNum).par.foreach(computeHash)
  }

  def computeHash(value: Int): Array[Byte] = {
    val mDigest = MessageDigest.getInstance("SHA-256")
    mDigest.digest(value.toString.getBytes(charset))
  }
}

Wynik u mnie (Core i5-4670):

Warmup
Sequential
Total time: 7858 ms
Parallel
Total time: 2450 ms
0
_13th_Dragon napisał(a):

Ponieważ masz dwa procesory to przy nieobciążonym komputerze 2 wątki dają możliwe maksimum (pomijając drobnicę).
Natomiast przy obciążonym procesorze najlepszy wynik da to co zwraca: std::thread::hardware_concurrency()
Oczywiście o ile chodzi o pthread lub natywny wątek systemu, natomiast std::thread - popieprzono czarnym pieprzem.

Właśnie jak zobaczysz jeszcze w mój kod to ja tam na początku wyświetlam std:🧵:hardware_concurrency() i to mi zwraca wartość 4 dlatego mnie to tak dziwi, że nie zależnie czy 2 czy 4 to czas wykonywania tego zadania jest taki sam w zasadzie (pomimo tego z drugiej strony, że każda pętla for ma mnieszy zakres na którym liczy)

1 użytkowników online, w tym zalogowanych: 0, gości: 1