Merge pull request #4103 from mseminatore/windows_perf
Improve Windows threading performance scaling
This commit is contained in:
		
						commit
						7acff1be1c
					
				| 
						 | 
					@ -216,3 +216,8 @@ In chronological order:
 | 
				
			||||||
  
 | 
					  
 | 
				
			||||||
* Pablo Romero <https://github.com/pablorcum>
 | 
					* Pablo Romero <https://github.com/pablorcum>
 | 
				
			||||||
  * [2022-08] Fix building from sources for QNX
 | 
					  * [2022-08] Fix building from sources for QNX
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					* Mark Seminatore <https://github.com/mseminatore>
 | 
				
			||||||
 | 
					  * [2023-06-23] Fix bounds issue in goto_set_num_threads
 | 
				
			||||||
 | 
					  * [2023-06-23] Improve Windows threading performance scaling
 | 
				
			||||||
 | 
					  
 | 
				
			||||||
| 
						 | 
					@ -50,11 +50,19 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* This is a thread implementation for Win32 lazy implementation */
 | 
					/* This is a thread implementation for Win32 lazy implementation */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#if defined (__GNUC__) && (__GNUC__ < 6)
 | 
				
			||||||
 | 
						#define WIN_CAS(dest, exch, comp) __sync_val_compare_and_swap(dest, comp, exch)
 | 
				
			||||||
 | 
					#else
 | 
				
			||||||
 | 
						#if defined(_WIN64)
 | 
				
			||||||
 | 
							#define WIN_CAS(dest, exch, comp) InterlockedCompareExchange64(dest, exch, comp)
 | 
				
			||||||
 | 
						#else
 | 
				
			||||||
 | 
							#define WIN_CAS(dest, exch, comp) InterlockedCompareExchange(dest, exch, comp)
 | 
				
			||||||
 | 
						#endif
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* Thread server common information */
 | 
					/* Thread server common information */
 | 
				
			||||||
typedef struct{
 | 
					typedef struct{
 | 
				
			||||||
  CRITICAL_SECTION lock;
 | 
					  HANDLE taskSemaphore;
 | 
				
			||||||
  HANDLE filled;
 | 
					 | 
				
			||||||
  HANDLE killed;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  blas_queue_t	*queue;    /* Parameter Pointer */
 | 
					  blas_queue_t	*queue;    /* Parameter Pointer */
 | 
				
			||||||
  int		shutdown;  /* server shutdown flag */
 | 
					  int		shutdown;  /* server shutdown flag */
 | 
				
			||||||
| 
						 | 
					@ -71,8 +79,6 @@ static blas_pool_t   pool;
 | 
				
			||||||
static HANDLE	    blas_threads   [MAX_CPU_NUMBER];
 | 
					static HANDLE	    blas_threads   [MAX_CPU_NUMBER];
 | 
				
			||||||
static DWORD	    blas_threads_id[MAX_CPU_NUMBER];
 | 
					static DWORD	    blas_threads_id[MAX_CPU_NUMBER];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
 | 
					static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      if (!(mode & BLAS_COMPLEX)){
 | 
					      if (!(mode & BLAS_COMPLEX)){
 | 
				
			||||||
| 
						 | 
					@ -198,7 +204,6 @@ static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* This is a main routine of threads. Each thread waits until job is */
 | 
					/* This is a main routine of threads. Each thread waits until job is */
 | 
				
			||||||
/* queued.                                                           */
 | 
					/* queued.                                                           */
 | 
				
			||||||
 | 
					 | 
				
			||||||
static DWORD WINAPI blas_thread_server(void *arg){
 | 
					static DWORD WINAPI blas_thread_server(void *arg){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /* Thread identifier */
 | 
					  /* Thread identifier */
 | 
				
			||||||
| 
						 | 
					@ -207,9 +212,7 @@ static DWORD WINAPI blas_thread_server(void *arg){
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  void *buffer, *sa, *sb;
 | 
					  void *buffer, *sa, *sb;
 | 
				
			||||||
  blas_queue_t	*queue;
 | 
					  volatile blas_queue_t	*queue;
 | 
				
			||||||
  DWORD action;
 | 
					 | 
				
			||||||
  HANDLE handles[] = {pool.filled, pool.killed};
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /* Each server needs each buffer */
 | 
					  /* Each server needs each buffer */
 | 
				
			||||||
  buffer   = blas_memory_alloc(2);
 | 
					  buffer   = blas_memory_alloc(2);
 | 
				
			||||||
| 
						 | 
					@ -226,28 +229,32 @@ static DWORD WINAPI blas_thread_server(void *arg){
 | 
				
			||||||
    fprintf(STDERR, "Server[%2ld] Waiting for Queue.\n", cpu);
 | 
					    fprintf(STDERR, "Server[%2ld] Waiting for Queue.\n", cpu);
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    do {
 | 
						// all worker threads wait on the semaphore
 | 
				
			||||||
      action = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
 | 
						WaitForSingleObject(pool.taskSemaphore, INFINITE);
 | 
				
			||||||
    } while ((action != WAIT_OBJECT_0) && (action != WAIT_OBJECT_0 + 1));
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (action == WAIT_OBJECT_0 + 1) break;
 | 
						// kill the thread if we are shutting down the server
 | 
				
			||||||
 | 
						if (pool.shutdown)
 | 
				
			||||||
 | 
							break;
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
#ifdef SMP_DEBUG
 | 
					#ifdef SMP_DEBUG
 | 
				
			||||||
    fprintf(STDERR, "Server[%2ld] Got it.\n", cpu);
 | 
					    fprintf(STDERR, "Server[%2ld] Got it.\n", cpu);
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    EnterCriticalSection(&pool.lock);
 | 
						// grab a queued task and update the list
 | 
				
			||||||
 | 
						volatile blas_queue_t* queue_next;
 | 
				
			||||||
 | 
						INT_PTR prev_value;
 | 
				
			||||||
 | 
						do {
 | 
				
			||||||
 | 
							queue = (volatile blas_queue_t*)pool.queue;
 | 
				
			||||||
 | 
							if (!queue)
 | 
				
			||||||
 | 
								break;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    queue = pool.queue;
 | 
							queue_next = (volatile blas_queue_t*)queue->next;
 | 
				
			||||||
    if (queue) pool.queue = queue->next;
 | 
							prev_value = WIN_CAS((INT_PTR*)&pool.queue, (INT_PTR)queue_next, (INT_PTR)queue);
 | 
				
			||||||
 | 
						} while (prev_value != queue);
 | 
				
			||||||
    LeaveCriticalSection(&pool.lock);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (queue)  {
 | 
					    if (queue)  {
 | 
				
			||||||
      int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine;
 | 
					      int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      if (pool.queue) SetEvent(pool.filled);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      sa = queue -> sa;
 | 
					      sa = queue -> sa;
 | 
				
			||||||
      sb = queue -> sb;
 | 
					      sb = queue -> sb;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -332,13 +339,8 @@ static DWORD WINAPI blas_thread_server(void *arg){
 | 
				
			||||||
    fprintf(STDERR, "Server[%2ld] Finished!\n", cpu);
 | 
					    fprintf(STDERR, "Server[%2ld] Finished!\n", cpu);
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    EnterCriticalSection(&queue->lock);
 | 
						// mark our sub-task as complete
 | 
				
			||||||
 | 
						InterlockedDecrement(&queue->status);
 | 
				
			||||||
    queue -> status = BLAS_STATUS_FINISHED;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    LeaveCriticalSection(&queue->lock);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    SetEvent(queue->finish);
 | 
					 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /* Shutdown procedure */
 | 
					  /* Shutdown procedure */
 | 
				
			||||||
| 
						 | 
					@ -353,7 +355,7 @@ static DWORD WINAPI blas_thread_server(void *arg){
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* Initializing routine */
 | 
					/* Initializing routine */
 | 
				
			||||||
int blas_thread_init(void){
 | 
					  int blas_thread_init(void){
 | 
				
			||||||
  BLASLONG i;
 | 
					  BLASLONG i;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (blas_server_avail || (blas_cpu_number <= 1)) return 0;
 | 
					  if (blas_server_avail || (blas_cpu_number <= 1)) return 0;
 | 
				
			||||||
| 
						 | 
					@ -367,9 +369,7 @@ int blas_thread_init(void){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (!blas_server_avail){
 | 
					  if (!blas_server_avail){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    InitializeCriticalSection(&pool.lock);
 | 
						pool.taskSemaphore = CreateSemaphore(NULL, 0, blas_cpu_number - 1, NULL);
 | 
				
			||||||
    pool.filled = CreateEvent(NULL, FALSE, FALSE, NULL);
 | 
					 | 
				
			||||||
    pool.killed = CreateEvent(NULL, TRUE,  FALSE, NULL);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pool.shutdown = 0;
 | 
					    pool.shutdown = 0;
 | 
				
			||||||
    pool.queue    = NULL;
 | 
					    pool.queue    = NULL;
 | 
				
			||||||
| 
						 | 
					@ -391,11 +391,10 @@ int blas_thread_init(void){
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
   User can call one of two routines.
 | 
					   User can call one of two routines.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
     exec_blas_async ... immediately returns after jobs are queued.
 | 
						 exec_blas_async ... immediately returns after jobs are queued.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
     exec_blas       ... returns after jobs are finished.
 | 
						 exec_blas       ... returns after jobs are finished.
 | 
				
			||||||
*/
 | 
					*/
 | 
				
			||||||
 | 
					 | 
				
			||||||
int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
 | 
					int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#if defined(SMP_SERVER)
 | 
					#if defined(SMP_SERVER)
 | 
				
			||||||
| 
						 | 
					@ -409,8 +408,7 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
 | 
				
			||||||
  current = queue;
 | 
					  current = queue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  while (current) {
 | 
					  while (current) {
 | 
				
			||||||
    InitializeCriticalSection(¤t -> lock);
 | 
						current->status = 1;
 | 
				
			||||||
    current -> finish = CreateEvent(NULL, FALSE, FALSE, NULL);
 | 
					 | 
				
			||||||
    current -> position = pos;
 | 
					    current -> position = pos;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#ifdef CONSISTENT_FPCSR
 | 
					#ifdef CONSISTENT_FPCSR
 | 
				
			||||||
| 
						 | 
					@ -422,19 +420,10 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
 | 
				
			||||||
    pos ++;
 | 
					    pos ++;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  EnterCriticalSection(&pool.lock);
 | 
					  pool.queue = queue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (pool.queue) {
 | 
					  // start up worker threads
 | 
				
			||||||
    current = pool.queue;
 | 
					  ReleaseSemaphore(pool.taskSemaphore, pos - 1, NULL);
 | 
				
			||||||
    while (current -> next) current = current -> next;
 | 
					 | 
				
			||||||
    current -> next = queue;
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    pool.queue = queue;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  LeaveCriticalSection(&pool.lock);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  SetEvent(pool.filled);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  return 0;
 | 
					  return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -450,10 +439,9 @@ int exec_blas_async_wait(BLASLONG num, blas_queue_t *queue){
 | 
				
			||||||
    fprintf(STDERR, "Waiting Queue ..\n");
 | 
					    fprintf(STDERR, "Waiting Queue ..\n");
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      WaitForSingleObject(queue->finish, INFINITE);
 | 
						// spin-wait on each sub-task to finish
 | 
				
			||||||
 | 
						while (*((volatile int*)&queue->status))
 | 
				
			||||||
      CloseHandle(queue->finish);
 | 
							YIELDING;
 | 
				
			||||||
      DeleteCriticalSection(&queue -> lock);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
      queue = queue -> next;
 | 
					      queue = queue -> next;
 | 
				
			||||||
      num --;
 | 
					      num --;
 | 
				
			||||||
| 
						 | 
					@ -501,18 +489,21 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* Shutdown procedure, but user don't have to call this routine. The */
 | 
					/* Shutdown procedure, but user don't have to call this routine. The */
 | 
				
			||||||
/* kernel automatically kill threads.                                */
 | 
					/* kernel automatically kill threads.                                */
 | 
				
			||||||
 | 
					 | 
				
			||||||
int BLASFUNC(blas_thread_shutdown)(void){
 | 
					int BLASFUNC(blas_thread_shutdown)(void){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  int i;
 | 
					  int i;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#ifdef SMP_DEBUG
 | 
				
			||||||
 | 
					  fprintf(STDERR, "blas_thread_shutdown..\n");
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (!blas_server_avail) return 0;
 | 
					  if (!blas_server_avail) return 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  LOCK_COMMAND(&server_lock);
 | 
					  LOCK_COMMAND(&server_lock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (blas_server_avail){
 | 
					  if (blas_server_avail){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    SetEvent(pool.killed);
 | 
						  pool.shutdown = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for(i = 0; i < blas_num_threads - 1; i++){
 | 
					    for(i = 0; i < blas_num_threads - 1; i++){
 | 
				
			||||||
      // Could also just use WaitForMultipleObjects
 | 
					      // Could also just use WaitForMultipleObjects
 | 
				
			||||||
| 
						 | 
					@ -528,8 +519,7 @@ int BLASFUNC(blas_thread_shutdown)(void){
 | 
				
			||||||
      CloseHandle(blas_threads[i]);
 | 
					      CloseHandle(blas_threads[i]);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    CloseHandle(pool.filled);
 | 
						CloseHandle(pool.taskSemaphore);
 | 
				
			||||||
    CloseHandle(pool.killed);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    blas_server_avail = 0;
 | 
					    blas_server_avail = 0;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
| 
						 | 
					@ -559,16 +549,14 @@ void goto_set_num_threads(int num_threads)
 | 
				
			||||||
		//increased_threads = 1;
 | 
							//increased_threads = 1;
 | 
				
			||||||
	    if (!blas_server_avail){
 | 
						    if (!blas_server_avail){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			InitializeCriticalSection(&pool.lock);
 | 
								pool.taskSemaphore = CreateSemaphore(NULL, 0, blas_cpu_number - 1, NULL);
 | 
				
			||||||
			pool.filled = CreateEvent(NULL, FALSE, FALSE, NULL);
 | 
					 | 
				
			||||||
			pool.killed = CreateEvent(NULL, TRUE,  FALSE, NULL);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
			pool.shutdown = 0;
 | 
								pool.shutdown = 0;
 | 
				
			||||||
			pool.queue    = NULL;
 | 
								pool.queue    = NULL;
 | 
				
			||||||
			blas_server_avail = 1;
 | 
								blas_server_avail = 1;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		for(i = blas_num_threads - 1; i < num_threads - 1; i++){
 | 
							for(i = blas_num_threads; i < num_threads - 1; i++){
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			blas_threads[i] = CreateThread(NULL, 0,
 | 
								blas_threads[i] = CreateThread(NULL, 0,
 | 
				
			||||||
				     blas_thread_server, (void *)i,
 | 
									     blas_thread_server, (void *)i,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue