diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 78a977c86f..a461ceef2a 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -57,6 +57,7 @@ typedef struct {
 
   STimeWindow winRange;
   struct SStorageAPI api;
+  void* pWorkerCb;  // worker-pool callbacks; createExecTaskInfo copies this into SExecTaskInfo.pWorkerCb
 } SReadHandle;
 
 // in queue mode, data streams are seperated by msg
diff --git a/include/util/tlist.h b/include/util/tlist.h
index 0924c133b9..1d4de3096f 100644
--- a/include/util/tlist.h
+++ b/include/util/tlist.h
@@ -218,6 +218,8 @@ typedef struct {
 #define listEleSize(l) ((l)->eleSize)
 #define isListEmpty(l) (TD_DLIST_NELES(l) == 0)
 #define listNodeFree(n) taosMemoryFree(n)
+// Map a pointer to a node's payload back to its SListNode (assumes the payload starts sizeof(SListNode) bytes in).
+#define listNode(data) ((SListNode*)((char*)(data) - sizeof(SListNode)))
 
 void tdListInit(SList *list, int32_t eleSize);
 void tdListEmpty(SList *list);
@@ -293,4 +295,4 @@ SListNode *tdListNext(SListIter *pIter);
 }
 #endif
 
-#endif /*_TD_UTIL_LIST_H_*/
\ No newline at end of file
+#endif /*_TD_UTIL_LIST_H_*/
diff --git a/include/util/tqueue.h b/include/util/tqueue.h
index eb887596d0..51a3b2dc7d 100644
--- a/include/util/tqueue.h
+++ b/include/util/tqueue.h
@@ -49,6 +49,7 @@ typedef struct {
   int32_t workerId;
   int32_t threadNum;
   int64_t timestamp;
+  void *poolCb;  // callbacks of the pool whose worker delivers the message; may be NULL
 } SQueueInfo;
 
 typedef enum {
diff --git a/include/util/tworker.h b/include/util/tworker.h
index f39540d24b..3af6d3d8ba 100644
--- a/include/util/tworker.h
+++ b/include/util/tworker.h
@@ -16,6 +16,7 @@
 #ifndef _TD_UTIL_WORKER_H_
 #define _TD_UTIL_WORKER_H_
 
+#include "tlist.h"
 #include "tqueue.h"
 #include "tarray.h"
 
@@ -115,6 +116,60 @@ void tSingleWorkerCleanup(SSingleWorker *pWorker);
 int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg);
 void tMultiWorkerCleanup(SMultiWorker *pWorker);
 
+struct SQueryAutoQWorkerPoolCB;
+
+typedef struct SQueryAutoQWorker {
+  int32_t id;         // worker id
+  int32_t backupIdx;  // the idx when put into backup pool
+  int64_t pid;        // thread pid
+  TdThread thread;    // thread id
+  void *pool;         // the SQueryAutoQWorkerPool this worker belongs to
+} SQueryAutoQWorker;
+
+typedef struct SQueryAutoQWorkerPool {
+  int32_t num;         // workers launched while allocating queues; also the cap for runningN
+  int32_t max;         // no worker is launched by queue allocation once num reaches max
+  int32_t min;         // queue allocation keeps launching workers until num reaches min
+  int32_t maxRunning;  // NOTE(review): not referenced by the code added in this patch
+
+  int32_t activeN;   // running workers and workers waiting at reading new queue msg
+  int32_t runningN;  // workers processing queue msgs, excluding blocking/waitingA/waitingB workers
+
+  int32_t blockingN;  // blocked worker num, like exchangeoperator sem_wait
+
+  int32_t waitingAfterBlockN;  // workers that recovered from blocking but wait because too many workers are running
+  TdThreadMutex waitingAfterBlockLock;
+  TdThreadCond waitingAfterBlockCond;
+
+  int32_t waitingBeforeProcessMsgN;  // workers that got a msg from the queue but wait because too many workers are running
+  TdThreadMutex waitingBeforeProcessMsgLock;
+  TdThreadCond waitingBeforeProcessMsgCond;
+
+  int32_t backupNum;  // workers that are in backup pool, not reading msg from queue
+  TdThreadMutex backupLock;
+  TdThreadCond backupCond;
+
+  const char *name;
+  TdThreadMutex poolLock;  // held while the three worker lists below are modified
+  SList *workers;          // workers serving the qset
+  SList *backupWorkers;    // workers parked on backupCond
+  SList *exitedWorkers;    // workers whose thread returned and still has to be joined
+  STaosQset *qset;
+  struct SQueryAutoQWorkerPoolCB *pCb;  // allocated by tQueryAutoQWorkerInit when NULL, freed by tQueryAutoQWorkerCleanup
+  bool exit;                            // set by tQueryAutoQWorkerCleanup
+} SQueryAutoQWorkerPool;
+
+int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pPool);
+void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool);
+STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pPool, void *ahandle, FItem fp);
+void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ);
+
+typedef struct SQueryAutoQWorkerPoolCB {
+  SQueryAutoQWorkerPool* pPool;                      // passed back as the argument of both callbacks
+  int32_t (*beforeBlocking)(void* pPool);            // call right before the worker thread blocks
+  int32_t (*afterRecoverFromBlocking)(void* pPool);  // call right after the worker thread wakes up again
+} SQueryAutoQWorkerPoolCB;
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 07d78b5c0b..996cc3c6f8 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -669,7 +669,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
   if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, 
CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; /* only the first numeric bound changes (4 -> 2); presumably the minimum - confirm against cfgAddInt32 */ if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 375a34c04f..85712d2797 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,20 +26,20 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SAutoQWorkerPool streamPool; - SWWorkerPool fetchPool; - SSingleWorker mgmtWorker; - SHashObj *hash; - TdThreadRwlock lock; - SVnodesStat state; - STfs *pTfs; - TdThread thread; - bool stop; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQueryAutoQWorkerPool queryPool; /* worker pool behind every vnode's pQueryQ (see vmAllocQueue) */ + SAutoQWorkerPool streamPool; + SWWorkerPool fetchPool; + SSingleWorker mgmtWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; + TdThread thread; + bool stop; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7b7c51bbc7..45d1486912 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -82,7 +82,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = 
&pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); + int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg, pInfo); /* pInfo is forwarded so the handler can read pInfo->poolCb */ if (code != 0) { if (terrno != 0) code = terrno; dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code)); @@ -357,7 +357,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { (void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg); (void)tMultiWorkerInit(&pVnode->pApplyW, &acfg); - pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); + pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); @@ -383,7 +383,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { - tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; @@ -393,11 +393,11 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } int32_t vmStartWorker(SVnodeMgmt *pMgmt) { - SQWorkerPool *pQPool = &pMgmt->queryPool; + SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; pQPool->min = tsNumOfVnodeQueryThreads; pQPool->max = tsNumOfVnodeQueryThreads; - if (tQWorkerInit(pQPool) != 0) return -1; + if (tQueryAutoQWorkerInit(pQPool) != 0) return -1; /* min and max are both tsNumOfVnodeQueryThreads at this point */ SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool; pStreamPool->name = "vnode-stream"; @@ -419,7 +419,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { } void vmStopWorker(SVnodeMgmt *pMgmt) { - tQWorkerCleanup(&pMgmt->queryPool); + 
tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tAutoQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fccba91db7..e35c152e9b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -106,7 +106,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2264e2779b..ba84951b7b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -733,7 +733,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType); } -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo) { vTrace("message in vnode query queue is processing"); if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && @@ -747,7 +747,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return 0; } - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->poolCb}; /* pInfo is dereferenced unconditionally here, so callers must pass a non-NULL pInfo */ initStorageAPI(&handle.api); switch 
(pMsg->msgType) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c48c359fad..95414ff70e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -42,6 +42,7 @@ extern "C" { // #include "tstream.h" // #include "tstreamUpdate.h" #include "tlrucache.h" +#include "tworker.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 18f51df2e9..48de49f07c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -95,6 +95,7 @@ struct SExecTaskInfo { int8_t dynamicTask; SOperatorParam* pOpParam; bool paramSet; + SQueryAutoQWorkerPoolCB* pWorkerCb; /* worker-pool callbacks for blocking waits; NULL-checked before use */ }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c527224438..1888d3dabe 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -60,6 +60,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInf bool holdDataInBuf); static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); +static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo); + static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo) { int32_t code = 0; @@ -74,9 +76,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); - tsem_wait(&pExchangeInfo->ready); + code = exchangeWait(pOperator, pExchangeInfo); /* replaces the bare tsem_wait on pExchangeInfo->ready */ - if (isTaskKilled(pTaskInfo)) { + if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -756,8 
+758,8 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
     pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
 
     doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
-    tsem_wait(&pExchangeInfo->ready);
-    if (isTaskKilled(pTaskInfo)) {
+    code = exchangeWait(pOperator, pExchangeInfo);
+    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
       T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
     }
 
@@ -971,3 +973,32 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
     return PROJECT_RETRIEVE_CONTINUE;
   }
 }
+
+// Wait on pExchangeInfo->ready. When the task carries worker-pool callbacks, the pool is told right
+// before the thread blocks and right after it wakes up. Returns TSDB_CODE_SUCCESS, or the failing
+// callback's code, which is then also stored in pTask->code for the caller's T_LONG_JMP.
+static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
+  SExecTaskInfo* pTask = pOperator->pTaskInfo;
+  int32_t        code = TSDB_CODE_SUCCESS;
+
+  if (pTask->pWorkerCb) {
+    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
+    if (code != TSDB_CODE_SUCCESS) {
+      pTask->code = code;
+      return code;
+    }
+  }
+
+  tsem_wait(&pExchangeInfo->ready);
+
+  if (pTask->pWorkerCb) {
+    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
+    // Record failures only: writing a success code here would overwrite whatever was stored in
+    // pTask->code while this thread was blocked, and callers jump with pTask->code after the wait.
+    if (code != TSDB_CODE_SUCCESS) {
+      pTask->code = code;
+      return code;
+    }
+  }
+  return TSDB_CODE_SUCCESS;
+}
diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c
index 8b87f3da43..0bf4ac2b21 100644
--- a/source/libs/executor/src/querytask.c
+++ b/source/libs/executor/src/querytask.c
@@ -96,6 +96,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
   TSWAP((*pTaskInfo)->sql, sql);
 
   (*pTaskInfo)->pSubplan = pPlan;
+  (*pTaskInfo)->pWorkerCb = pHandle->pWorkerCb;
   (*pTaskInfo)->pRoot = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond,
                                        pPlan->user, pPlan->dbFName);
 
diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c
index d30b100147..a9b568dd91 100644
--- a/source/libs/qworker/src/qwMsg.c
+++ b/source/libs/qworker/src/qwMsg.c
@@ -254,6 +254,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
   req->queryId = qId;
   req->taskId = tId;
   req->execId = eId;
+ 
//taosMsleep(500);
 
   SRpcMsg pNewMsg = {
       .msgType = TDMT_SCH_QUERY_CONTINUE,
diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c
index 07186331ae..4e921af76c 100644
--- a/source/util/src/tworker.c
+++ b/source/util/src/tworker.c
@@ -480,3 +480,447 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) {
   tWWorkerCleanup(&pWorker->pool);
   tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
 }
+
+static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool);
+static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
+static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
+static int32_t tQueryAutoQWorkerWaitingCheck(void *p);
+static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker);
+
+// Thread body of one query worker: read a message from the pool's qset, wait for a running slot,
+// call the queue's handler, then either keep serving or leave through the recycle path.
+static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
+  SQueryAutoQWorkerPool *pool = worker->pool;
+  SQueueInfo qinfo = {0};
+  void *msg = NULL;
+
+  taosBlockSIGPIPE();
+  setThreadName(pool->name);
+  worker->pid = taosGetSelfPthreadId();
+  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
+
+  while (1) {
+    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
+      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
+            worker->pid);
+      break;
+    }
+
+    if (qinfo.timestamp != 0) {
+      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
+      if (cost > QUEUE_THRESHOLD) {
+        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
+      }
+    }
+
+    // The result is ignored on purpose: the message is still handed to its handler when the check
+    // reports that the pool is exiting.
+    // TODO wjm what if taosd is exiting
+    (void)tQueryAutoQWorkerWaitingCheck(pool);
+
+    if (qinfo.fp != NULL) {
+      qinfo.workerId = worker->id;
+      qinfo.threadNum = pool->num;
+      qinfo.poolCb = pool->pCb;
+      (*((FItem)qinfo.fp))(&qinfo, msg);
+    }
+
+    taosUpdateItemSize(qinfo.queue, 1);
+    if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) {
+      break;
+    }
+  }
+
+  destroyThreadLocalGeosCtx();
+  DestoryThreadLocalRegComp();
+
+  return NULL;
+}
+
+// Wake one worker parked in tQueryAutoQWorkerRecoverFromBlocking, if there is one: decrement
+// waitingAfterBlockN and signal the condition. Returns true when a waiter was signalled.
+static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
+  SQueryAutoQWorkerPool *pPool = p;
+  bool ret = false;
+  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
+  int32_t waiting = pPool->waitingAfterBlockN;
+  while (waiting > 0) {
+    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
+    if (waitingNew == waiting) {
+      taosThreadCondSignal(&pPool->waitingAfterBlockCond);
+      ret = true;
+      break;
+    }
+    waiting = waitingNew;
+  }
+  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
+  return ret;
+}
+
+// Wake one worker parked in tQueryAutoQWorkerWaitingCheck, if there is one: decrement
+// waitingBeforeProcessMsgN and signal the condition. Returns true when a waiter was signalled.
+static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) {
+  SQueryAutoQWorkerPool *pPool = p;
+  bool ret = false;
+  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
+  int32_t waiting = pPool->waitingBeforeProcessMsgN;
+  while (waiting > 0) {
+    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
+    if (waitingNew == waiting) {
+      taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
+      ret = true;
+      break;
+    }
+    waiting = waitingNew;
+  }
+  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
+  return ret;
+}
+
+// Decrement activeN by one as long as it is above minActive; runningN is decremented with it.
+// Returns true when the counters were decremented.
+static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
+  SQueryAutoQWorkerPool *pPool = p;
+  bool ret = false;
+  int32_t active = pPool->activeN;
+  while (active > minActive) {
+    int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1);
+    if (activeNew == active) {
+      atomic_fetch_sub_32(&pPool->runningN, 1);
+      ret = true;
+      break;
+    }
+    active = activeNew;
+  }
+  return ret;
+}
+
+// Called by a worker that holds a message: claim a running slot (runningN < num) or park on
+// waitingBeforeProcessMsgCond until signalled. Returns TSDB_CODE_SUCCESS, or -1 when the pool is exiting.
+static int32_t tQueryAutoQWorkerWaitingCheck(void *p) {
+  SQueryAutoQWorkerPool* pPool = p;
+  int32_t running = pPool->runningN;
+  while (running < pPool->num) {
+    int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
+    if (runningNew == running) {
+      // to running
+      return TSDB_CODE_SUCCESS;
+    }
+    running = runningNew;
+  }
+  // to wait for process
+  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
+  atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
+  atomic_fetch_sub_32(&pPool->activeN, 1);
+  // NOTE(review): single wait without a predicate loop, so a spurious wakeup is treated like a signal - confirm.
+  if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
+  // recovered from waiting
+  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
+  if (pPool->exit) return -1; // TODO wjm error code
+  return TSDB_CODE_SUCCESS;
+}
+
+// Called by a worker after it has handled a message. When a parked waiter could be signalled, or the
+// pool has more than `num` active workers, this worker leaves the active list: it exits if its id is
+// >= 2 * num, otherwise it sleeps in the backup list. Returns false when the calling thread must exit.
+static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker) {
+  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
+      tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
+    taosThreadMutexLock(&pPool->poolLock);
+    SListNode* pNode = listNode(pWorker);
+    tdListPopNode(pPool->workers, pNode);
+    // reclaim some workers
+    if (pWorker->id >= pPool->num * 2 ) {
+      while (listNEles(pPool->exitedWorkers) > 0) {
+        SListNode* head = tdListPopHead(pPool->exitedWorkers);
+        SQueryAutoQWorker* pExited = (SQueryAutoQWorker*)head->data;
+        if (pExited && taosCheckPthreadValid(pExited->thread)) {
+          taosThreadJoin(pExited->thread, NULL);
+          taosThreadClear(&pExited->thread);
+        }
+        taosMemoryFree(head);
+      }
+      tdListAppendNode(pPool->exitedWorkers, pNode);
+      taosThreadMutexUnlock(&pPool->poolLock);
+      return false;
+    }
+
+    // put back to backup pool
+    tdListAppendNode(pPool->backupWorkers, pNode);
+    taosThreadMutexUnlock(&pPool->poolLock);
+
+    // start to wait at backup cond
+    taosThreadMutexLock(&pPool->backupLock);
+    atomic_fetch_add_32(&pPool->backupNum, 1);
+    if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
+    // TODO wjm what if taosd is exiting
+    taosThreadMutexUnlock(&pPool->backupLock);
+
+    // recovered from backup
+    taosThreadMutexLock(&pPool->poolLock);
+    if (pPool->exit) {
+      taosThreadMutexUnlock(&pPool->poolLock);
+      return false;
+    }
+    tdListPopNode(pPool->backupWorkers, pNode);
+    tdListAppendNode(pPool->workers, pNode);
+    taosThreadMutexUnlock(&pPool->poolLock);
+
+    return true;
+  } else {
+    atomic_fetch_sub_32(&pPool->runningN, 1);
+    return true;
+  }
+}
+
+// Create the qset, the three worker lists, the locks/conditions and, when pool->pCb is NULL, the
+// callback table. Returns TSDB_CODE_SUCCESS, terrno, or TSDB_CODE_OUT_OF_MEMORY.
+// NOTE(review): an early return leaves the objects created before it allocated.
+int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
+  pool->qset = taosOpenQset();
+  if (!pool->qset) return terrno;
+  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
+  if (!pool->workers) return TSDB_CODE_OUT_OF_MEMORY;
+  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
+  if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY;
+  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
+  if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY;
+
+  (void)taosThreadMutexInit(&pool->poolLock, NULL);
+  (void)taosThreadMutexInit(&pool->backupLock, NULL);
+  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
+  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
+
+  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
+  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
+  (void)taosThreadCondInit(&pool->backupCond, NULL);
+
+  if (!pool->pCb) {
+    pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
+    if (!pool->pCb) return TSDB_CODE_OUT_OF_MEMORY;
+    pool->pCb->pPool = pool;
+    pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
+    pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;
+  }
+  return TSDB_CODE_SUCCESS;
+}
+
+// Stop the pool: set the exit flag, wake every parked worker, join the threads found in the three
+// lists, then release the lists, the callback table, the locks/conditions and the qset.
+void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
+  taosThreadMutexLock(&pPool->poolLock);
+  pPool->exit = true;
+  int32_t size = listNEles(pPool->workers);
+  for (int32_t i = 0; i < size; ++i) {
+    taosQsetThreadResume(pPool->qset);
+  }
+  size = listNEles(pPool->backupWorkers);
+  for (int32_t i = 0; i < size; ++i) {
+    taosQsetThreadResume(pPool->qset);
+  }
+  taosThreadMutexUnlock(&pPool->poolLock);
+
+  taosThreadMutexLock(&pPool->backupLock);
+  taosThreadCondBroadcast(&pPool->backupCond);
+  taosThreadMutexUnlock(&pPool->backupLock);
+
+  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
+  taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
+  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
+
+  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
+  taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
+  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
+
+  SQueryAutoQWorker* worker = NULL;
+  while (true) {
+    taosThreadMutexLock(&pPool->poolLock);
+    if (listNEles(pPool->workers) == 0) {
+      taosThreadMutexUnlock(&pPool->poolLock);
+      break;
+    }
+    SListNode* pNode = tdListPopHead(pPool->workers);
+    worker = (SQueryAutoQWorker*)pNode->data;
+    taosThreadMutexUnlock(&pPool->poolLock);
+    if (worker && taosCheckPthreadValid(worker->thread)) {
+      taosThreadJoin(worker->thread, NULL);
+      taosThreadClear(&worker->thread);
+    }
+    taosMemoryFree(pNode);
+  }
+
+  while (listNEles(pPool->backupWorkers) > 0) {
+    // TODO wjm need lock?
+    SListNode* pNode = tdListPopHead(pPool->backupWorkers);
+    worker = (SQueryAutoQWorker*)pNode->data;
+    if (worker && taosCheckPthreadValid(worker->thread)) {
+      taosThreadJoin(worker->thread, NULL);
+      taosThreadClear(&worker->thread);
+    }
+    taosMemoryFree(pNode);
+  }
+
+  while (listNEles(pPool->exitedWorkers) > 0) {
+    SListNode* pNode = tdListPopHead(pPool->exitedWorkers);
+    worker = (SQueryAutoQWorker*)pNode->data;
+    if (worker && taosCheckPthreadValid(worker->thread)) {
+      taosThreadJoin(worker->thread, NULL);
+      taosThreadClear(&worker->thread);
+    }
+    taosMemoryFree(pNode);
+  }
+
+  tdListFree(pPool->workers);
+  tdListFree(pPool->backupWorkers);
+  tdListFree(pPool->exitedWorkers);
+  taosMemoryFree(pPool->pCb);
+  pPool->pCb = NULL;  // tQueryAutoQWorkerInit only allocates the table when pCb is NULL
+
+  taosThreadMutexDestroy(&pPool->poolLock);
+  taosThreadMutexDestroy(&pPool->backupLock);
+  taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
+  taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
+
+  taosThreadCondDestroy(&pPool->backupCond);
+  taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
+  taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
+  taosCloseQset(pPool->qset);
+}
+
+// Open a queue, add it to the pool's qset and, while num < max, launch workers until num reaches min.
+// Returns the queue, or NULL (terrno set) when a worker node or thread cannot be created.
+STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
+  STaosQueue *queue = taosOpenQueue();
+  if (queue == NULL) return NULL;
+
+  taosThreadMutexLock(&pool->poolLock);
+  taosSetQueueFp(queue, fp, NULL);
+  taosAddIntoQset(pool->qset, queue, ahandle);
+  SQueryAutoQWorker worker = {0};
+  SQueryAutoQWorker* pWorker = NULL;
+
+  // spawn a thread to process queue
+  if (pool->num < pool->max) {
+    do {
+      worker.id = listNEles(pool->workers);
+      worker.backupIdx = -1;
+      worker.pool = pool;
+      SListNode* pNode = tdListAdd(pool->workers, &worker);
+      if (!pNode) {
+        taosCloseQueue(queue);
+        queue = NULL;
+        terrno = TSDB_CODE_OUT_OF_MEMORY;
+        break;
+      }
+      pWorker = (SQueryAutoQWorker*)pNode->data;
+
+      TdThreadAttr thAttr;
+      taosThreadAttrInit(&thAttr);
+      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
+        taosThreadAttrDestroy(&thAttr);  // the attribute object must be released on the failure path too
+        taosCloseQueue(queue);
+        terrno = TSDB_CODE_OUT_OF_MEMORY;
+        queue = NULL;
+        break;
+      }
+
+      taosThreadAttrDestroy(&thAttr);
+      pool->num++;
+      pool->activeN++;
+      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
+    } while (pool->num < pool->min);
+  }
+
+  taosThreadMutexUnlock(&pool->poolLock);
+  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
+
+  return queue;
+}
+
+// Close a queue obtained from tQueryAutoQWorkerAllocQueue; pPool is not used.
+void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) {
+  taosCloseQueue(pQ);
+}
+
+// Make one more worker available: wake a parked backup worker if backupNum > 0, otherwise append a
+// new worker to the list and start its thread. Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
+static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool) {
+  // try backup pool
+  int32_t backup = pool->backupNum;
+  while (backup > 0) {
+    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
+    if (backupNew == backup) {
+      taosThreadCondSignal(&pool->backupCond);
+      return TSDB_CODE_SUCCESS;
+    }
+    backup = backupNew;
+  }
+  // backup pool is empty, create new
+  SQueryAutoQWorker* pWorker = NULL;
+  SQueryAutoQWorker worker = {0};
+  worker.pool = pool;
+  worker.backupIdx = -1;
+  taosThreadMutexLock(&pool->poolLock);
+  worker.id = listNEles(pool->workers);
+  SListNode* pNode = tdListAdd(pool->workers, &worker);
+  if (!pNode) {
+    taosThreadMutexUnlock(&pool->poolLock);
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return terrno;
+  }
+  taosThreadMutexUnlock(&pool->poolLock);
+  pWorker = (SQueryAutoQWorker*)pNode->data;
+
+  TdThreadAttr thAttr;
+  taosThreadAttrInit(&thAttr);
+  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
+    taosThreadAttrDestroy(&thAttr);  // the attribute object must be released on the failure path too
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return terrno;
+  }
+  taosThreadAttrDestroy(&thAttr);
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Pool callback invoked right before a worker thread blocks. If no parked waiter can be signalled
+// and activeN cannot be decremented (it is not above 1), another worker is made available and
+// runningN is decremented. Returns TSDB_CODE_SUCCESS or the error of tQueryAutoQWorkerAddWorker.
+static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
+  SQueryAutoQWorkerPool* pPool = p;
+  if (!tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) && !tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) &&
+      !tQueryAutoQWorkerTryDecActive(p, 1)) {
+    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
+    if (code != TSDB_CODE_SUCCESS) {
+      return code;
+    }
+    atomic_fetch_sub_32(&pPool->runningN, 1);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Pool callback invoked right after a worker thread wakes up: claim a running slot (runningN < num)
+// or park on waitingAfterBlockCond. Returns TSDB_CODE_SUCCESS, or -1 when the pool is exiting.
+static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
+  SQueryAutoQWorkerPool* pPool = p;
+  int32_t running = pPool->runningN;
+  while (running < pPool->num) {
+    int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
+    if (runningNew == running) {
+      atomic_fetch_add_32(&pPool->activeN, 1);
+      return TSDB_CODE_SUCCESS;
+    }
+    running = runningNew;
+  }
+  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
+  atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
+  if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
+  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
+  if (pPool->exit) return -1; // TODO wjm error code
+  return TSDB_CODE_SUCCESS;
+}