From 5fca9015b898e4f9fb48e8e474d7163aa7bc9053 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 08:23:10 +0000 Subject: [PATCH 1/5] refact worker util --- include/util/tworker.h | 77 +++++++++------- source/dnode/mgmt/impl/inc/dndEnv.h | 12 +-- source/dnode/mgmt/impl/src/dndVnodes.c | 40 ++++----- source/dnode/mgmt/impl/src/dndWorker.c | 20 ++--- source/util/src/tworker.c | 116 +++++++++++++------------ 5 files changed, 144 insertions(+), 121 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index 27f03bd2b6..6c81565a6a 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -15,57 +15,74 @@ #ifndef _TD_UTIL_WORKER_H #define _TD_UTIL_WORKER_H - +#include "os.h" #include "tqueue.h" #ifdef __cplusplus extern "C" { #endif -typedef struct SWorkerPool SWorkerPool; -typedef struct SMWorkerPool SMWorkerPool; +typedef struct SQWorkerPool SQWorkerPool; +typedef struct SFWorkerPool SFWorkerPool; +typedef struct SWWorkerPool SWWorkerPool; -typedef struct SWorker { - int32_t id; // worker ID - pthread_t thread; // thread - SWorkerPool *pool; -} SWorker; +typedef struct SQWorker { + int32_t id; // worker ID + pthread_t thread; // thread + SQWorkerPool *pool; +} SQWorker; -typedef struct SWorkerPool { +typedef struct SQWorkerPool { int32_t max; // max number of workers int32_t min; // min number of workers int32_t num; // current number of workers - STaosQset *qset; - const char *name; - SWorker *workers; + STaosQset * qset; + const char * name; + SQWorker * workers; pthread_mutex_t mutex; -} SWorkerPool; +} SQWorkerPool; -typedef struct SMWorker { +typedef struct SFWorker { + int32_t id; // worker ID + pthread_t thread; // thread + SFWorkerPool *pool; +} SFWorker; + +typedef struct SFWorkerPool { + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + STaosQset * qset; + const char * name; + SFWorker * workers; + pthread_mutex_t mutex; +} SFWorkerPool; + +typedef struct SWWorker { int32_t id; // worker id pthread_t thread; // thread - STaosQall *qall; - STaosQset *qset; // queue set - SMWorkerPool *pool; -} SMWorker; + STaosQall * qall; + STaosQset * qset; // queue set + SWWorkerPool *pool; +} SWWorker; -typedef struct SMWorkerPool { +typedef struct SWWorkerPool { int32_t max; // max number of workers int32_t nextId; // from 0 to max-1, cyclic - const char *name; - SMWorker *workers; + const char * name; + SWWorker * workers; pthread_mutex_t mutex; -} SMWorkerPool; +} SWWorkerPool; -int32_t tWorkerInit(SWorkerPool *pool); -void tWorkerCleanup(SWorkerPool *pool); -STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); -void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue); +int32_t tQWorkerInit(SQWorkerPool *pool); +void tQWorkerCleanup(SQWorkerPool *pool); +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp); +void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); -int32_t tMWorkerInit(SMWorkerPool *pool); -void tMWorkerCleanup(SMWorkerPool *pool); -STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); -void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue); +int32_t tWWorkerInit(SWWorkerPool *pool); +void tWWorkerCleanup(SWWorkerPool *pool); +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp); +void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 7ab3f46fdb..25c04ff6f8 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -31,8 +31,8 @@ typedef struct { SDnode *pDnode; STaosQueue *queue; union { - SWorkerPool pool; - SMWorkerPool mpool; + SQWorkerPool pool; + SWWorkerPool mpool; }; } SDnodeWorker; @@ -109,10 +109,10 @@ typedef struct { int32_t openVnodes; int32_t totalVnodes; SRWLatch latch; - SWorkerPool queryPool; - SWorkerPool fetchPool; - SMWorkerPool syncPool; - SMWorkerPool writePool; + SQWorkerPool queryPool; + SQWorkerPool fetchPool; + SWWorkerPool syncPool; + SWWorkerPool writePool; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index f487859fd6..8d6fe3707b 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1); int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1); - SWorkerPool *pPool = &pMgmt->queryPool; + SQWorkerPool *pPool = &pMgmt->queryPool; pPool->name = "vnode-query"; pPool->min = minQueryThreads; pPool->max = maxQueryThreads; - if (tWorkerInit(pPool) != 0) return -1; + if (tQWorkerInit(pPool) != 0) return -1; pPool = &pMgmt->fetchPool; pPool->name = "vnode-fetch"; pPool->min = minFetchThreads; pPool->max = maxFetchThreads; - if (tWorkerInit(pPool) != 0) return -1; + if (tQWorkerInit(pPool) != 0) return -1; - SMWorkerPool *pMPool = &pMgmt->writePool; + SWWorkerPool *pMPool = &pMgmt->writePool; pMPool->name = "vnode-write"; pMPool->max = maxWriteThreads; - if (tMWorkerInit(pMPool) != 0) return -1; + if (tWWorkerInit(pMPool) != 0) return -1; pMPool = &pMgmt->syncPool; pMPool->name = "vnode-sync"; pMPool->max = maxSyncThreads; - if (tMWorkerInit(pMPool) != 0) return -1; + if (tWWorkerInit(pMPool) != 0) return -1; dDebug("vnode workers is initialized"); return 0; @@ -938,21 +938,21 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { static void dndCleanupVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerCleanup(&pMgmt->fetchPool); - tWorkerCleanup(&pMgmt->queryPool); - tMWorkerCleanup(&pMgmt->writePool); - tMWorkerCleanup(&pMgmt->syncPool); + tQWorkerCleanup(&pMgmt->fetchPool); + tQWorkerCleanup(&pMgmt->queryPool); + tWWorkerCleanup(&pMgmt->writePool); + tWWorkerCleanup(&pMgmt->syncPool); dDebug("vnode workers is closed"); } static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); + pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || pVnode->pQueryQ == NULL) { @@ -965,11 +965,11 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); - tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pWriteQ = NULL; pVnode->pApplyQ = NULL; pVnode->pSyncQ = NULL; diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index e0db262f89..42ef76bb43 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -31,28 +31,28 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c pWorker->pDnode = pDnode; if (pWorker->type == DND_WORKER_SINGLE) { - SWorkerPool *pPool = &pWorker->pool; + SQWorkerPool *pPool = &pWorker->pool; pPool->name = name; pPool->min = minNum; pPool->max = maxNum; - if (tWorkerInit(pPool) != 0) { + if (tQWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); + pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } } else if (pWorker->type == DND_WORKER_MULTI) { - SMWorkerPool *pPool = &pWorker->mpool; + SWWorkerPool *pPool = &pWorker->mpool; pPool->name = name; pPool->max = maxNum; - if (tMWorkerInit(pPool) != 0) { + if (tWWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); + pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -70,11 +70,11 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { } if (pWorker->type == DND_WORKER_SINGLE) { - tWorkerCleanup(&pWorker->pool); - tWorkerFreeQueue(&pWorker->pool, pWorker->queue); + tQWorkerCleanup(&pWorker->pool); + tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); } else if (pWorker->type == DND_WORKER_MULTI) { - tMWorkerCleanup(&pWorker->mpool); - tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); + tWWorkerCleanup(&pWorker->mpool); + tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue); } else { } } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index ed74041712..3fe087a20d 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -14,38 +14,39 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "ulog.h" -#include "tqueue.h" #include "tworker.h" +#include "ulog.h" -typedef void* (*ThreadFp)(void *param); +typedef void *(*ThreadFp)(void *param); -int32_t tWorkerInit(SWorkerPool *pool) { +int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = calloc(sizeof(SWorker), pool->max); - pthread_mutex_init(&pool->mutex, NULL); - for (int i = 0; i < pool->max; ++i) { - SWorker *worker = pool->workers + i; + pool->workers = calloc(sizeof(SQWorker), pool->max); + if (pthread_mutex_init(&pool->mutex, NULL)) { + return -1; + } + + for (int32_t i = 0; i < pool->max; ++i) { + SQWorker *worker = pool->workers + i; worker->id = i; worker->pool = pool; } - uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); + uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); return 0; } -void tWorkerCleanup(SWorkerPool *pool) { - for (int i = 0; i < pool->max; ++i) { - SWorker *worker = pool->workers + i; +void tQWorkerCleanup(SQWorkerPool *pool) { + for (int32_t i = 0; i < pool->max; ++i) { + SQWorker *worker = pool->workers + i; if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } } - for (int i = 0; i < pool->max; ++i) { - SWorker *worker = pool->workers + i; + for (int32_t i = 0; i < pool->max; ++i) { + SQWorker *worker = pool->workers + i; if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); @@ -56,28 +57,28 @@ void tWorkerCleanup(SWorkerPool *pool) { taosCloseQset(pool->qset); pthread_mutex_destroy(&pool->mutex); - uInfo("worker:%s is closed", pool->name); + uDebug("qworker:%s is closed", pool->name); } -static void *tWorkerThreadFp(SWorker *worker) { - SWorkerPool *pool = worker->pool; - FProcessItem fp = NULL; +static void *tQWorkerThreadFp(SQWorker *worker) { + SQWorkerPool *pool = worker->pool; + FProcessItem fp = NULL; - void *msg = NULL; - void *ahandle = NULL; + void * msg = NULL; + void * ahandle = NULL; int32_t code = 0; taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); + uDebug("qworker:%s:%d is running", pool->name, worker->id); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } - if (fp) { + if (fp != NULL) { (*fp)(ahandle, msg); } } @@ -85,7 +86,7 @@ static void *tWorkerThreadFp(SWorker *worker) { return NULL; } -STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) { pthread_mutex_lock(&pool->mutex); STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -99,61 +100,66 @@ STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) // spawn a thread to process queue if (pool->num < pool->max) { do { - SWorker *worker = pool->workers + pool->num; + SQWorker *worker = pool->workers + pool->num; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker) != 0) { - uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { + uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + taosCloseQueue(queue); + queue = NULL; + break; } pthread_attr_destroy(&thAttr); pool->num++; - uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); + uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); } while (pool->num < pool->min); } pthread_mutex_unlock(&pool->mutex); - uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } -void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) { +void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("worker:%s, queue:%p is freed", pool->name, queue); + uDebug("qworker:%s, queue:%p is freed", pool->name, queue); } -int32_t tMWorkerInit(SMWorkerPool *pool) { +int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SMWorker), pool->max); + pool->workers = calloc(sizeof(SWWorker), pool->max); if (pool->workers == NULL) return -1; pthread_mutex_init(&pool->mutex, NULL); for (int32_t i = 0; i < pool->max; ++i) { - SMWorker *worker = pool->workers + i; + SWWorker *worker = pool->workers + i; worker->id = i; worker->qall = NULL; worker->qset = NULL; worker->pool = pool; } - uInfo("worker:%s is initialized, max:%d", pool->name, pool->max); + uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max); return 0; } -void tMWorkerCleanup(SMWorkerPool *pool) { +void tWWorkerCleanup(SWWorkerPool *pool) { for (int32_t i = 0; i < pool->max; ++i) { - SMWorker *worker = pool->workers + i; + SWWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { - if (worker->qset) taosQsetThreadResume(worker->qset); + if (worker->qset) { + taosQsetThreadResume(worker->qset); + } } } for (int32_t i = 0; i < pool->max; ++i) { - SMWorker *worker = pool->workers + i; + SWWorker *worker = pool->workers + i; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); taosFreeQall(worker->qall); @@ -164,30 +170,30 @@ void tMWorkerCleanup(SMWorkerPool *pool) { tfree(pool->workers); pthread_mutex_destroy(&pool->mutex); - uInfo("worker:%s is closed", pool->name); + uInfo("wworker:%s is closed", pool->name); } -static void *tWriteWorkerThreadFp(SMWorker *worker) { - SMWorkerPool *pool = worker->pool; +static void *tWriteWorkerThreadFp(SWWorker *worker) { + SWWorkerPool *pool = worker->pool; FProcessItems fp = NULL; - void *msg = NULL; - void *ahandle = NULL; + void * msg = NULL; + void * ahandle = NULL; int32_t numOfMsgs = 0; int32_t qtype = 0; taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); + uDebug("wworker:%s:%d is running", pool->name, worker->id); while (1) { numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); if (numOfMsgs == 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); + uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } - if (fp) { + if (fp != NULL) { (*fp)(ahandle, worker->qall, numOfMsgs); } } @@ -195,9 +201,9 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { return NULL; } -STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) { pthread_mutex_lock(&pool->mutex); - SMWorker *worker = pool->workers + pool->nextId; + SWWorker *worker = pool->workers + pool->nextId; STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -228,13 +234,13 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { - uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosFreeQall(worker->qall); taosCloseQset(worker->qset); taosCloseQueue(queue); queue = NULL; } else { - uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); + uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); pool->nextId = (pool->nextId + 1) % pool->max; } @@ -245,12 +251,12 @@ STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems } pthread_mutex_unlock(&pool->mutex); - uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } -void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) { +void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("worker:%s, queue:%p is freed", pool->name, queue); + uDebug("wworker:%s, queue:%p is freed", pool->name, queue); } From 8685fb79b7df3ee04be464aeadf825bdc43b1b6a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 08:40:07 +0000 Subject: [PATCH 2/5] refact worker --- include/util/tqueue.h | 1 + include/util/tworker.h | 1 - source/util/src/tqueue.c | 59 ++++++++++++++++++++++----------------- source/util/src/tworker.c | 23 +++++++++++++-- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 63ba460d39..a7a8b54439 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -15,6 +15,7 @@ #ifndef _TD_UTIL_QUEUE_H #define _TD_UTIL_QUEUE_H +#include "os.h" #ifdef __cplusplus extern "C" { diff --git a/include/util/tworker.h b/include/util/tworker.h index 6c81565a6a..bf002201b3 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -15,7 +15,6 @@ #ifndef _TD_UTIL_WORKER_H #define _TD_UTIL_WORKER_H -#include "os.h" #include "tqueue.h" #ifdef __cplusplus diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 5cb149d53c..66c1332db9 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -13,10 +13,9 @@ * along with this program. If not, see . */ -#include "os.h" - -#include "taoserror.h" +#define _DEFAULT_SOURCE #include "tqueue.h" +#include "taoserror.h" #include "ulog.h" typedef struct STaosQnode STaosQnode; @@ -29,19 +28,19 @@ typedef struct STaosQnode { typedef struct STaosQueue { int32_t itemSize; int32_t numOfItems; - STaosQnode *head; - STaosQnode *tail; - STaosQueue *next; // for queue set - STaosQset *qset; // for queue set - void *ahandle; // for queue set + STaosQnode * head; + STaosQnode * tail; + STaosQueue * next; // for queue set + STaosQset * qset; // for queue set + void * ahandle; // for queue set FProcessItem itemFp; FProcessItems itemsFp; pthread_mutex_t mutex; } STaosQueue; typedef struct STaosQset { - STaosQueue *head; - STaosQueue *current; + STaosQueue * head; + STaosQueue * current; pthread_mutex_t mutex; int32_t numOfQueues; int32_t numOfItems; @@ -56,15 +55,18 @@ typedef struct STaosQall { } STaosQall; STaosQueue *taosOpenQueue() { - STaosQueue *queue = calloc(sizeof(STaosQueue), 1); + STaosQueue *queue = calloc(1, sizeof(STaosQueue)); if (queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pthread_mutex_init(&queue->mutex, NULL); + if (pthread_mutex_init(&queue->mutex, NULL) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - uTrace("queue:%p is opened", queue); + uDebug("queue:%p is opened", queue); return queue; } @@ -77,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsF void taosCloseQueue(STaosQueue *queue) { if (queue == NULL) return; STaosQnode *pTemp; - STaosQset *qset; + STaosQset * qset; pthread_mutex_lock(&queue->mutex); STaosQnode *pNode = queue->head; @@ -85,7 +87,9 @@ void taosCloseQueue(STaosQueue *queue) { qset = queue->qset; pthread_mutex_unlock(&queue->mutex); - if (queue->qset) taosRemoveFromQset(qset, queue); + if (queue->qset) { + taosRemoveFromQset(qset, queue); + } while (pNode) { pTemp = pNode; @@ -96,7 +100,7 @@ void taosCloseQueue(STaosQueue *queue) { pthread_mutex_destroy(&queue->mutex); free(queue); - uTrace("queue:%p is closed", queue); + uDebug("queue:%p is closed", queue); } bool taosQueueEmpty(STaosQueue *queue) { @@ -120,9 +124,13 @@ int32_t taosQueueSize(STaosQueue *queue) { } void *taosAllocateQitem(int32_t size) { - STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); + STaosQnode *pNode = calloc(1, sizeof(STaosQnode) + size); + + if (pNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - if (pNode == NULL) return NULL; uTrace("item:%p, node:%p is allocated", pNode->item, pNode); return (void *)pNode->item; } @@ -130,7 +138,7 @@ void *taosAllocateQitem(int32_t size) { void taosFreeQitem(void *param) { if (param == NULL) return; - char *temp = (char *)param; + char *temp = param; temp -= sizeof(STaosQnode); uTrace("item:%p, node:%p is freed", param, temp); free(temp); @@ -175,7 +183,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -183,7 +191,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { return code; } -STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } +STaosQall *taosAllocateQall() { return calloc(1, sizeof(STaosQall)); } void taosFreeQall(STaosQall *qall) { free(qall); } @@ -238,7 +246,7 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } STaosQset *taosOpenQset() { - STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); + STaosQset *qset = calloc(sizeof(STaosQset), 1); if (qset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -247,7 +255,7 @@ STaosQset *taosOpenQset() { pthread_mutex_init(&qset->mutex, NULL); tsem_init(&qset->sem, 0, 0); - uTrace("qset:%p is opened", qset); + uDebug("qset:%p is opened", qset); return qset; } @@ -268,7 +276,7 @@ void taosCloseQset(STaosQset *qset) { pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem); free(qset); - uTrace("qset:%p is closed", qset); + uDebug("qset:%p is closed", qset); } // tsem_post 'qset->sem', so that reader threads waiting for it @@ -338,7 +346,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { pthread_mutex_unlock(&qset->mutex); - uTrace("queue:%p is removed from qset:%p", queue, qset); + uDebug("queue:%p is removed from qset:%p", queue, qset); } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } @@ -365,6 +373,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP *ppItem = pNode->item; if (ahandle) *ahandle = queue->ahandle; if (itemFp) *itemFp = queue->itemFp; + queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 3fe087a20d..fc0b6afb3b 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "tworker.h" +#include "taoserror.h" #include "ulog.h" typedef void *(*ThreadFp)(void *param); @@ -22,7 +23,13 @@ typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); pool->workers = calloc(sizeof(SQWorker), pool->max); + if (pool->workers == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (pthread_mutex_init(&pool->mutex, NULL)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -91,6 +98,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -109,6 +117,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; break; } @@ -133,9 +142,16 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; pool->workers = calloc(sizeof(SWWorker), pool->max); - if (pool->workers == NULL) return -1; + if (pool->workers == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (pthread_mutex_init(&pool->mutex, NULL) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - pthread_mutex_init(&pool->mutex, NULL); for (int32_t i = 0; i < pool->max; ++i) { SWWorker *worker = pool->workers + i; worker->id = i; @@ -208,6 +224,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -227,6 +244,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems taosCloseQset(worker->qset); taosCloseQueue(queue); pthread_mutex_unlock(&pool->mutex); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pthread_attr_t thAttr; @@ -238,6 +256,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems taosFreeQall(worker->qall); taosCloseQset(worker->qset); taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; } else { uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); From 4fc74ced9494cab0fe8d86e4c373ada307d0bc34 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 09:14:24 +0000 Subject: [PATCH 3/5] refact worker --- include/util/tqueue.h | 15 +++-- include/util/tworker.h | 30 +++------ source/dnode/mgmt/impl/src/dndVnodes.c | 14 ++--- source/dnode/mgmt/impl/src/dndWorker.c | 4 +- source/util/src/tqueue.c | 74 ++++++++++++++++++++--- source/util/src/tworker.c | 84 +++++++++++++++++++------- 6 files changed, 155 insertions(+), 66 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index a7a8b54439..cfa5a65c2a 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -41,13 +41,13 @@ shall be used to set up the protection. typedef struct STaosQueue STaosQueue; typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; -typedef void (*FProcessItem)(void *ahandle, void *pItem); -typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); +typedef void (*FItem)(void *ahandle, void *pItem); +typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); -void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); -void *taosAllocateQitem(int32_t size); +void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); +void * taosAllocateQitem(int32_t size); void taosFreeQitem(void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); @@ -67,8 +67,11 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); int32_t taosGetQueueNumber(STaosQset *qset); -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); + +int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId); +void taosResetQsetThread(STaosQset *qset, void *pItem); int32_t taosGetQueueItemsNumber(STaosQueue *queue); int32_t taosGetQsetItemsNumber(STaosQset *qset); diff --git a/include/util/tworker.h b/include/util/tworker.h index bf002201b3..771c7c9433 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -22,14 +22,13 @@ extern "C" { #endif typedef struct SQWorkerPool SQWorkerPool; -typedef struct SFWorkerPool SFWorkerPool; typedef struct SWWorkerPool SWWorkerPool; typedef struct SQWorker { int32_t id; // worker ID pthread_t thread; // thread SQWorkerPool *pool; -} SQWorker; +} SQWorker, SFWorker; typedef struct SQWorkerPool { int32_t max; // max number of workers @@ -39,23 +38,7 @@ typedef struct SQWorkerPool { const char * name; SQWorker * workers; pthread_mutex_t mutex; -} SQWorkerPool; - -typedef struct SFWorker { - int32_t id; // worker ID - pthread_t thread; // thread - SFWorkerPool *pool; -} SFWorker; - -typedef struct SFWorkerPool { - int32_t max; // max number of workers - int32_t min; // min number of workers - int32_t num; // current number of workers - STaosQset * qset; - const char * name; - SFWorker * workers; - pthread_mutex_t mutex; -} SFWorkerPool; +} SQWorkerPool, SFWorkerPool; typedef struct SWWorker { int32_t id; // worker id @@ -75,12 +58,17 @@ typedef struct SWWorkerPool { int32_t tQWorkerInit(SQWorkerPool *pool); void tQWorkerCleanup(SQWorkerPool *pool); -STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp); +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); +int32_t tFWorkerInit(SFWorkerPool *pool); +void tFWorkerCleanup(SFWorkerPool *pool); +STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp); +void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue); + int32_t tWWorkerInit(SWWorkerPool *pool); void tWWorkerCleanup(SWWorkerPool *pool); -STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp); +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 8d6fe3707b..9b9269eb9c 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -253,7 +253,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + cJSON * vnode = cJSON_GetArrayItem(vnodes, i); SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); @@ -382,7 +382,7 @@ static void *dnodeOpenVnodeFunc(void *param) { dndReportStartup(pDnode, "open-vnodes", stepDesc); SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId}; - SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); + SVnode * pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -948,11 +948,11 @@ static void dndCleanupVnodeWorkers(SDnode *pDnode) { static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue); + pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || pVnode->pQueryQ == NULL) { diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index 42ef76bb43..5ccf6640c0 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -39,7 +39,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); + pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FItem)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -52,7 +52,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); + pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 66c1332db9..b50ad77b6f 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -22,19 +22,21 @@ typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { STaosQnode *next; + STaosQueue *queue; char item[]; } STaosQnode; typedef struct STaosQueue { int32_t itemSize; int32_t numOfItems; + int32_t threadId; STaosQnode * head; STaosQnode * tail; STaosQueue * next; // for queue set STaosQset * qset; // for queue set void * ahandle; // for queue set - FProcessItem itemFp; - FProcessItems itemsFp; + FItem itemFp; + FItems itemsFp; pthread_mutex_t mutex; } STaosQueue; @@ -66,11 +68,12 @@ STaosQueue *taosOpenQueue() { return NULL; } + queue->threadId = -1; uDebug("queue:%p is opened", queue); return queue; } -void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { +void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) { if (queue == NULL) return; queue->itemFp = itemFp; queue->itemsFp = itemsFp; @@ -135,12 +138,12 @@ void *taosAllocateQitem(int32_t size) { return (void *)pNode->item; } -void taosFreeQitem(void *param) { - if (param == NULL) return; +void taosFreeQitem(void *pItem) { + if (pItem == NULL) return; - char *temp = param; + char *temp = pItem; temp -= sizeof(STaosQnode); - uTrace("item:%p, node:%p is freed", param, temp); + uTrace("item:%p, node:%p is freed", pItem, temp); free(temp); } @@ -351,7 +354,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) { STaosQnode *pNode = NULL; int32_t code = 0; @@ -391,7 +394,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FP return code; } -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) { STaosQueue *queue; int32_t code = 0; @@ -432,6 +435,59 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand return code; } +int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) { + STaosQnode *pNode = NULL; + int32_t code = 0; + + tsem_wait(&qset->sem); + + pthread_mutex_lock(&qset->mutex); + + for (int32_t i = 0; i < qset->numOfQueues; ++i) { + if (qset->current == NULL) qset->current = qset->head; + STaosQueue *queue = qset->current; + if (queue) qset->current = queue->next; + if (queue == NULL) break; + if (queue->head == NULL) continue; + if (queue->threadId != -1 && queue->threadId != threadId) continue; + + pthread_mutex_lock(&queue->mutex); + + if (queue->head) { + pNode = queue->head; + pNode->queue = queue; + queue->threadId = threadId; + *ppItem = pNode->item; + + if (ahandle) *ahandle = queue->ahandle; + if (itemFp) *itemFp = queue->itemFp; + + queue->head = pNode->next; + if (queue->head == NULL) queue->tail = NULL; + queue->numOfItems--; + atomic_sub_fetch_32(&qset->numOfItems, 1); + code = 1; + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); + } + + pthread_mutex_unlock(&queue->mutex); + if (pNode) break; + } + + pthread_mutex_unlock(&qset->mutex); + + return code; +} + +void taosResetQsetThread(STaosQset *qset, void *pItem) { + if (pItem == NULL) return; + STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); + + pthread_mutex_lock(&qset->mutex); + pNode->queue->threadId = -1; + pthread_mutex_unlock(&qset->mutex); +} + int32_t taosGetQueueItemsNumber(STaosQueue *queue) { if (!queue) return 0; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index fc0b6afb3b..bc98a371fc 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -39,7 +39,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { worker->pool = pool; } - uDebug("qworker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); + uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); return 0; } @@ -64,12 +64,12 @@ void tQWorkerCleanup(SQWorkerPool *pool) { taosCloseQset(pool->qset); pthread_mutex_destroy(&pool->mutex); - uDebug("qworker:%s is closed", pool->name); + uDebug("worker:%s is closed", pool->name); } static void *tQWorkerThreadFp(SQWorker *worker) { SQWorkerPool *pool = worker->pool; - FProcessItem fp = NULL; + FItem fp = NULL; void * msg = NULL; void * ahandle = NULL; @@ -77,11 +77,11 @@ static void *tQWorkerThreadFp(SQWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("qworker:%s:%d is running", pool->name, worker->id); + uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ahandle, &fp) == 0) { - uDebug("qworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } @@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { return NULL; } -STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) { pthread_mutex_lock(&pool->mutex); STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -114,8 +114,8 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { - uError("qworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + if (pthread_create(&worker->thread, &thAttr, threadFp, worker) != 0) { + uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; @@ -124,21 +124,63 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FProcessItem f pthread_attr_destroy(&thAttr); pool->num++; - uDebug("qworker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); + uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); } while (pool->num < pool->min); } pthread_mutex_unlock(&pool->mutex); - uDebug("qworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { + return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp); +} + void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("qworker:%s, queue:%p is freed", pool->name, queue); + uDebug("worker:%s, queue:%p is freed", pool->name, queue); } +int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); } + +void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); } + +static void *tFWorkerThreadFp(SQWorker *worker) { + SQWorkerPool *pool = worker->pool; + FItem fp = NULL; + + void * msg = NULL; + void * ahandle = NULL; + int32_t code = 0; + + taosBlockSIGPIPE(); + setThreadName(pool->name); + uDebug("worker:%s:%d is running", pool->name, worker->id); + + while (1) { + if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) { + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + break; + } + + if (fp != NULL) { + (*fp)(ahandle, msg); + } + + taosResetQsetThread(pool->qset, msg); + } + + return NULL; +} + +STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { + return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tFWorkerThreadFp); +} + +void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); } + int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; pool->workers = calloc(sizeof(SWWorker), pool->max); @@ -160,7 +202,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) { worker->pool = pool; } - uInfo("wworker:%s is initialized, max:%d", pool->name, pool->max); + uInfo("worker:%s is initialized, max:%d", pool->name, pool->max); return 0; } @@ -186,12 +228,12 @@ void tWWorkerCleanup(SWWorkerPool *pool) { tfree(pool->workers); pthread_mutex_destroy(&pool->mutex); - uInfo("wworker:%s is closed", pool->name); + uInfo("worker:%s is closed", pool->name); } static void *tWriteWorkerThreadFp(SWWorker *worker) { SWWorkerPool *pool = worker->pool; - FProcessItems fp = NULL; + FItems fp = NULL; void * msg = NULL; void * ahandle = NULL; @@ -200,12 +242,12 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); - uDebug("wworker:%s:%d is running", pool->name, worker->id); + uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); if (numOfMsgs == 0) { - uDebug("wworker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } @@ -217,7 +259,7 @@ static void *tWriteWorkerThreadFp(SWWorker *worker) { return NULL; } -STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pthread_mutex_lock(&pool->mutex); SWWorker *worker = pool->workers + pool->nextId; @@ -252,14 +294,14 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { - uError("wworker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosFreeQall(worker->qall); taosCloseQset(worker->qset); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; queue = NULL; } else { - uDebug("wworker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); + uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); pool->nextId = (pool->nextId + 1) % pool->max; } @@ -270,12 +312,12 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FProcessItems } pthread_mutex_unlock(&pool->mutex); - uDebug("wworker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); return queue; } void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); - uDebug("wworker:%s, queue:%p is freed", pool->name, queue); + uDebug("worker:%s, queue:%p is freed", pool->name, queue); } From bb5eef2dcfbe2f1c8ae8fab7e2eba90a6ed0797a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 09:20:24 +0000 Subject: [PATCH 4/5] refact worker --- source/dnode/mgmt/impl/inc/dndEnv.h | 2 +- source/dnode/mgmt/impl/src/dndVnodes.c | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 25c04ff6f8..4214dca11d 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -110,7 +110,7 @@ typedef struct { int32_t totalVnodes; SRWLatch latch; SQWorkerPool queryPool; - SQWorkerPool fetchPool; + SFWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; } SVnodesMgmt; diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 9b9269eb9c..473e6c33ef 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -920,7 +920,7 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { pPool->name = "vnode-fetch"; pPool->min = minFetchThreads; pPool->max = maxFetchThreads; - if (tQWorkerInit(pPool) != 0) return -1; + if (tFWorkerInit(pPool) != 0) return -1; SWWorkerPool *pMPool = &pMgmt->writePool; pMPool->name = "vnode-write"; @@ -938,7 +938,7 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { static void dndCleanupVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tQWorkerCleanup(&pMgmt->fetchPool); + tFWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); @@ -951,7 +951,7 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue); + pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || @@ -966,7 +966,7 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); From 07978790225f62672004c18e8052bdbee154778e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 25 Jan 2022 10:29:47 +0000 Subject: [PATCH 5/5] dead lock after refact worker --- source/dnode/mgmt/impl/src/dndVnodes.c | 36 +++++++++++++------------- source/util/src/tqueue.c | 14 +++++++--- source/util/src/tworker.c | 13 +++++++--- 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 473e6c33ef..dcb73d13c7 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -910,27 +910,27 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) { int32_t maxWriteThreads = TMAX(pDnode->env.numOfCores, 1); int32_t maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1); - SQWorkerPool *pPool = &pMgmt->queryPool; - pPool->name = "vnode-query"; - pPool->min = minQueryThreads; - pPool->max = maxQueryThreads; - if (tQWorkerInit(pPool) != 0) return -1; + SQWorkerPool *pQPool = &pMgmt->queryPool; + pQPool->name = "vnode-query"; + pQPool->min = minQueryThreads; + pQPool->max = maxQueryThreads; + if (tQWorkerInit(pQPool) != 0) return -1; - pPool = &pMgmt->fetchPool; - pPool->name = "vnode-fetch"; - pPool->min = minFetchThreads; - pPool->max = maxFetchThreads; - if (tFWorkerInit(pPool) != 0) return -1; + SFWorkerPool *pFPool = &pMgmt->fetchPool; + pFPool->name = "vnode-fetch"; + pFPool->min = minFetchThreads; + pFPool->max = maxFetchThreads; + if (tFWorkerInit(pFPool) != 0) return -1; - SWWorkerPool *pMPool = &pMgmt->writePool; - pMPool->name = "vnode-write"; - pMPool->max = maxWriteThreads; - if (tWWorkerInit(pMPool) != 0) return -1; + SWWorkerPool *pWPool = &pMgmt->writePool; + pWPool->name = "vnode-write"; + pWPool->max = maxWriteThreads; + if (tWWorkerInit(pWPool) != 0) return -1; - pMPool = &pMgmt->syncPool; - pMPool->name = "vnode-sync"; - pMPool->max = maxSyncThreads; - if (tWWorkerInit(pMPool) != 0) return -1; + pWPool = &pMgmt->syncPool; + pWPool->name = "vnode-sync"; + pWPool->max = maxSyncThreads; + if (tWWorkerInit(pWPool) != 0) return -1; dDebug("vnode workers is initialized"); return 0; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index b50ad77b6f..8125f550d0 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -423,7 +423,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand queue->tail = NULL; queue->numOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem); + for (int32_t j = 1; j < qall->numOfItems; ++j) { + tsem_wait(&qset->sem); + } } pthread_mutex_unlock(&queue->mutex); @@ -437,7 +439,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) { STaosQnode *pNode = NULL; - int32_t code = 0; + int32_t code = -1; tsem_wait(&qset->sem); @@ -449,7 +451,10 @@ int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **aha if (queue) qset->current = queue->next; if (queue == NULL) break; if (queue->head == NULL) continue; - if (queue->threadId != -1 && queue->threadId != threadId) continue; + if (queue->threadId != -1 && queue->threadId != threadId) { + code = 0; + continue; + } pthread_mutex_lock(&queue->mutex); @@ -485,6 +490,9 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) { pthread_mutex_lock(&qset->mutex); pNode->queue->threadId = -1; + for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) { + tsem_post(&qset->sem); + } pthread_mutex_unlock(&qset->mutex); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index bc98a371fc..97440f8dae 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -149,8 +149,8 @@ void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); } static void *tFWorkerThreadFp(SQWorker *worker) { SQWorkerPool *pool = worker->pool; - FItem fp = NULL; + FItem fp = NULL; void * msg = NULL; void * ahandle = NULL; int32_t code = 0; @@ -160,9 +160,14 @@ static void *tFWorkerThreadFp(SQWorker *worker) { uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id) == 0) { + code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id); + + if (code < 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; + } else if (code == 0) { + // uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset); + continue; } if (fp != NULL) { @@ -231,7 +236,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) { uInfo("worker:%s is closed", pool->name); } -static void *tWriteWorkerThreadFp(SWWorker *worker) { +static void *tWWorkerThreadFp(SWWorker *worker) { SWWorkerPool *pool = worker->pool; FItems fp = NULL; @@ -293,7 +298,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { + if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) { uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosFreeQall(worker->qall); taosCloseQset(worker->qset);