From 11a98ffab2010ddbb03e810b1a933c0a5f3d4987 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 24 Jun 2024 18:28:56 +0800 Subject: [PATCH 01/15] QueryAutoQWorker --- include/libs/executor/executor.h | 1 + include/util/tlist.h | 3 +- include/util/tqueue.h | 1 + include/util/tworker.h | 55 +++ source/common/src/tglobal.c | 2 +- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 28 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 12 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/inc/querytask.h | 1 + source/libs/executor/src/exchangeoperator.c | 24 +- source/libs/executor/src/querytask.c | 1 + source/libs/qworker/src/qwMsg.c | 1 + source/util/src/tworker.c | 415 ++++++++++++++++++++ 15 files changed, 522 insertions(+), 29 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 78a977c86f..a461ceef2a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -57,6 +57,7 @@ typedef struct { STimeWindow winRange; struct SStorageAPI api; + void* pWorkerCb; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/include/util/tlist.h b/include/util/tlist.h index 0924c133b9..1d4de3096f 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -218,6 +218,7 @@ typedef struct { #define listEleSize(l) ((l)->eleSize) #define isListEmpty(l) (TD_DLIST_NELES(l) == 0) #define listNodeFree(n) taosMemoryFree(n) +#define listNode(data) (SListNode*)(((void*)(data)) - sizeof(SListNode)) void tdListInit(SList *list, int32_t eleSize); void tdListEmpty(SList *list); @@ -293,4 +294,4 @@ SListNode *tdListNext(SListIter *pIter); } #endif -#endif /*_TD_UTIL_LIST_H_*/ \ No newline at end of file +#endif /*_TD_UTIL_LIST_H_*/ diff --git a/include/util/tqueue.h b/include/util/tqueue.h index eb887596d0..51a3b2dc7d 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -49,6 +49,7 @@ typedef struct { int32_t workerId; int32_t threadNum; int64_t timestamp; + void *poolCb; } SQueueInfo; typedef enum { diff --git a/include/util/tworker.h b/include/util/tworker.h index f39540d24b..3af6d3d8ba 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -16,6 +16,7 @@ #ifndef _TD_UTIL_WORKER_H_ #define _TD_UTIL_WORKER_H_ +#include "tlist.h" #include "tqueue.h" #include "tarray.h" @@ -115,6 +116,60 @@ void tSingleWorkerCleanup(SSingleWorker *pWorker); int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg); void tMultiWorkerCleanup(SMultiWorker *pWorker); +struct SQueryAutoQWorkerPoolCB; + +typedef struct SQueryAutoQWorker { + int32_t id; // worker id + int32_t backupIdx;// the idx when put into backup pool + int64_t pid; // thread pid + TdThread thread; // thread id + void *pool; +} SQueryAutoQWorker; + +typedef struct SQueryAutoQWorkerPool { + int32_t num; + int32_t max; + int32_t min; + int32_t maxRunning; + + int32_t activeN; // running workers and workers waiting at reading new queue msg + int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. + + int32_t blockingN; // blocked worker num, like exchangeoperator sem_wait + + int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers + TdThreadMutex waitingAfterBlockLock; + TdThreadCond waitingAfterBlockCond; + + int32_t waitingBeforeProcessMsgN; // workers that get msg from queue, but waiting for too many running workers + TdThreadMutex waitingBeforeProcessMsgLock; + TdThreadCond waitingBeforeProcessMsgCond; + + int32_t backupNum; // workers that are in backup pool, not reading msg from queue + TdThreadMutex backupLock; + TdThreadCond backupCond; + + const char *name; + TdThreadMutex poolLock; + SList *workers; + SList *backupWorkers; + SList *exitedWorkers; + STaosQset *qset; + struct SQueryAutoQWorkerPoolCB *pCb; + bool exit; +} SQueryAutoQWorkerPool; + +int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pPool); +void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool); +STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pPool, void *ahandle, FItem fp); +void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ); + +typedef struct SQueryAutoQWorkerPoolCB { + SQueryAutoQWorkerPool* pPool; + int32_t (*beforeBlocking)(void* pPool); + int32_t (*afterRecoverFromBlocking)(void* pPool); +} SQueryAutoQWorkerPoolCB; + #ifdef __cplusplus } #endif diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 07d78b5c0b..996cc3c6f8 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -669,7 +669,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 375a34c04f..85712d2797 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,20 +26,20 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SAutoQWorkerPool streamPool; - SWWorkerPool fetchPool; - SSingleWorker mgmtWorker; - SHashObj *hash; - TdThreadRwlock lock; - SVnodesStat state; - STfs *pTfs; - TdThread thread; - bool stop; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQueryAutoQWorkerPool queryPool; + SAutoQWorkerPool streamPool; + SWWorkerPool fetchPool; + SSingleWorker mgmtWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; + TdThread thread; + bool stop; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7b7c51bbc7..45d1486912 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -82,7 +82,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); + int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code)); @@ -357,7 +357,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { (void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg); (void)tMultiWorkerInit(&pVnode->pApplyW, &acfg); - pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); + pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); @@ -383,7 +383,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { - tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; @@ -393,11 +393,11 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } int32_t vmStartWorker(SVnodeMgmt *pMgmt) { - SQWorkerPool *pQPool = &pMgmt->queryPool; + SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; pQPool->min = tsNumOfVnodeQueryThreads; pQPool->max = tsNumOfVnodeQueryThreads; - if (tQWorkerInit(pQPool) != 0) return -1; + if (tQueryAutoQWorkerInit(pQPool) != 0) return -1; SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool; pStreamPool->name = "vnode-stream"; @@ -419,7 +419,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { } void vmStopWorker(SVnodeMgmt *pMgmt) { - tQWorkerCleanup(&pMgmt->queryPool); + tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tAutoQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fccba91db7..e35c152e9b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -106,7 +106,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2264e2779b..ba84951b7b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -733,7 +733,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType); } -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo) { vTrace("message in vnode query queue is processing"); if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && @@ -747,7 +747,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return 0; } - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->poolCb}; initStorageAPI(&handle.api); switch (pMsg->msgType) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c48c359fad..95414ff70e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -42,6 +42,7 @@ extern "C" { // #include "tstream.h" // #include "tstreamUpdate.h" #include "tlrucache.h" +#include "tworker.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 18f51df2e9..48de49f07c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -95,6 +95,7 @@ struct SExecTaskInfo { int8_t dynamicTask; SOperatorParam* pOpParam; bool paramSet; + SQueryAutoQWorkerPoolCB* pWorkerCb; }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c527224438..1888d3dabe 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -60,6 +60,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInf bool holdDataInBuf); static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); +static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo); + static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo) { int32_t code = 0; @@ -74,9 +76,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); - tsem_wait(&pExchangeInfo->ready); + code = exchangeWait(pOperator, pExchangeInfo); - if (isTaskKilled(pTaskInfo)) { + if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -756,8 +758,8 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pDataInfo->status = EX_SOURCE_DATA_NOT_READY; doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); - tsem_wait(&pExchangeInfo->ready); - if (isTaskKilled(pTaskInfo)) { + code = exchangeWait(pOperator, pExchangeInfo); + if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -971,3 +973,17 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa return PROJECT_RETRIEVE_CONTINUE; } } + +static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) { + SExecTaskInfo* pTask = pOperator->pTaskInfo; + if (pTask->pWorkerCb) { + pTask->code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool); + if (pTask->code != TSDB_CODE_SUCCESS) return pTask->code; + } + tsem_wait(&pExchangeInfo->ready); + if (pTask->pWorkerCb) { + pTask->code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); + if (pTask->code != TSDB_CODE_SUCCESS) return pTask->code; + } + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 8b87f3da43..0bf4ac2b21 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -96,6 +96,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand TSWAP((*pTaskInfo)->sql, sql); (*pTaskInfo)->pSubplan = pPlan; + (*pTaskInfo)->pWorkerCb = pHandle->pWorkerCb; (*pTaskInfo)->pRoot = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user, pPlan->dbFName); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index d30b100147..a9b568dd91 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -254,6 +254,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { req->queryId = qId; req->taskId = tId; req->execId = eId; + //taosMsleep(500); SRpcMsg pNewMsg = { .msgType = TDMT_SCH_QUERY_CONTINUE, diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 07186331ae..4e921af76c 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -480,3 +480,418 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) { tWWorkerCleanup(&pWorker->pool); tWWorkerFreeQueue(&pWorker->pool, pWorker->queue); } + +static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool); +static void tQueryAutoQWorkerStartRunning(void *p); +static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); +static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); +static int32_t tQueryAutoQWorkerWaitingCheck(void *p); +static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker); +static void tQueryAutoQWorkerFinish(void *p); + +static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { + SQueryAutoQWorkerPool *pool = worker->pool; + SQueueInfo qinfo = {0}; + void *msg = NULL; + int32_t code = 0; + + taosBlockSIGPIPE(); + setThreadName(pool->name); + worker->pid = taosGetSelfPthreadId(); + uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); + + while (1) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { + uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset, + worker->pid); + break; + } + + if (qinfo.timestamp != 0) { + int64_t cost = taosGetTimestampUs() - qinfo.timestamp; + if (cost > QUEUE_THRESHOLD) { + uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD); + } + } + + code = tQueryAutoQWorkerWaitingCheck(pool); + // TODO wjm what if taosd is exiting + + if (qinfo.fp != NULL) { + qinfo.workerId = worker->id; + qinfo.threadNum = pool->num; + qinfo.poolCb = pool->pCb; + (*((FItem)qinfo.fp))(&qinfo, msg); + } + + taosUpdateItemSize(qinfo.queue, 1); + if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) { + break; + } + } + + destroyThreadLocalGeosCtx(); + DestoryThreadLocalRegComp(); + + return NULL; +} + +static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) { + SQueryAutoQWorkerPool *pPool = p; + bool ret = false; + taosThreadMutexLock(&pPool->waitingAfterBlockLock); + int32_t waiting = pPool->waitingAfterBlockN; + while (waiting > 0) { + int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1); + if (waitingNew == waiting) { + taosThreadCondSignal(&pPool->waitingAfterBlockCond); + ret = true; + break; + } + waiting = waitingNew; + } + taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); + return ret; +} + +static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) { + SQueryAutoQWorkerPool *pPool = p; + bool ret = false; + taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); + int32_t waiting = pPool->waitingBeforeProcessMsgN; + while (waiting > 0) { + int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1); + if (waitingNew == waiting) { + taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond); + ret = true; + break; + } + waiting = waitingNew; + } + taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); + return ret; +} + +static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { + SQueryAutoQWorkerPool *pPool = p; + bool ret = false; + int32_t active = pPool->activeN; + while (active > minActive) { + int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1); + if (activeNew == active) { + atomic_fetch_sub_32(&pPool->runningN, 1); + ret = true; + break; + } + active = activeNew; + } + return ret; +} + +static int32_t tQueryAutoQWorkerWaitingCheck(void *p) { + SQueryAutoQWorkerPool* pPool = p; + int32_t running = pPool->runningN; + while (running < pPool->num) { + int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); + if (runningNew == running) { + // to running + return TSDB_CODE_SUCCESS; + } + running = runningNew; + } + // to wait for process + taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); + atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); + atomic_fetch_sub_32(&pPool->activeN, 1); + if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); + // recovered from waiting + taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); + if (pPool->exit) return -1; // TODO wjm error code + return TSDB_CODE_SUCCESS; +} + +bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker) { + if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) || + tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) { + taosThreadMutexLock(&pPool->poolLock); + SListNode* pNode = listNode(pWorker); + tdListPopNode(pPool->workers, pNode); + // reclaim some workers + if (pWorker->id >= pPool->num * 2 ) { + while (listNEles(pPool->exitedWorkers) > 0) { + SListNode* head = tdListPopHead(pPool->exitedWorkers); + SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data; + if (pWorker && taosCheckPthreadValid(pWorker->thread)) { + taosThreadJoin(pWorker->thread, NULL); + taosThreadClear(&pWorker->thread); + } + taosMemoryFree(head); + } + tdListAppendNode(pPool->exitedWorkers, pNode); + taosThreadMutexUnlock(&pPool->poolLock); + return false; + } + + // put back to backup pool + tdListAppendNode(pPool->backupWorkers, pNode); + taosThreadMutexUnlock(&pPool->poolLock); + + // start to wait at backup cond + taosThreadMutexLock(&pPool->backupLock); + atomic_fetch_add_32(&pPool->backupNum, 1); + if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock); + // TODO wjm what if taosd is exiting + taosThreadMutexUnlock(&pPool->backupLock); + + // recovered from backup + taosThreadMutexLock(&pPool->poolLock); + if (pPool->exit) { + taosThreadMutexUnlock(&pPool->poolLock); + return false; + } + tdListPopNode(pPool->backupWorkers, pNode); + tdListAppendNode(pPool->workers, pNode); + taosThreadMutexUnlock(&pPool->poolLock); + + return true; + } else { + atomic_fetch_sub_32(&pPool->runningN, 1); + return true; + } +} + +int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { + pool->qset = taosOpenQset(); + if (!pool->qset) return terrno; + pool->workers = tdListNew(sizeof(SQueryAutoQWorker)); + if (!pool->workers) return TSDB_CODE_OUT_OF_MEMORY; + pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker)); + if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY; + pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker)); + if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY; + + (void)taosThreadMutexInit(&pool->poolLock, NULL); + (void)taosThreadMutexInit(&pool->backupLock, NULL); + (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); + (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL); + + (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL); + (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL); + (void)taosThreadCondInit(&pool->backupCond, NULL); + + if (!pool->pCb) { + pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB)); + if (!pool->pCb) return TSDB_CODE_OUT_OF_MEMORY; + pool->pCb->pPool = pool; + pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking; + pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking; + } + return TSDB_CODE_SUCCESS; +} + +void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { + taosThreadMutexLock(&pPool->poolLock); + pPool->exit = true; + int32_t size = listNEles(pPool->workers); + for (int32_t i = 0; i < size; ++i) { + taosQsetThreadResume(pPool->qset); + } + size = listNEles(pPool->backupWorkers); + for (int32_t i = 0; i < size; ++i) { + taosQsetThreadResume(pPool->qset); + } + taosThreadMutexUnlock(&pPool->poolLock); + + taosThreadMutexLock(&pPool->backupLock); + taosThreadCondBroadcast(&pPool->backupCond); + taosThreadMutexUnlock(&pPool->backupLock); + + taosThreadMutexLock(&pPool->waitingAfterBlockLock); + taosThreadCondBroadcast(&pPool->waitingAfterBlockCond); + taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); + + taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); + taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond); + taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); + + int32_t idx = 0; + SQueryAutoQWorker* worker = NULL; + while (true) { + taosThreadMutexLock(&pPool->poolLock); + if (listNEles(pPool->workers) == 0) { + taosThreadMutexUnlock(&pPool->poolLock); + break; + } + SListNode* pNode = tdListPopHead(pPool->workers); + worker = (SQueryAutoQWorker*)pNode->data; + taosThreadMutexUnlock(&pPool->poolLock); + if (worker && taosCheckPthreadValid(worker->thread)) { + taosThreadJoin(worker->thread, NULL); + taosThreadClear(&worker->thread); + } + taosMemoryFree(pNode); + } + + while (listNEles(pPool->backupWorkers) > 0) { + // TODO wjm need lock? + SListNode* pNode = tdListPopHead(pPool->backupWorkers); + worker = (SQueryAutoQWorker*)pNode->data; + if (worker && taosCheckPthreadValid(worker->thread)) { + taosThreadJoin(worker->thread, NULL); + taosThreadClear(&worker->thread); + } + taosMemoryFree(pNode); + } + + while (listNEles(pPool->exitedWorkers) > 0) { + SListNode* pNode = tdListPopHead(pPool->exitedWorkers); + worker = (SQueryAutoQWorker*)pNode->data; + if (worker && taosCheckPthreadValid(worker->thread)) { + taosThreadJoin(worker->thread, NULL); + taosThreadClear(&worker->thread); + } + taosMemoryFree(pNode); + } + + tdListFree(pPool->workers); + tdListFree(pPool->backupWorkers); + tdListFree(pPool->exitedWorkers); + taosMemoryFree(pPool->pCb); + + taosThreadMutexDestroy(&pPool->poolLock); + taosThreadMutexDestroy(&pPool->backupLock); + taosThreadMutexDestroy(&pPool->waitingAfterBlockLock); + taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock); + + taosThreadCondDestroy(&pPool->backupCond); + taosThreadCondDestroy(&pPool->waitingAfterBlockCond); + taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond); + taosCloseQset(pPool->qset); +} + +STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) { + STaosQueue *queue = taosOpenQueue(); + if (queue == NULL) return NULL; + + taosThreadMutexLock(&pool->poolLock); + taosSetQueueFp(queue, fp, NULL); + taosAddIntoQset(pool->qset, queue, ahandle); + SQueryAutoQWorker worker = {0}; + SQueryAutoQWorker* pWorker = NULL; + + // spawn a thread to process queue + if (pool->num < pool->max) { + do { + worker.id = listNEles(pool->workers); + worker.backupIdx = -1; + worker.pool = pool; + SListNode* pNode = tdListAdd(pool->workers, &worker); + if (!pNode) { + taosCloseQueue(queue); + queue = NULL; + terrno = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pWorker = (SQueryAutoQWorker*)pNode->data; + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) { + taosCloseQueue(queue); + terrno = TSDB_CODE_OUT_OF_MEMORY; + queue = NULL; + break; + } + + taosThreadAttrDestroy(&thAttr); + pool->num++; + pool->activeN++; + uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num); + } while (pool->num < pool->min); + } + + taosThreadMutexUnlock(&pool->poolLock); + uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + + return queue; +} + +void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) { + taosCloseQueue(pQ); +} + +static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool) { + // try backup pool + int32_t backup = pool->backupNum; + while (backup > 0) { + int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1); + if (backupNew == backup) { + taosThreadCondSignal(&pool->backupCond); + return TSDB_CODE_SUCCESS; + } + backup = backupNew; + } + // backup pool is empty, create new + SQueryAutoQWorker* pWorker = NULL; + SQueryAutoQWorker worker = {0}; + worker.pool = pool; + worker.backupIdx = -1; + taosThreadMutexLock(&pool->poolLock); + worker.id = listNEles(pool->workers); + SListNode* pNode = tdListAdd(pool->workers, &worker); + if (!pNode) { + taosThreadMutexUnlock(&pool->poolLock); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + taosThreadMutexUnlock(&pool->poolLock); + pWorker = (SQueryAutoQWorker*)pNode->data; + + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + taosThreadAttrDestroy(&thAttr); + + return TSDB_CODE_SUCCESS; +} + +static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) { + SQueryAutoQWorkerPool* pPool = p; + if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) || + tQueryAutoQWorkerTryDecActive(p, 1)) { + } else { + int32_t code = tQueryAutoQWorkerAddWorker(pPool); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + atomic_fetch_sub_32(&pPool->runningN, 1); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) { + SQueryAutoQWorkerPool* pPool = p; + int32_t running = pPool->runningN; + while (running < pPool->num) { + int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); + if (runningNew == running) { + atomic_fetch_add_32(&pPool->activeN, 1); + return TSDB_CODE_SUCCESS; + } + running = runningNew; + } + taosThreadMutexLock(&pPool->waitingAfterBlockLock); + atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1); + if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock); + taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); + if (pPool->exit) return -1; // TODO wjm error code + return TSDB_CODE_SUCCESS; +} From 938dc06a0926db914d5f345280a1de191256bb74 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 Date: Wed, 26 Jun 2024 13:47:51 +0800 Subject: [PATCH 02/15] recycle threads slower --- source/util/src/tworker.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 4e921af76c..482db7b5b8 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -618,7 +618,7 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ tdListPopNode(pPool->workers, pNode); // reclaim some workers if (pWorker->id >= pPool->num * 2 ) { - while (listNEles(pPool->exitedWorkers) > 0) { + while (listNEles(pPool->exitedWorkers) > pPool->num) { SListNode* head = tdListPopHead(pPool->exitedWorkers); SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data; if (pWorker && taosCheckPthreadValid(pWorker->thread)) { @@ -640,7 +640,6 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ taosThreadMutexLock(&pPool->backupLock); atomic_fetch_add_32(&pPool->backupNum, 1); if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock); - // TODO wjm what if taosd is exiting taosThreadMutexUnlock(&pPool->backupLock); // recovered from backup @@ -742,7 +741,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { } taosMemoryFree(pNode); } - + while (listNEles(pPool->exitedWorkers) > 0) { SListNode* pNode = tdListPopHead(pPool->exitedWorkers); worker = (SQueryAutoQWorker*)pNode->data; From 03a7b43af653e371b33f1171d5e10701975dc0f5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 15:56:23 +0800 Subject: [PATCH 03/15] client use new thread pool --- source/libs/qcom/src/queryUtil.c | 42 ++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4206c9292f..ab536e9e62 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -19,10 +19,13 @@ #include "tmsg.h" #include "trpc.h" #include "tsched.h" +#include "tworker.h" // clang-format off #include "cJSON.h" #include "queryInt.h" +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pMsg); + int32_t getAsofJoinReverseOp(EOperatorType op) { switch (op) { case OP_TYPE_GREATER_THAN: @@ -118,12 +121,21 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static SSchedQueue pTaskQueue = {0}; +static STaosQueue* pTaskQueue = NULL; +static SQueryAutoQWorkerPool taskThreadPool = {0}; int32_t initTaskQueue() { - int32_t queueSize = tsMaxShellConns * 2; - void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue); - if (NULL == p) { + taskThreadPool.name = "tsc"; + taskThreadPool.min = tsNumOfTaskQueueThreads; + taskThreadPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&taskThreadPool); + if (TSDB_CODE_SUCCESS != coce) { + qError("failed to init task thread pool"); + return -1; + } + + pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskThreadPool, NULL, (FItem)processTaskQueue); + if (NULL == pTaskQueue) { qError("failed to init task queue"); return -1; } @@ -133,26 +145,24 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - taosCleanUpScheduler(&pTaskQueue); + tQueryAutoQWorkerCleanup(&taskThreadPool); return 0; } -static void execHelper(struct SSchedMsg* pSchedMsg) { +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; - int32_t code = execFn(pSchedMsg->thandle); - if (code != 0 && pSchedMsg->msg != NULL) { - *(int32_t*)pSchedMsg->msg = code; - } + execFn(pSchedMsg->thandle); + taosFreeQitem(pSchedMsg); } int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { - SSchedMsg schedMsg = {0}; - schedMsg.fp = execHelper; - schedMsg.ahandle = execFn; - schedMsg.thandle = execParam; - schedMsg.msg = code; + SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); + pSchedMsg->fp = NULL; + pSchedMsg->ahandle = execFn; + pSchedMsg->thandle = execParam; + pSchedMsg->msg = code; - return taosScheduleTask(&pTaskQueue, &schedMsg); + return taosWriteQitem(pTaskQueue, pSchedMsg); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { From 0383257900746673494bcfe931fdf45441df15e7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 17:03:40 +0800 Subject: [PATCH 04/15] limit the size of query queue --- source/libs/qcom/src/queryUtil.c | 51 +++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index ab536e9e62..e454ea3c01 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -24,7 +24,11 @@ #include "cJSON.h" #include "queryInt.h" -static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pMsg); +typedef struct SQueryQueue { + SQueryAutoQWorkerPool taskThreadPool; + STaosQueue* pTaskQueue; + tsem_t queueSem; +} SQueryQueue; int32_t getAsofJoinReverseOp(EOperatorType op) { switch (op) { @@ -121,48 +125,61 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static STaosQueue* pTaskQueue = NULL; -static SQueryAutoQWorkerPool taskThreadPool = {0}; +static SQueryQueue clientQueryQueue = {0}; + +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + execFn(pSchedMsg->thandle); + taosFreeQitem(pSchedMsg); + if (tsem_post(&clientQueryQueue.queueSem) != 0) { + uFatal("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + } +} int32_t initTaskQueue() { - taskThreadPool.name = "tsc"; - taskThreadPool.min = tsNumOfTaskQueueThreads; - taskThreadPool.max = tsNumOfTaskQueueThreads; - int32_t coce = tQueryAutoQWorkerInit(&taskThreadPool); + uint32_t queueSize = (uint32_t)(tsMaxShellConns * 2); + clientQueryQueue.taskThreadPool.name = "tsc"; + clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads; + clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&clientQueryQueue.taskThreadPool); if (TSDB_CODE_SUCCESS != coce) { qError("failed to init task thread pool"); return -1; } - pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskThreadPool, NULL, (FItem)processTaskQueue); - if (NULL == pTaskQueue) { + clientQueryQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientQueryQueue.taskThreadPool, NULL, (FItem)processTaskQueue); + if (NULL == clientQueryQueue.pTaskQueue) { qError("failed to init task queue"); return -1; } + + if (tsem_init(&clientQueryQueue.queueSem, 0, queueSize) != 0) { + uError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + cleanupTaskQueue(); + return -1; + } qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads); return 0; } int32_t cleanupTaskQueue() { - tQueryAutoQWorkerCleanup(&taskThreadPool); + tsem_destroy(&clientQueryQueue.queueSem); + tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool); return 0; } -static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { - __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; - execFn(pSchedMsg->thandle); - taosFreeQitem(pSchedMsg); -} - int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); pSchedMsg->fp = NULL; pSchedMsg->ahandle = execFn; pSchedMsg->thandle = execParam; pSchedMsg->msg = code; + if (tsem_wait(&clientQueryQueue.queueSem) != 0) { + qFatal("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + } - return taosWriteQitem(pTaskQueue, pSchedMsg); + return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { From 8fdd64e92ebc9227976f672be70d3fe78fee1bd7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 17:27:35 +0800 Subject: [PATCH 05/15] limit the size of query queue --- include/libs/qcom/query.h | 2 ++ source/libs/qcom/src/queryUtil.c | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ef702f24d7..2078455f1d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -297,6 +297,8 @@ int32_t cleanupTaskQueue(); * @return */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); +int32_t taosAsyncWait(); +int32_t taosAsyncRecover(); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index e454ea3c01..98e1034ad7 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -132,7 +132,7 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { execFn(pSchedMsg->thandle); taosFreeQitem(pSchedMsg); if (tsem_post(&clientQueryQueue.queueSem) != 0) { - uFatal("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + qError("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); } } @@ -176,12 +176,28 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) pSchedMsg->thandle = execParam; pSchedMsg->msg = code; if (tsem_wait(&clientQueryQueue.queueSem) != 0) { - qFatal("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + qError("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); } return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); } +int32_t taosAsyncWait() { + if (!clientQueryQueue.taskThreadPool.pCb) { + qError("query task thread pool callback function is null"); + return -1; + } + return clientQueryQueue.taskThreadPool.pCb->beforeBlocking(&clientQueryQueue.taskThreadPool); +} + +int32_t taosAsyncRecover() { + if (!clientQueryQueue.taskThreadPool.pCb) { + qError("query task thread pool callback function is null"); + return -1; + } + return clientQueryQueue.taskThreadPool.pCb->afterRecoverFromBlocking(&clientQueryQueue.taskThreadPool); +} + void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { if (NULL == pMsgBody) { return; From 02a7d2984123d43211db177c375a2d18a4cae632 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 17:48:37 +0800 Subject: [PATCH 06/15] adj log --- source/libs/qcom/src/queryUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 98e1034ad7..609f5cf6c4 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -154,7 +154,7 @@ int32_t initTaskQueue() { } if (tsem_init(&clientQueryQueue.queueSem, 0, queueSize) != 0) { - uError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + qError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); cleanupTaskQueue(); return -1; } From a25571313f134f726f64cf8f886d43fbc477621a Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 26 Jun 2024 16:46:37 +0800 Subject: [PATCH 07/15] remove test code --- include/util/tworker.h | 10 ++++------ source/libs/qworker/src/qwMsg.c | 1 - source/util/src/tworker.c | 22 +++++++++------------- 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index 3af6d3d8ba..6df335fc74 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -130,13 +130,11 @@ typedef struct SQueryAutoQWorkerPool { int32_t num; int32_t max; int32_t min; - int32_t maxRunning; + int32_t maxInUse; int32_t activeN; // running workers and workers waiting at reading new queue msg int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. - int32_t blockingN; // blocked worker num, like exchangeoperator sem_wait - int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers TdThreadMutex waitingAfterBlockLock; TdThreadCond waitingAfterBlockCond; @@ -165,9 +163,9 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pPool, void *ahan void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ); typedef struct SQueryAutoQWorkerPoolCB { - SQueryAutoQWorkerPool* pPool; - int32_t (*beforeBlocking)(void* pPool); - int32_t (*afterRecoverFromBlocking)(void* pPool); + void *pPool; + int32_t (*beforeBlocking)(void *pPool); + int32_t (*afterRecoverFromBlocking)(void *pPool); } SQueryAutoQWorkerPoolCB; #ifdef __cplusplus diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index a9b568dd91..d30b100147 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -254,7 +254,6 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { req->queryId = qId; req->taskId = tId; req->execId = eId; - //taosMsleep(500); SRpcMsg pNewMsg = { .msgType = TDMT_SCH_QUERY_CONTINUE, diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 482db7b5b8..2ca0450bed 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -482,12 +482,10 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) { } static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool); -static void tQueryAutoQWorkerStartRunning(void *p); static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); -static int32_t tQueryAutoQWorkerWaitingCheck(void *p); +static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool); static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker); -static void tQueryAutoQWorkerFinish(void *p); static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { SQueryAutoQWorkerPool *pool = worker->pool; @@ -498,7 +496,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); worker->pid = taosGetSelfPthreadId(); - uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); + uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { @@ -514,8 +512,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } - code = tQueryAutoQWorkerWaitingCheck(pool); - // TODO wjm what if taosd is exiting + tQueryAutoQWorkerWaitingCheck(pool); if (qinfo.fp != NULL) { qinfo.workerId = worker->id; @@ -526,6 +523,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) { + uDebug("worker:%s:%d exited", pool->name, worker->id); break; } } @@ -588,8 +586,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { return ret; } -static int32_t tQueryAutoQWorkerWaitingCheck(void *p) { - SQueryAutoQWorkerPool* pPool = p; +static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { int32_t running = pPool->runningN; while (running < pPool->num) { int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); @@ -606,7 +603,6 @@ static int32_t tQueryAutoQWorkerWaitingCheck(void *p) { if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); // recovered from waiting taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); - if (pPool->exit) return -1; // TODO wjm error code return TSDB_CODE_SUCCESS; } @@ -617,8 +613,8 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ SListNode* pNode = listNode(pWorker); tdListPopNode(pPool->workers, pNode); // reclaim some workers - if (pWorker->id >= pPool->num * 2 ) { - while (listNEles(pPool->exitedWorkers) > pPool->num) { + if (pWorker->id >= pPool->maxInUse) { + while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) { SListNode* head = tdListPopHead(pPool->exitedWorkers); SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data; if (pWorker && taosCheckPthreadValid(pWorker->thread)) { @@ -668,6 +664,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY; pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker)); if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY; + pool->maxInUse = pool->max * 2 + 2; (void)taosThreadMutexInit(&pool->poolLock, NULL); (void)taosThreadMutexInit(&pool->backupLock, NULL); @@ -732,7 +729,6 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { } while (listNEles(pPool->backupWorkers) > 0) { - // TODO wjm need lock? SListNode* pNode = tdListPopHead(pPool->backupWorkers); worker = (SQueryAutoQWorker*)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { @@ -891,6 +887,6 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) { atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1); if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock); taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); - if (pPool->exit) return -1; // TODO wjm error code + if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT; return TSDB_CODE_SUCCESS; } From 55341586b506eb35804a3050748e065c16d2f1b3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 27 Jun 2024 10:59:02 +0800 Subject: [PATCH 08/15] drop limit --- source/libs/qcom/src/queryUtil.c | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 609f5cf6c4..dfb3cafd86 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -27,7 +27,6 @@ typedef struct SQueryQueue { SQueryAutoQWorkerPool taskThreadPool; STaosQueue* pTaskQueue; - tsem_t queueSem; } SQueryQueue; int32_t getAsofJoinReverseOp(EOperatorType op) { @@ -131,13 +130,9 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; execFn(pSchedMsg->thandle); taosFreeQitem(pSchedMsg); - if (tsem_post(&clientQueryQueue.queueSem) != 0) { - qError("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); - } } int32_t initTaskQueue() { - uint32_t queueSize = (uint32_t)(tsMaxShellConns * 2); clientQueryQueue.taskThreadPool.name = "tsc"; clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads; clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads; @@ -152,19 +147,12 @@ int32_t initTaskQueue() { qError("failed to init task queue"); return -1; } - - if (tsem_init(&clientQueryQueue.queueSem, 0, queueSize) != 0) { - qError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); - cleanupTaskQueue(); - return -1; - } qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads); return 0; } int32_t cleanupTaskQueue() { - tsem_destroy(&clientQueryQueue.queueSem); tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool); return 0; } @@ -175,9 +163,6 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) pSchedMsg->ahandle = execFn; pSchedMsg->thandle = execParam; pSchedMsg->msg = code; - if (tsem_wait(&clientQueryQueue.queueSem) != 0) { - qError("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); - } return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); } From dc0224cb2ca215e7d0f7fe4365c7e22c6b434f72 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 27 Jun 2024 09:37:28 +0800 Subject: [PATCH 09/15] fix windows compile with void* --- include/util/tlist.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/util/tlist.h b/include/util/tlist.h index 1d4de3096f..866a37fee4 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -218,7 +218,7 @@ typedef struct { #define listEleSize(l) ((l)->eleSize) #define isListEmpty(l) (TD_DLIST_NELES(l) == 0) #define listNodeFree(n) taosMemoryFree(n) -#define listNode(data) (SListNode*)(((void*)(data)) - sizeof(SListNode)) +#define listNode(data) (SListNode*)(((char*)(data)) - sizeof(SListNode)) void tdListInit(SList *list, int32_t eleSize); void tdListEmpty(SList *list); From 26adfdd48d6f0640960db3722412e3eca49f7d24 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 1 Jul 2024 09:49:12 +0800 Subject: [PATCH 10/15] adj name --- source/libs/qcom/src/queryUtil.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index dfb3cafd86..4ba801307b 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -25,7 +25,7 @@ #include "queryInt.h" typedef struct SQueryQueue { - SQueryAutoQWorkerPool taskThreadPool; + SQueryAutoQWorkerPool wrokrerPool; STaosQueue* pTaskQueue; } SQueryQueue; @@ -124,7 +124,7 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static SQueryQueue clientQueryQueue = {0}; +static SQueryQueue clientTaskQueue = {0}; static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; @@ -133,17 +133,17 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { } int32_t initTaskQueue() { - clientQueryQueue.taskThreadPool.name = "tsc"; - clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads; - clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads; - int32_t coce = tQueryAutoQWorkerInit(&clientQueryQueue.taskThreadPool); + clientTaskQueue.wrokrerPool.name = "tsc"; + clientTaskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads; + clientTaskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&clientTaskQueue.wrokrerPool); if (TSDB_CODE_SUCCESS != coce) { qError("failed to init task thread pool"); return -1; } - clientQueryQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientQueryQueue.taskThreadPool, NULL, (FItem)processTaskQueue); - if (NULL == clientQueryQueue.pTaskQueue) { + clientTaskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientTaskQueue.wrokrerPool, NULL, (FItem)processTaskQueue); + if (NULL == clientTaskQueue.pTaskQueue) { qError("failed to init task queue"); return -1; } @@ -153,7 +153,7 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool); + tQueryAutoQWorkerCleanup(&clientTaskQueue.wrokrerPool); return 0; } @@ -164,23 +164,23 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) pSchedMsg->thandle = execParam; pSchedMsg->msg = code; - return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); + return taosWriteQitem(clientTaskQueue.pTaskQueue, pSchedMsg); } int32_t taosAsyncWait() { - if (!clientQueryQueue.taskThreadPool.pCb) { + if (!clientTaskQueue.wrokrerPool.pCb) { qError("query task thread pool callback function is null"); return -1; } - return clientQueryQueue.taskThreadPool.pCb->beforeBlocking(&clientQueryQueue.taskThreadPool); + return clientTaskQueue.wrokrerPool.pCb->beforeBlocking(&clientTaskQueue.wrokrerPool); } int32_t taosAsyncRecover() { - if (!clientQueryQueue.taskThreadPool.pCb) { + if (!clientTaskQueue.wrokrerPool.pCb) { qError("query task thread pool callback function is null"); return -1; } - return clientQueryQueue.taskThreadPool.pCb->afterRecoverFromBlocking(&clientQueryQueue.taskThreadPool); + return clientTaskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&clientTaskQueue.wrokrerPool); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { From 5040ff5bd9594dbce910b846bac536817d650df5 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 28 Jun 2024 15:11:44 +0800 Subject: [PATCH 11/15] mnode/qnode support QueryAutoQWorkerPool --- include/dnode/mnode/mnode.h | 2 +- include/dnode/qnode/qnode.h | 2 +- include/util/tqueue.h | 2 +- include/util/tworker.h | 25 ++++--- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 3 +- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 3 +- source/dnode/mnode/impl/inc/mndInt.h | 3 + source/dnode/mnode/impl/src/mndMain.c | 27 ++++++-- source/dnode/mnode/impl/src/mndQuery.c | 20 +++--- source/dnode/qnode/inc/qndInt.h | 2 +- source/dnode/qnode/src/qnode.c | 7 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/audit/src/auditMain.c | 1 - source/libs/executor/src/exchangeoperator.c | 15 ++-- source/libs/executor/src/executor.c | 1 + source/util/src/tworker.c | 77 ++++++++++++++++----- 16 files changed, 137 insertions(+), 55 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index fe96fe1117..b821231539 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -109,7 +109,7 @@ int64_t mndGetRoleTimeMs(SMnode *pMnode); * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -int32_t mndProcessRpcMsg(SRpcMsg *pMsg); +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg); diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 7d342c4ba1..e7f9d00ff3 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -60,7 +60,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * @param pQnode The qnode object. * @param pMsg The request message */ -int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg); +int32_t qndProcessQueryMsg(SQnode *pQnode, SQueueInfo* pInfo, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 51a3b2dc7d..bed218ac1b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -49,7 +49,7 @@ typedef struct { int32_t workerId; int32_t threadNum; int64_t timestamp; - void *poolCb; + void *workerCb; } SQueueInfo; typedef enum { diff --git a/include/util/tworker.h b/include/util/tworker.h index 6df335fc74..5e5271d324 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -84,18 +84,25 @@ void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); +typedef enum SQWorkerPoolType { + QWORKER_POOL = 0, + QUERY_AUTO_QWORKER_POOL, +} SQWorkerPoolType; + typedef struct { - const char *name; - int32_t min; - int32_t max; - FItem fp; - void *param; + const char *name; + int32_t min; + int32_t max; + FItem fp; + void *param; + SQWorkerPoolType poolType; } SSingleWorkerCfg; typedef struct { - const char *name; - STaosQueue *queue; - SQWorkerPool pool; + const char *name; + STaosQueue *queue; + SQWorkerPoolType poolType; // default to QWORKER_POOL + void *pool; } SSingleWorker; typedef struct { @@ -134,6 +141,8 @@ typedef struct SQueryAutoQWorkerPool { int32_t activeN; // running workers and workers waiting at reading new queue msg int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. + TdThreadMutex activeLock; + int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers TdThreadMutex waitingAfterBlockLock; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 885086e37a..e5c32f9a43 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -53,7 +53,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - int32_t code = mndProcessRpcMsg(pMsg); + int32_t code = mndProcessRpcMsg(pMsg, pInfo); if (pInfo->timestamp != 0) { int64_t cost = taosGetTimestampUs() - pInfo->timestamp; @@ -203,6 +203,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .name = "mnode-query", .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, + .poolType = QUERY_AUTO_QWORKER_POOL, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { dError("failed to start mnode-query worker since %s", terrstr()); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 28da0f9c5f..5c635ff5ea 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -30,7 +30,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from qnode queue", pMsg); - int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg); + int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo, pMsg); if (IsReq(pMsg) && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; qmSendRsp(pMsg, code); @@ -105,6 +105,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .name = "qnode-query", .fp = (FItem)qmProcessQueue, .param = pMgmt, + .poolType = QUERY_AUTO_QWORKER_POOL, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 2da14c65d2..072568eb0f 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -54,6 +54,7 @@ extern "C" { #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg); +typedef int32_t (*MndMsgFpExt)(SRpcMsg *pMsg, SQueueInfo* pInfo); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -137,11 +138,13 @@ typedef struct SMnode { SEncryptMgmt encryptMgmt; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; + MndMsgFpExt msgFpExt[TDMT_MAX]; SMsgCb msgCb; int64_t ipWhiteVer; } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); +void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp); int64_t mndGenerateUid(const char *name, int32_t len); void mndSetRestored(SMnode *pMnode, bool restored); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..398ea5d589 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -844,21 +844,29 @@ _OVER: return -1; } -int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) { SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; + int32_t code = TSDB_CODE_SUCCESS; - MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; + MndMsgFpExt fpExt = NULL; if (fp == NULL) { - mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - return -1; + fpExt = pMnode->msgFpExt[TMSG_INDEX(pMsg->msgType)]; + if (fpExt == NULL) { + mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } } if (mndCheckMnodeState(pMsg) != 0) return -1; mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - int32_t code = (*fp)(pMsg); + if (fp) + code = (*fp)(pMsg); + else + code = (*fpExt)(pMsg, pQueueInfo); mndReleaseRpc(pMnode); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { @@ -883,6 +891,13 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } } +void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp) { + tmsg_t type = TMSG_INDEX(msgType); + if (type < TDMT_MAX) { + pMnode->msgFpExt[type] = fp; + } +} + // Note: uid 0 is reserved int64_t mndGenerateUid(const char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index c03b02c17f..ae930f0d96 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -30,11 +30,11 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) { qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } -int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { +int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; - SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; + SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb, .pWorkerCb = pInfo->workerCb}; mTrace("msg:%p, in query queue is processing", pMsg); switch (pMsg->msgType) { @@ -173,14 +173,14 @@ int32_t mndInitQuery(SMnode *pMnode) { return -1; } - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg); return 0; diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 86deda52ad..e8ccb75040 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -29,7 +29,7 @@ extern "C" { #endif -typedef struct SQueueWorker SQHandle; +typedef struct SQWorker SQHandle; typedef struct SQnode { int32_t qndId; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 9937debb13..8cd967a8a8 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tqueue.h" #include "executor.h" #include "qndInt.h" #include "query.h" @@ -24,6 +25,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { qError("calloc SQnode failed"); return NULL; } + pQnode->qndId = QNODE_HANDLE; if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); @@ -72,9 +74,10 @@ int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg, false); } -int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { +int32_t qndProcessQueryMsg(SQnode *pQnode, SQueueInfo* pInfo, SRpcMsg *pMsg) { int32_t code = -1; - SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; + int64_t ts = pInfo->timestamp; + SReadHandle handle = {.pMsgCb = &pQnode->msgCb, .pWorkerCb = pInfo->workerCb}; qTrace("message in qnode queue is processing"); switch (pMsg->msgType) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ba84951b7b..2d05cc2e00 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -747,7 +747,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo) { return 0; } - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->poolCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb}; initStorageAPI(&handle.api); switch (pMsg->msgType) { diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index 96934888eb..aa3b669c1b 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -22,7 +22,6 @@ #include "ttime.h" #include "tjson.h" #include "tglobal.h" -#include "mnode.h" #include "audit.h" #include "osMemory.h" diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 1888d3dabe..059e1f2663 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -976,14 +976,21 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) { SExecTaskInfo* pTask = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; if (pTask->pWorkerCb) { - pTask->code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool); - if (pTask->code != TSDB_CODE_SUCCESS) return pTask->code; + code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool); + if (code != TSDB_CODE_SUCCESS) { + pTask->code = code; + return pTask->code; + } } tsem_wait(&pExchangeInfo->ready); if (pTask->pWorkerCb) { - pTask->code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); - if (pTask->code != TSDB_CODE_SUCCESS) return pTask->code; + code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); + if (code != TSDB_CODE_SUCCESS) { + pTask->code = code; + return pTask->code; + } } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 77a80d229e..ee4b0bbae9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -794,6 +794,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i); SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); if (pExchangeInfo) { + qDebug("%s stop exchange operator", GET_TASKID(pTaskInfo)); tsem_post(&pExchangeInfo->ready); taosReleaseRef(exchangeObjRefPool, pStop->refId); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 2ca0450bed..3bd4039a9c 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -433,28 +433,66 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { } int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { - SQWorkerPool *pPool = &pWorker->pool; - pPool->name = pCfg->name; - pPool->min = pCfg->min; - pPool->max = pCfg->max; - if (tQWorkerInit(pPool) != 0) return -1; - - pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); - if (pWorker->queue == NULL) return -1; - + pWorker->poolType = pCfg->poolType; pWorker->name = pCfg->name; + + switch (pCfg->poolType) { + case QWORKER_POOL: { + SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool)); + if (!pPool) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pPool->name = pCfg->name; + pPool->min = pCfg->min; + pPool->max = pCfg->max; + pWorker->pool = pPool; + if (tQWorkerInit(pPool) != 0) return -1; + + pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) return -1; + } break; + case QUERY_AUTO_QWORKER_POOL: { + SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool)); + if (!pPool) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pPool->name = pCfg->name; + pPool->min = pCfg->min; + pPool->max = pCfg->max; + pWorker->pool = pPool; + if (tQueryAutoQWorkerInit(pPool) != 0) return -1; + + pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (!pWorker->queue) return -1; + } break; + default: + assert(0); + } return 0; } void tSingleWorkerCleanup(SSingleWorker *pWorker) { if (pWorker->queue == NULL) return; - while (!taosQueueEmpty(pWorker->queue)) { taosMsleep(10); } - tQWorkerCleanup(&pWorker->pool); - tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); + switch (pWorker->poolType) { + case QWORKER_POOL: + tQWorkerCleanup(pWorker->pool); + tQWorkerFreeQueue(pWorker->pool, pWorker->queue); + taosMemoryFree(pWorker->pool); + break; + case QUERY_AUTO_QWORKER_POOL: + tQueryAutoQWorkerCleanup(pWorker->pool); + tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue); + taosMemoryFree(pWorker->pool); + break; + default: + assert(0); + } } int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { @@ -517,7 +555,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { if (qinfo.fp != NULL) { qinfo.workerId = worker->id; qinfo.threadNum = pool->num; - qinfo.poolCb = pool->pCb; + qinfo.workerCb = pool->pCb; (*((FItem)qinfo.fp))(&qinfo, msg); } @@ -573,33 +611,38 @@ static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) { static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; + taosThreadMutexLock(&pPool->activeLock); int32_t active = pPool->activeN; while (active > minActive) { int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1); if (activeNew == active) { - atomic_fetch_sub_32(&pPool->runningN, 1); ret = true; break; } active = activeNew; } + atomic_fetch_sub_32(&pPool->runningN, 1); + taosThreadMutexUnlock(&pPool->activeLock); return ret; } static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { + taosThreadMutexLock(&pPool->activeLock); int32_t running = pPool->runningN; while (running < pPool->num) { int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); if (runningNew == running) { // to running + taosThreadMutexUnlock(&pPool->activeLock); return TSDB_CODE_SUCCESS; } running = runningNew; } + atomic_fetch_sub_32(&pPool->activeN, 1); + taosThreadMutexUnlock(&pPool->activeLock); // to wait for process taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); - atomic_fetch_sub_32(&pPool->activeN, 1); if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); // recovered from waiting taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); @@ -650,7 +693,6 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ return true; } else { - atomic_fetch_sub_32(&pPool->runningN, 1); return true; } } @@ -670,6 +712,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { (void)taosThreadMutexInit(&pool->backupLock, NULL); (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL); + (void)taosThreadMutexInit(&pool->activeLock, NULL); (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL); (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL); @@ -757,6 +800,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { taosThreadMutexDestroy(&pPool->backupLock); taosThreadMutexDestroy(&pPool->waitingAfterBlockLock); taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock); + taosThreadMutexDestroy(&pPool->activeLock); taosThreadCondDestroy(&pPool->backupCond); taosThreadCondDestroy(&pPool->waitingAfterBlockCond); @@ -866,7 +910,6 @@ static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) { if (code != TSDB_CODE_SUCCESS) { return code; } - atomic_fetch_sub_32(&pPool->runningN, 1); } return TSDB_CODE_SUCCESS; From 6c6f322a834c21e313a2f4594979add27fe76561 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 28 Jun 2024 16:41:00 +0800 Subject: [PATCH 12/15] change vnode/qnode query threads min value --- source/common/src/tglobal.c | 4 ++-- source/libs/executor/src/executor.c | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 996cc3c6f8..3372c7b1cc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -669,12 +669,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ee4b0bbae9..77a80d229e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -794,7 +794,6 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i); SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); if (pExchangeInfo) { - qDebug("%s stop exchange operator", GET_TASKID(pTaskInfo)); tsem_post(&pExchangeInfo->ready); taosReleaseRef(exchangeObjRefPool, pStop->refId); } From c3deebf5b687fb85a2351732b8a630138d05452c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 1 Jul 2024 10:12:13 +0800 Subject: [PATCH 13/15] adj name --- source/libs/qcom/src/queryUtil.c | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4ba801307b..0265731b3e 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -24,10 +24,10 @@ #include "cJSON.h" #include "queryInt.h" -typedef struct SQueryQueue { +typedef struct STaskQueue { SQueryAutoQWorkerPool wrokrerPool; STaosQueue* pTaskQueue; -} SQueryQueue; +} STaskQueue; int32_t getAsofJoinReverseOp(EOperatorType op) { switch (op) { @@ -124,7 +124,7 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static SQueryQueue clientTaskQueue = {0}; +static STaskQueue taskQueue = {0}; static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; @@ -133,17 +133,17 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { } int32_t initTaskQueue() { - clientTaskQueue.wrokrerPool.name = "tsc"; - clientTaskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads; - clientTaskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads; - int32_t coce = tQueryAutoQWorkerInit(&clientTaskQueue.wrokrerPool); + taskQueue.wrokrerPool.name = "tsc"; + taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads; + taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool); if (TSDB_CODE_SUCCESS != coce) { qError("failed to init task thread pool"); return -1; } - clientTaskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientTaskQueue.wrokrerPool, NULL, (FItem)processTaskQueue); - if (NULL == clientTaskQueue.pTaskQueue) { + taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue); + if (NULL == taskQueue.pTaskQueue) { qError("failed to init task queue"); return -1; } @@ -153,7 +153,7 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - tQueryAutoQWorkerCleanup(&clientTaskQueue.wrokrerPool); + tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool); return 0; } @@ -164,23 +164,23 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) pSchedMsg->thandle = execParam; pSchedMsg->msg = code; - return taosWriteQitem(clientTaskQueue.pTaskQueue, pSchedMsg); + return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg); } int32_t taosAsyncWait() { - if (!clientTaskQueue.wrokrerPool.pCb) { + if (!taskQueue.wrokrerPool.pCb) { qError("query task thread pool callback function is null"); return -1; } - return clientTaskQueue.wrokrerPool.pCb->beforeBlocking(&clientTaskQueue.wrokrerPool); + return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool); } int32_t taosAsyncRecover() { - if (!clientTaskQueue.wrokrerPool.pCb) { + if (!taskQueue.wrokrerPool.pCb) { qError("query task thread pool callback function is null"); return -1; } - return clientTaskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&clientTaskQueue.wrokrerPool); + return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { From d0bb671d7947c98836cbc294eaf0dd521eff54a5 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 1 Jul 2024 15:41:19 +0800 Subject: [PATCH 14/15] avoid locks in queryAutoQWorkerPool --- include/util/tworker.h | 7 +-- source/util/src/tworker.c | 118 +++++++++++++++++++++++++++----------- 2 files changed, 89 insertions(+), 36 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index 5e5271d324..a3ba7dba6d 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -139,10 +139,9 @@ typedef struct SQueryAutoQWorkerPool { int32_t min; int32_t maxInUse; - int32_t activeN; // running workers and workers waiting at reading new queue msg - int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. - TdThreadMutex activeLock; - + int64_t activeRunningN; // 4 bytes for activeN, 4 bytes for runningN + // activeN are running workers and workers waiting at reading new queue msgs + // runningN are workers processing queue msgs, not include blocking/waitingAfterBlock/waitingBeforeProcessMsg workers. int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers TdThreadMutex waitingAfterBlockLock; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 3bd4039a9c..bc5c46351a 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -525,6 +525,75 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool); static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker); +#define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32) +#define GET_RUNNING_N(int64_val) (int32_t)(int64_val & 0xFFFFFFFF) + +static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) { + int64_t acutalSubVal = val; + acutalSubVal <<= 32; + int64_t newVal64 = atomic_fetch_sub_64(ptr, acutalSubVal); + return GET_ACTIVE_N(newVal64); +} + +static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_sub_64(ptr, val)); } + +static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) { + int64_t actualAddVal = val; + actualAddVal <<= 32; + int64_t newVal64 = atomic_fetch_add_64(ptr, actualAddVal); + return GET_ACTIVE_N(newVal64); +} + +static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); } + +static bool atomicCompareExchangeActive(int64_t* ptr, int32_t* expectedVal, int32_t newVal) { + int64_t oldVal64 = *expectedVal, newVal64 = newVal; + int32_t running = GET_RUNNING_N(*ptr); + oldVal64 <<= 32; + newVal64 <<= 32; + oldVal64 |= running; + newVal64 |= running; + int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64); + if (actualNewVal64 == oldVal64) { + return true; + } else { + *expectedVal = GET_ACTIVE_N(actualNewVal64); + return false; + } +} + +static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal, int32_t newVal) { + int64_t oldVal64 = *expectedVal, newVal64 = newVal; + int64_t activeShifted = GET_ACTIVE_N(*ptr); + activeShifted <<= 32; + oldVal64 |= activeShifted; + newVal64 |= activeShifted; + int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64); + if (actualNewVal64 == oldVal64) { + return true; + } else { + *expectedVal = GET_RUNNING_N(actualNewVal64); + return false; + } +} + +static int64_t atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive, + int32_t *expectedRunning, int32_t newRunning) { + int64_t oldVal64 = *expectedActive, newVal64 = newActive; + oldVal64 <<= 32; + oldVal64 |= *expectedRunning; + newVal64 <<= 32; + newVal64 |= newRunning; + int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64); + if (actualNewVal64 == oldVal64) { + return true; + } else { + *expectedActive = GET_ACTIVE_N(actualNewVal64); + *expectedRunning = GET_RUNNING_N(actualNewVal64); + return false; + } +} + static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { SQueryAutoQWorkerPool *pool = worker->pool; SQueueInfo qinfo = {0}; @@ -575,71 +644,60 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; - taosThreadMutexLock(&pPool->waitingAfterBlockLock); int32_t waiting = pPool->waitingAfterBlockN; while (waiting > 0) { int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1); if (waitingNew == waiting) { + taosThreadMutexLock(&pPool->waitingAfterBlockLock); taosThreadCondSignal(&pPool->waitingAfterBlockCond); + taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); ret = true; break; } waiting = waitingNew; } - taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); return ret; } static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; - taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); int32_t waiting = pPool->waitingBeforeProcessMsgN; while (waiting > 0) { int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1); if (waitingNew == waiting) { + taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond); + taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); ret = true; break; } waiting = waitingNew; } - taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); return ret; } static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; - taosThreadMutexLock(&pPool->activeLock); - int32_t active = pPool->activeN; + int64_t val64 = pPool->activeRunningN; + int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64); while (active > minActive) { - int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1); - if (activeNew == active) { - ret = true; - break; - } - active = activeNew; + if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1)) + return true; } - atomic_fetch_sub_32(&pPool->runningN, 1); - taosThreadMutexUnlock(&pPool->activeLock); - return ret; + atomicFetchSubRunning(&pPool->activeRunningN, 1); + return false; } static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { - taosThreadMutexLock(&pPool->activeLock); - int32_t running = pPool->runningN; + int32_t running = GET_RUNNING_N(pPool->activeRunningN); while (running < pPool->num) { - int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); - if (runningNew == running) { - // to running - taosThreadMutexUnlock(&pPool->activeLock); + if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) { return TSDB_CODE_SUCCESS; } - running = runningNew; } - atomic_fetch_sub_32(&pPool->activeN, 1); - taosThreadMutexUnlock(&pPool->activeLock); + atomicFetchSubActive(&pPool->activeRunningN, 1); // to wait for process taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); @@ -712,7 +770,6 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { (void)taosThreadMutexInit(&pool->backupLock, NULL); (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL); - (void)taosThreadMutexInit(&pool->activeLock, NULL); (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL); (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL); @@ -800,7 +857,6 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { taosThreadMutexDestroy(&pPool->backupLock); taosThreadMutexDestroy(&pPool->waitingAfterBlockLock); taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock); - taosThreadMutexDestroy(&pPool->activeLock); taosThreadCondDestroy(&pPool->backupCond); taosThreadCondDestroy(&pPool->waitingAfterBlockCond); @@ -846,7 +902,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand taosThreadAttrDestroy(&thAttr); pool->num++; - pool->activeN++; + atomicFetchAddActive(&pool->activeRunningN, 1); uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num); } while (pool->num < pool->min); } @@ -917,14 +973,12 @@ static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) { static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) { SQueryAutoQWorkerPool* pPool = p; - int32_t running = pPool->runningN; + int64_t val64 = pPool->activeRunningN; + int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); while (running < pPool->num) { - int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); - if (runningNew == running) { - atomic_fetch_add_32(&pPool->activeN, 1); + if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) { return TSDB_CODE_SUCCESS; } - running = runningNew; } taosThreadMutexLock(&pPool->waitingAfterBlockLock); atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1); From 24308a4acbc65dad5aafbd6399a5e06de81c2206 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 1 Jul 2024 18:52:36 +0800 Subject: [PATCH 15/15] adj pool name --- source/libs/qcom/src/queryUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 0265731b3e..9ff6fc3e49 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -133,7 +133,7 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { } int32_t initTaskQueue() { - taskQueue.wrokrerPool.name = "tsc"; + taskQueue.wrokrerPool.name = "taskWorkPool"; taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads; taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads; int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);