Merge pull request #2466 from AGSaidi/acq-rel-1

Switch blas_server to use acq/rel semantics
This commit is contained in:
Martin Kroeker 2020-03-04 07:59:31 +01:00 committed by GitHub
commit e6edb7431f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 41 additions and 55 deletions

View File

@ -140,6 +140,16 @@ typedef struct {
} thread_status_t; } thread_status_t;
/*
 * Accessors for a per-thread `queue` slot that is polled and updated by
 * more than one thread.
 *
 *   atomic_load_queue(p)      read the pointer stored at address `p`
 *   atomic_store_queue(p, v)  write pointer value `v` to address `p`
 *
 * Both forms perform a single load/store and impose NO ordering on
 * surrounding memory accesses (the C11 branch is explicitly relaxed).
 * Callers that publish or consume data through the slot must supply their
 * own barrier next to the access.
 */
#if (__STDC_VERSION__ >= 201112L)
/* C11 or newer: relaxed atomic access through the compiler's __atomic
 * builtins (NOTE(review): these are GCC/Clang builtins rather than ISO C;
 * confirm every C11 toolchain used for this file provides them). */
#define atomic_load_queue(p) (__atomic_load_n((p), __ATOMIC_RELAXED))
#define atomic_store_queue(p, v) (__atomic_store_n((p), (v), __ATOMIC_RELAXED))
#else
/* Older compilers: go through a volatile-qualified lvalue so the access is
 * actually emitted each time. The volatile must qualify the slot itself
 * (`* volatile *`), not only the pointed-to struct; otherwise the load is
 * an ordinary read that the compiler may hoist out of a polling loop. */
#define atomic_load_queue(p) ((blas_queue_t *)(*(volatile blas_queue_t * volatile *)(p)))
#define atomic_store_queue(p, v) (*(volatile blas_queue_t * volatile *)(p) = (v))
#endif
static thread_status_t thread_status[MAX_CPU_NUMBER] __attribute__((aligned(ATTRIBUTE_SIZE))); static thread_status_t thread_status[MAX_CPU_NUMBER] __attribute__((aligned(ATTRIBUTE_SIZE)));
#ifndef THREAD_TIMEOUT #ifndef THREAD_TIMEOUT
@ -312,20 +322,19 @@ blas_queue_t *tscq;
last_tick = (unsigned int)rpcc(); last_tick = (unsigned int)rpcc();
pthread_mutex_lock (&thread_status[cpu].lock); tscq = atomic_load_queue(&thread_status[cpu].queue);
tscq=thread_status[cpu].queue;
pthread_mutex_unlock (&thread_status[cpu].lock);
while(!tscq) { while(!tscq) {
YIELDING; YIELDING;
if ((unsigned int)rpcc() - last_tick > thread_timeout) { if ((unsigned int)rpcc() - last_tick > thread_timeout) {
pthread_mutex_lock (&thread_status[cpu].lock);
if (!thread_status[cpu].queue) { if (!atomic_load_queue(&thread_status[cpu].queue)) {
pthread_mutex_lock (&thread_status[cpu].lock);
thread_status[cpu].status = THREAD_STATUS_SLEEP; thread_status[cpu].status = THREAD_STATUS_SLEEP;
while (thread_status[cpu].status == THREAD_STATUS_SLEEP) { while (thread_status[cpu].status == THREAD_STATUS_SLEEP &&
!atomic_load_queue(&thread_status[cpu].queue)) {
#ifdef MONITOR #ifdef MONITOR
main_status[cpu] = MAIN_SLEEPING; main_status[cpu] = MAIN_SLEEPING;
@ -333,19 +342,18 @@ blas_queue_t *tscq;
pthread_cond_wait(&thread_status[cpu].wakeup, &thread_status[cpu].lock); pthread_cond_wait(&thread_status[cpu].wakeup, &thread_status[cpu].lock);
} }
pthread_mutex_unlock(&thread_status[cpu].lock);
} }
pthread_mutex_unlock(&thread_status[cpu].lock);
last_tick = (unsigned int)rpcc(); last_tick = (unsigned int)rpcc();
} }
pthread_mutex_lock (&thread_status[cpu].lock);
tscq=thread_status[cpu].queue; tscq = atomic_load_queue(&thread_status[cpu].queue);
pthread_mutex_unlock (&thread_status[cpu].lock);
} }
queue = thread_status[cpu].queue; queue = atomic_load_queue(&thread_status[cpu].queue);
MB;
if ((long)queue == -1) break; if ((long)queue == -1) break;
@ -360,9 +368,7 @@ blas_queue_t *tscq;
if (queue) { if (queue) {
int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine; int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine;
pthread_mutex_lock (&thread_status[cpu].lock); atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);
thread_status[cpu].queue = (blas_queue_t *)1;
pthread_mutex_unlock (&thread_status[cpu].lock);
sa = queue -> sa; sa = queue -> sa;
sb = queue -> sb; sb = queue -> sb;
@ -442,13 +448,9 @@ blas_queue_t *tscq;
// arm: make sure all results are written out _before_ // arm: make sure all results are written out _before_
// thread is marked as done and other threads use them // thread is marked as done and other threads use them
WMB; MB;
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);
pthread_mutex_lock (&thread_status[cpu].lock);
thread_status[cpu].queue = (blas_queue_t * volatile) ((long)thread_status[cpu].queue & 0); /* Need a trick */
pthread_mutex_unlock (&thread_status[cpu].lock);
WMB;
} }
@ -566,7 +568,7 @@ int blas_thread_init(void){
for(i = 0; i < blas_num_threads - 1; i++){ for(i = 0; i < blas_num_threads - 1; i++){
thread_status[i].queue = (blas_queue_t *)NULL; atomic_store_queue(&thread_status[i].queue, (blas_queue_t *)0);
thread_status[i].status = THREAD_STATUS_WAKEUP; thread_status[i].status = THREAD_STATUS_WAKEUP;
pthread_mutex_init(&thread_status[i].lock, NULL); pthread_mutex_init(&thread_status[i].lock, NULL);
@ -655,7 +657,8 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
if (queue -> mode & BLAS_NODE) { if (queue -> mode & BLAS_NODE) {
do { do {
while((thread_status[i].node != node || thread_status[i].queue) && (i < blas_num_threads - 1)) i ++;
while((thread_status[i].node != node || atomic_load_queue(&thread_status[i].queue)) && (i < blas_num_threads - 1)) i ++;
if (i < blas_num_threads - 1) break; if (i < blas_num_threads - 1) break;
@ -669,36 +672,26 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
} while (1); } while (1);
} else { } else {
pthread_mutex_lock (&thread_status[i].lock); tsiq = atomic_load_queue(&thread_status[i].queue);
tsiq = thread_status[i].queue;
pthread_mutex_unlock (&thread_status[i].lock);
while(tsiq) { while(tsiq) {
i ++; i ++;
if (i >= blas_num_threads - 1) i = 0; if (i >= blas_num_threads - 1) i = 0;
pthread_mutex_lock (&thread_status[i].lock); tsiq = atomic_load_queue(&thread_status[i].queue);
tsiq = thread_status[i].queue;
pthread_mutex_unlock (&thread_status[i].lock);
} }
} }
#else #else
pthread_mutex_lock (&thread_status[i].lock); tsiq = atomic_load_queue(&thread_status[i].queue);
tsiq=thread_status[i].queue ;
pthread_mutex_unlock (&thread_status[i].lock);
while(tsiq) { while(tsiq) {
i ++; i ++;
if (i >= blas_num_threads - 1) i = 0; if (i >= blas_num_threads - 1) i = 0;
pthread_mutex_lock (&thread_status[i].lock); tsiq = atomic_load_queue(&thread_status[i].queue);
tsiq=thread_status[i].queue ;
pthread_mutex_unlock (&thread_status[i].lock);
} }
#endif #endif
queue -> assigned = i; queue -> assigned = i;
WMB; MB;
pthread_mutex_lock (&thread_status[i].lock);
thread_status[i].queue = queue; atomic_store_queue(&thread_status[i].queue, queue);
pthread_mutex_unlock (&thread_status[i].lock);
WMB;
queue = queue -> next; queue = queue -> next;
pos ++; pos ++;
@ -718,9 +711,7 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
pos = current -> assigned; pos = current -> assigned;
pthread_mutex_lock (&thread_status[pos].lock); tspq = atomic_load_queue(&thread_status[pos].queue);
tspq=thread_status[pos].queue;
pthread_mutex_unlock (&thread_status[pos].lock);
if ((BLASULONG)tspq > 1) { if ((BLASULONG)tspq > 1) {
pthread_mutex_lock (&thread_status[pos].lock); pthread_mutex_lock (&thread_status[pos].lock);
@ -752,24 +743,20 @@ int exec_blas_async_wait(BLASLONG num, blas_queue_t *queue){
while ((num > 0) && queue) { while ((num > 0) && queue) {
pthread_mutex_lock(&thread_status[queue->assigned].lock); tsqq = atomic_load_queue(&thread_status[queue->assigned].queue);
tsqq=thread_status[queue -> assigned].queue;
pthread_mutex_unlock(&thread_status[queue->assigned].lock);
while(tsqq) { while(tsqq) {
YIELDING; YIELDING;
pthread_mutex_lock(&thread_status[queue->assigned].lock); tsqq = atomic_load_queue(&thread_status[queue->assigned].queue);
tsqq=thread_status[queue -> assigned].queue;
pthread_mutex_unlock(&thread_status[queue->assigned].lock);
}; };
queue = queue -> next; queue = queue -> next;
num --; num --;
} }
MB;
#ifdef SMP_DEBUG #ifdef SMP_DEBUG
fprintf(STDERR, "Done.\n\n"); fprintf(STDERR, "Done.\n\n");
#endif #endif
@ -880,7 +867,7 @@ void goto_set_num_threads(int num_threads) {
for(i = blas_num_threads - 1; i < num_threads - 1; i++){ for(i = blas_num_threads - 1; i < num_threads - 1; i++){
thread_status[i].queue = (blas_queue_t *)NULL; atomic_store_queue(&thread_status[i].queue, (blas_queue_t *)0);
thread_status[i].status = THREAD_STATUS_WAKEUP; thread_status[i].status = THREAD_STATUS_WAKEUP;
pthread_mutex_init(&thread_status[i].lock, NULL); pthread_mutex_init(&thread_status[i].lock, NULL);
@ -971,12 +958,11 @@ int BLASFUNC(blas_thread_shutdown)(void){
for (i = 0; i < blas_num_threads - 1; i++) { for (i = 0; i < blas_num_threads - 1; i++) {
pthread_mutex_lock (&thread_status[i].lock); pthread_mutex_lock (&thread_status[i].lock);
thread_status[i].queue = (blas_queue_t *)-1; atomic_store_queue(&thread_status[i].queue, (blas_queue_t *)-1);
thread_status[i].status = THREAD_STATUS_WAKEUP; thread_status[i].status = THREAD_STATUS_WAKEUP;
pthread_cond_signal (&thread_status[i].wakeup); pthread_cond_signal (&thread_status[i].wakeup);
pthread_mutex_unlock(&thread_status[i].lock); pthread_mutex_unlock(&thread_status[i].lock);