diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index fe96fe1117..b821231539 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -109,7 +109,7 @@ int64_t mndGetRoleTimeMs(SMnode *pMnode); * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -int32_t mndProcessRpcMsg(SRpcMsg *pMsg); +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg); diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 7d342c4ba1..e7f9d00ff3 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -60,7 +60,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * @param pQnode The qnode object. * @param pMsg The request message */ -int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg); +int32_t qndProcessQueryMsg(SQnode *pQnode, SQueueInfo* pInfo, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 78a977c86f..a461ceef2a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -57,6 +57,7 @@ typedef struct { STimeWindow winRange; struct SStorageAPI api; + void* pWorkerCb; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ef702f24d7..2078455f1d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -297,6 +297,8 @@ int32_t cleanupTaskQueue(); * @return */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); +int32_t taosAsyncWait(); +int32_t taosAsyncRecover(); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); diff --git a/include/util/tlist.h b/include/util/tlist.h index 0924c133b9..866a37fee4 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -218,6 +218,7 @@ 
typedef struct { #define listEleSize(l) ((l)->eleSize) #define isListEmpty(l) (TD_DLIST_NELES(l) == 0) #define listNodeFree(n) taosMemoryFree(n) +#define listNode(data) (SListNode*)(((char*)(data)) - sizeof(SListNode)) void tdListInit(SList *list, int32_t eleSize); void tdListEmpty(SList *list); @@ -293,4 +294,4 @@ SListNode *tdListNext(SListIter *pIter); } #endif -#endif /*_TD_UTIL_LIST_H_*/ \ No newline at end of file +#endif /*_TD_UTIL_LIST_H_*/ diff --git a/include/util/tqueue.h b/include/util/tqueue.h index eb887596d0..bed218ac1b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -49,6 +49,7 @@ typedef struct { int32_t workerId; int32_t threadNum; int64_t timestamp; + void *workerCb; } SQueueInfo; typedef enum { diff --git a/include/util/tworker.h b/include/util/tworker.h index f39540d24b..a3ba7dba6d 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -16,6 +16,7 @@ #ifndef _TD_UTIL_WORKER_H_ #define _TD_UTIL_WORKER_H_ +#include "tlist.h" #include "tqueue.h" #include "tarray.h" @@ -83,18 +84,25 @@ void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); +typedef enum SQWorkerPoolType { + QWORKER_POOL = 0, + QUERY_AUTO_QWORKER_POOL, +} SQWorkerPoolType; + typedef struct { - const char *name; - int32_t min; - int32_t max; - FItem fp; - void *param; + const char *name; + int32_t min; + int32_t max; + FItem fp; + void *param; + SQWorkerPoolType poolType; } SSingleWorkerCfg; typedef struct { - const char *name; - STaosQueue *queue; - SQWorkerPool pool; + const char *name; + STaosQueue *queue; + SQWorkerPoolType poolType; // default to QWORKER_POOL + void *pool; } SSingleWorker; typedef struct { @@ -115,6 +123,59 @@ void tSingleWorkerCleanup(SSingleWorker *pWorker); int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg); void tMultiWorkerCleanup(SMultiWorker *pWorker); +struct 
SQueryAutoQWorkerPoolCB; + +typedef struct SQueryAutoQWorker { + int32_t id; // worker id + int32_t backupIdx;// the idx when put into backup pool + int64_t pid; // thread pid + TdThread thread; // thread id + void *pool; +} SQueryAutoQWorker; + +typedef struct SQueryAutoQWorkerPool { + int32_t num; + int32_t max; + int32_t min; + int32_t maxInUse; + + int64_t activeRunningN; // 4 bytes for activeN, 4 bytes for runningN + // activeN are running workers and workers waiting at reading new queue msgs + // runningN are workers processing queue msgs, not include blocking/waitingAfterBlock/waitingBeforeProcessMsg workers. + + int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers + TdThreadMutex waitingAfterBlockLock; + TdThreadCond waitingAfterBlockCond; + + int32_t waitingBeforeProcessMsgN; // workers that get msg from queue, but waiting for too many running workers + TdThreadMutex waitingBeforeProcessMsgLock; + TdThreadCond waitingBeforeProcessMsgCond; + + int32_t backupNum; // workers that are in backup pool, not reading msg from queue + TdThreadMutex backupLock; + TdThreadCond backupCond; + + const char *name; + TdThreadMutex poolLock; + SList *workers; + SList *backupWorkers; + SList *exitedWorkers; + STaosQset *qset; + struct SQueryAutoQWorkerPoolCB *pCb; + bool exit; +} SQueryAutoQWorkerPool; + +int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pPool); +void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool); +STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pPool, void *ahandle, FItem fp); +void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ); + +typedef struct SQueryAutoQWorkerPoolCB { + void *pPool; + int32_t (*beforeBlocking)(void *pPool); + int32_t (*afterRecoverFromBlocking)(void *pPool); +} SQueryAutoQWorkerPoolCB; + #ifdef __cplusplus } #endif diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 07d78b5c0b..3372c7b1cc 100644 --- 
a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -669,12 +669,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 885086e37a..e5c32f9a43 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -53,7 +53,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - int32_t code = mndProcessRpcMsg(pMsg); + int32_t code = mndProcessRpcMsg(pMsg, pInfo); if 
(pInfo->timestamp != 0) { int64_t cost = taosGetTimestampUs() - pInfo->timestamp; @@ -203,6 +203,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .name = "mnode-query", .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, + .poolType = QUERY_AUTO_QWORKER_POOL, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { dError("failed to start mnode-query worker since %s", terrstr()); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 28da0f9c5f..5c635ff5ea 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -30,7 +30,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from qnode queue", pMsg); - int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg); + int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo, pMsg); if (IsReq(pMsg) && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; qmSendRsp(pMsg, code); @@ -105,6 +105,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .name = "qnode-query", .fp = (FItem)qmProcessQueue, .param = pMgmt, + .poolType = QUERY_AUTO_QWORKER_POOL, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 375a34c04f..85712d2797 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,20 +26,20 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SAutoQWorkerPool streamPool; - SWWorkerPool fetchPool; - SSingleWorker mgmtWorker; - SHashObj *hash; - TdThreadRwlock lock; - SVnodesStat state; - STfs *pTfs; - TdThread thread; - bool stop; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQueryAutoQWorkerPool queryPool; + 
SAutoQWorkerPool streamPool; + SWWorkerPool fetchPool; + SSingleWorker mgmtWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; + TdThread thread; + bool stop; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7b7c51bbc7..45d1486912 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -82,7 +82,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); + int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code)); @@ -357,7 +357,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { (void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg); (void)tMultiWorkerInit(&pVnode->pApplyW, &acfg); - pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); + pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); @@ -383,7 +383,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { - tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; @@ -393,11 +393,11 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { } int32_t 
vmStartWorker(SVnodeMgmt *pMgmt) { - SQWorkerPool *pQPool = &pMgmt->queryPool; + SQueryAutoQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; pQPool->min = tsNumOfVnodeQueryThreads; pQPool->max = tsNumOfVnodeQueryThreads; - if (tQWorkerInit(pQPool) != 0) return -1; + if (tQueryAutoQWorkerInit(pQPool) != 0) return -1; SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool; pStreamPool->name = "vnode-stream"; @@ -419,7 +419,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { } void vmStopWorker(SVnodeMgmt *pMgmt) { - tQWorkerCleanup(&pMgmt->queryPool); + tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tAutoQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 2da14c65d2..072568eb0f 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -54,6 +54,7 @@ extern "C" { #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg); +typedef int32_t (*MndMsgFpExt)(SRpcMsg *pMsg, SQueueInfo* pInfo); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -137,11 +138,13 @@ typedef struct SMnode { SEncryptMgmt encryptMgmt; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; + MndMsgFpExt msgFpExt[TDMT_MAX]; SMsgCb msgCb; int64_t ipWhiteVer; } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); +void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp); int64_t mndGenerateUid(const char *name, int32_t len); void mndSetRestored(SMnode *pMnode, bool restored); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..398ea5d589 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ 
b/source/dnode/mnode/impl/src/mndMain.c @@ -844,21 +844,29 @@ _OVER: return -1; } -int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) { SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; + int32_t code = TSDB_CODE_SUCCESS; - MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; + MndMsgFpExt fpExt = NULL; if (fp == NULL) { - mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - return -1; + fpExt = pMnode->msgFpExt[TMSG_INDEX(pMsg->msgType)]; + if (fpExt == NULL) { + mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } } if (mndCheckMnodeState(pMsg) != 0) return -1; mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - int32_t code = (*fp)(pMsg); + if (fp) + code = (*fp)(pMsg); + else + code = (*fpExt)(pMsg, pQueueInfo); mndReleaseRpc(pMnode); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { @@ -883,6 +891,13 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } } +void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp) { + tmsg_t type = TMSG_INDEX(msgType); + if (type < TDMT_MAX) { + pMnode->msgFpExt[type] = fp; + } +} + // Note: uid 0 is reserved int64_t mndGenerateUid(const char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index c03b02c17f..ae930f0d96 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -30,11 +30,11 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) { qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } -int32_t mndProcessQueryMsg(SRpcMsg 
*pMsg) { +int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; - SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; + SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb, .pWorkerCb = pInfo->workerCb}; mTrace("msg:%p, in query queue is processing", pMsg); switch (pMsg->msgType) { @@ -173,14 +173,14 @@ int32_t mndInitQuery(SMnode *pMnode) { return -1; } - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg); return 0; diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 86deda52ad..e8ccb75040 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -29,7 +29,7 @@ extern "C" { #endif -typedef struct SQueueWorker SQHandle; +typedef struct SQWorker SQHandle; typedef struct SQnode { int32_t qndId; diff --git 
a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 9937debb13..8cd967a8a8 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tqueue.h" #include "executor.h" #include "qndInt.h" #include "query.h" @@ -24,6 +25,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { qError("calloc SQnode failed"); return NULL; } + pQnode->qndId = QNODE_HANDLE; if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); @@ -72,9 +74,10 @@ int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg, false); } -int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { +int32_t qndProcessQueryMsg(SQnode *pQnode, SQueueInfo* pInfo, SRpcMsg *pMsg) { int32_t code = -1; - SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; + int64_t ts = pInfo->timestamp; + SReadHandle handle = {.pMsgCb = &pQnode->msgCb, .pWorkerCb = pInfo->workerCb}; qTrace("message in qnode queue is processing"); switch (pMsg->msgType) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fccba91db7..e35c152e9b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -106,7 +106,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); diff --git 
a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2264e2779b..2d05cc2e00 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -733,7 +733,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType); } -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo) { vTrace("message in vnode query queue is processing"); if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && @@ -747,7 +747,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return 0; } - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb}; initStorageAPI(&handle.api); switch (pMsg->msgType) { diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index 96934888eb..aa3b669c1b 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -22,7 +22,6 @@ #include "ttime.h" #include "tjson.h" #include "tglobal.h" -#include "mnode.h" #include "audit.h" #include "osMemory.h" diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c48c359fad..95414ff70e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -42,6 +42,7 @@ extern "C" { // #include "tstream.h" // #include "tstreamUpdate.h" #include "tlrucache.h" +#include "tworker.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 18f51df2e9..48de49f07c 100644 --- a/source/libs/executor/inc/querytask.h +++ 
b/source/libs/executor/inc/querytask.h @@ -95,6 +95,7 @@ struct SExecTaskInfo { int8_t dynamicTask; SOperatorParam* pOpParam; bool paramSet; + SQueryAutoQWorkerPoolCB* pWorkerCb; }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c527224438..059e1f2663 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -60,6 +60,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInf bool holdDataInBuf); static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo); +static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo); + static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo) { int32_t code = 0; @@ -74,9 +76,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); - tsem_wait(&pExchangeInfo->ready); + code = exchangeWait(pOperator, pExchangeInfo); - if (isTaskKilled(pTaskInfo)) { + if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -756,8 +758,8 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pDataInfo->status = EX_SOURCE_DATA_NOT_READY; doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); - tsem_wait(&pExchangeInfo->ready); - if (isTaskKilled(pTaskInfo)) { + code = exchangeWait(pOperator, pExchangeInfo); + if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -971,3 +973,24 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa return PROJECT_RETRIEVE_CONTINUE; } } + +static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) 
{ + SExecTaskInfo* pTask = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + if (pTask->pWorkerCb) { + code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool); + if (code != TSDB_CODE_SUCCESS) { + pTask->code = code; + return pTask->code; + } + } + tsem_wait(&pExchangeInfo->ready); + if (pTask->pWorkerCb) { + code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); + if (code != TSDB_CODE_SUCCESS) { + pTask->code = code; + return pTask->code; + } + } + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 8b87f3da43..0bf4ac2b21 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -96,6 +96,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand TSWAP((*pTaskInfo)->sql, sql); (*pTaskInfo)->pSubplan = pPlan; + (*pTaskInfo)->pWorkerCb = pHandle->pWorkerCb; (*pTaskInfo)->pRoot = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user, pPlan->dbFName); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4206c9292f..9ff6fc3e49 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -19,10 +19,16 @@ #include "tmsg.h" #include "trpc.h" #include "tsched.h" +#include "tworker.h" // clang-format off #include "cJSON.h" #include "queryInt.h" +typedef struct STaskQueue { + SQueryAutoQWorkerPool wrokrerPool; + STaosQueue* pTaskQueue; +} STaskQueue; + int32_t getAsofJoinReverseOp(EOperatorType op) { switch (op) { case OP_TYPE_GREATER_THAN: @@ -118,12 +124,26 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static SSchedQueue pTaskQueue = {0}; +static STaskQueue taskQueue = {0}; + +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + 
execFn(pSchedMsg->thandle); + taosFreeQitem(pSchedMsg); +} int32_t initTaskQueue() { - int32_t queueSize = tsMaxShellConns * 2; - void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue); - if (NULL == p) { + taskQueue.wrokrerPool.name = "taskWorkPool"; + taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads; + taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool); + if (TSDB_CODE_SUCCESS != coce) { + qError("failed to init task thread pool"); + return -1; + } + + taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue); + if (NULL == taskQueue.pTaskQueue) { qError("failed to init task queue"); return -1; } @@ -133,26 +153,34 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - taosCleanUpScheduler(&pTaskQueue); + tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool); return 0; } -static void execHelper(struct SSchedMsg* pSchedMsg) { - __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; - int32_t code = execFn(pSchedMsg->thandle); - if (code != 0 && pSchedMsg->msg != NULL) { - *(int32_t*)pSchedMsg->msg = code; - } +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { + SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); + pSchedMsg->fp = NULL; + pSchedMsg->ahandle = execFn; + pSchedMsg->thandle = execParam; + pSchedMsg->msg = code; + + return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg); } -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { - SSchedMsg schedMsg = {0}; - schedMsg.fp = execHelper; - schedMsg.ahandle = execFn; - schedMsg.thandle = execParam; - schedMsg.msg = code; +int32_t taosAsyncWait() { + if (!taskQueue.wrokrerPool.pCb) { + qError("query task thread pool callback function is null"); + return -1; + } + return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool); +} - return 
taosScheduleTask(&pTaskQueue, &schedMsg); +int32_t taosAsyncRecover() { + if (!taskQueue.wrokrerPool.pCb) { + qError("query task thread pool callback function is null"); + return -1; + } + return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 07186331ae..bc5c46351a 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -433,28 +433,66 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { } int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { - SQWorkerPool *pPool = &pWorker->pool; - pPool->name = pCfg->name; - pPool->min = pCfg->min; - pPool->max = pCfg->max; - if (tQWorkerInit(pPool) != 0) return -1; - - pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); - if (pWorker->queue == NULL) return -1; - + pWorker->poolType = pCfg->poolType; pWorker->name = pCfg->name; + + switch (pCfg->poolType) { + case QWORKER_POOL: { + SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool)); + if (!pPool) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pPool->name = pCfg->name; + pPool->min = pCfg->min; + pPool->max = pCfg->max; + pWorker->pool = pPool; + if (tQWorkerInit(pPool) != 0) return -1; + + pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) return -1; + } break; + case QUERY_AUTO_QWORKER_POOL: { + SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool)); + if (!pPool) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pPool->name = pCfg->name; + pPool->min = pCfg->min; + pPool->max = pCfg->max; + pWorker->pool = pPool; + if (tQueryAutoQWorkerInit(pPool) != 0) return -1; + + pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (!pWorker->queue) return -1; + } break; + default: + assert(0); + } return 0; } void 
tSingleWorkerCleanup(SSingleWorker *pWorker) { if (pWorker->queue == NULL) return; - while (!taosQueueEmpty(pWorker->queue)) { taosMsleep(10); } - tQWorkerCleanup(&pWorker->pool); - tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); + switch (pWorker->poolType) { + case QWORKER_POOL: + tQWorkerCleanup(pWorker->pool); + tQWorkerFreeQueue(pWorker->pool, pWorker->queue); + taosMemoryFree(pWorker->pool); + break; + case QUERY_AUTO_QWORKER_POOL: + tQueryAutoQWorkerCleanup(pWorker->pool); + tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue); + taosMemoryFree(pWorker->pool); + break; + default: + assert(0); + } } int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
@@ -480,3 +518,520 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) {
   tWWorkerCleanup(&pWorker->pool);
   tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
 }
+
+// Forward declarations for the auto-scaling query worker pool helpers defined below.
+static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool);
+static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
+static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
+static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool);
+static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker);
+
+// activeRunningN packs two 32-bit counters into one int64_t so that both can be changed by a single CAS:
+// the high 32 bits hold the "active" count, the low 32 bits hold the "running" count.
+#define GET_ACTIVE_N(int64_val)  ((int32_t)((int64_val) >> 32))
+#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
+// Builds a packed value; the unsigned casts keep a negative count from sign-extending into the other half.
+#define PACK_ACTIVE_RUNNING(active, running) ((int64_t)(((uint64_t)(uint32_t)(active) << 32) | (uint32_t)(running)))
+
+// Subtracts val from the active (high-word) counter of *ptr.
+// Returns the active count extracted from the value handed back by atomic_fetch_sub_64().
+static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
+  int64_t actualSubVal = val;
+  actualSubVal <<= 32;
+  int64_t fetched64 = atomic_fetch_sub_64(ptr, actualSubVal);
+  return GET_ACTIVE_N(fetched64);
+}
+
+// Subtracts val from the running (low-word) counter of *ptr.
+// Returns the running count extracted from the value handed back by atomic_fetch_sub_64().
+static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_sub_64(ptr, val)); }
+
+// Adds val to the active (high-word) counter of *ptr.
+// Returns the active count extracted from the value handed back by atomic_fetch_add_64().
+static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
+  int64_t actualAddVal = val;
+  actualAddVal <<= 32;
+  int64_t fetched64 = atomic_fetch_add_64(ptr, actualAddVal);
+  return GET_ACTIVE_N(fetched64);
+}
+
+// Adds val to the running (low-word) counter of *ptr.
+// Returns the running count extracted from the value handed back by atomic_fetch_add_64().
+static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); }
+
+// Tries to change the active count from *expectedVal to newVal, keeping the running count that is currently read.
+// Returns true on success; otherwise stores the observed active count in *expectedVal and returns false.
+static bool atomicCompareExchangeActive(int64_t* ptr, int32_t* expectedVal, int32_t newVal) {
+  int32_t running = GET_RUNNING_N(*ptr);
+  int64_t oldVal64 = PACK_ACTIVE_RUNNING(*expectedVal, running);
+  int64_t newVal64 = PACK_ACTIVE_RUNNING(newVal, running);
+  int64_t actualVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (actualVal64 == oldVal64) {
+    return true;
+  }
+  *expectedVal = GET_ACTIVE_N(actualVal64);
+  return false;
+}
+
+// Tries to change the running count from *expectedVal to newVal, keeping the active count that is currently read.
+// Returns true on success; otherwise stores the observed running count in *expectedVal and returns false.
+static bool atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal, int32_t newVal) {
+  int32_t active = GET_ACTIVE_N(*ptr);
+  int64_t oldVal64 = PACK_ACTIVE_RUNNING(active, *expectedVal);
+  int64_t newVal64 = PACK_ACTIVE_RUNNING(active, newVal);
+  int64_t actualVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (actualVal64 == oldVal64) {
+    return true;
+  }
+  *expectedVal = GET_RUNNING_N(actualVal64);
+  return false;
+}
+
+// Tries to replace (*expectedActive, *expectedRunning) with (newActive, newRunning) in one CAS.
+// Returns true on success; otherwise refreshes both expected values from the observed word and returns false.
+static bool atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
+                                                     int32_t *expectedRunning, int32_t newRunning) {
+  int64_t oldVal64 = PACK_ACTIVE_RUNNING(*expectedActive, *expectedRunning);
+  int64_t newVal64 = PACK_ACTIVE_RUNNING(newActive, newRunning);
+  int64_t actualVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (actualVal64 == oldVal64) {
+    return true;
+  }
+  *expectedActive = GET_ACTIVE_N(actualVal64);
+  *expectedRunning = GET_RUNNING_N(actualVal64);
+  return false;
+}
+
+// Thread entry point of one pool worker: pulls items from the pool's qset and dispatches them to the queue's FItem
+// callback until the qset read returns 0 or the worker is told not to continue after recycling.
+static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
+  SQueryAutoQWorkerPool *pool = worker->pool;
+  SQueueInfo             qinfo = {0};
+  void                  *msg = NULL;
+
+  taosBlockSIGPIPE();
+  setThreadName(pool->name);
+  worker->pid = taosGetSelfPthreadId();
+  uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
+
+  while (1) {
+    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
+      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
+            worker->pid);
+      break;
+    }
+
+    if (qinfo.timestamp != 0) {
+      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
+      if (cost > QUEUE_THRESHOLD) {
+        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
+      }
+    }
+
+    // claim a running slot (or park until one is handed over) before processing the message
+    tQueryAutoQWorkerWaitingCheck(pool);
+
+    if (qinfo.fp != NULL) {
+      qinfo.workerId = worker->id;
+      qinfo.threadNum = pool->num;
+      qinfo.workerCb = pool->pCb;
+      (*((FItem)qinfo.fp))(&qinfo, msg);
+    }
+
+    taosUpdateItemSize(qinfo.queue, 1);
+    if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) {
+      uDebug("worker:%s:%d exited", pool->name, worker->id);
+      break;
+    }
+  }
+
+  destroyThreadLocalGeosCtx();
+  DestoryThreadLocalRegComp();
+
+  return NULL;
+}
+
+// Wakes one thread parked in tQueryAutoQWorkerRecoverFromBlocking(), if any.
+// Returns true when a waiter was claimed (waitingAfterBlockN decremented) and signalled.
+static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
+  SQueryAutoQWorkerPool *pPool = p;
+  bool                   ret = false;
+  int32_t                waiting = pPool->waitingAfterBlockN;
+  while (waiting > 0) {
+    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
+    if (waitingNew == waiting) {
+      taosThreadMutexLock(&pPool->waitingAfterBlockLock);
+      taosThreadCondSignal(&pPool->waitingAfterBlockCond);
+      taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
+      ret = true;
+      break;
+    }
+    waiting = waitingNew;
+  }
+  return ret;
+}
+
+// Wakes one thread parked in tQueryAutoQWorkerWaitingCheck(), if any.
+// Returns true when a waiter was claimed (waitingBeforeProcessMsgN decremented) and signalled.
+static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) {
+  SQueryAutoQWorkerPool *pPool = p;
+  bool                   ret = false;
+  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
+  while (waiting > 0) {
+    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
+    if (waitingNew == waiting) {
+      taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
+      taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
+      taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
+      ret = true;
+      break;
+    }
+    waiting = waitingNew;
+  }
+  return ret;
+}
+
+// Tries to decrement both the active and running counts while active stays above minActive.
+// Returns true if both were decremented; otherwise decrements only the running count and returns false.
+static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
+  SQueryAutoQWorkerPool *pPool = p;
+  int64_t                val64 = pPool->activeRunningN;
+  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
+  while (active > minActive) {
+    if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1)) {
+      return true;
+    }
+  }
+  atomicFetchSubRunning(&pPool->activeRunningN, 1);
+  return false;
+}
+
+// Claims a running slot for the calling worker. If running already equals pool->num, the caller gives up its
+// active slot and parks on waitingBeforeProcessMsgCond until signalled (it does not park if exit is already set).
+// Always returns TSDB_CODE_SUCCESS.
+static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) {
+  int32_t running = GET_RUNNING_N(pPool->activeRunningN);
+  while (running < pPool->num) {
+    if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) {
+      return TSDB_CODE_SUCCESS;
+    }
+  }
+  atomicFetchSubActive(&pPool->activeRunningN, 1);
+  // to wait for process
+  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
+  atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
+  if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
+  // recovered from waiting
+  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
+  return TSDB_CODE_SUCCESS;
+}
+
+// Called by a worker after it finished one message. If the worker can hand its slot to a parked thread, or the
+// active count can be lowered towards pool->num, the worker retires: it either exits (id >= maxInUse) or parks in
+// the backup list until woken. Returns false when the calling thread must exit, true when it should keep polling.
+static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker) {
+  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
+      tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
+    taosThreadMutexLock(&pPool->poolLock);
+    SListNode* pNode = listNode(pWorker);
+    tdListPopNode(pPool->workers, pNode);
+    // reclaim some workers
+    if (pWorker->id >= pPool->maxInUse) {
+      while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
+        SListNode*         head = tdListPopHead(pPool->exitedWorkers);
+        SQueryAutoQWorker* pExited = (SQueryAutoQWorker*)head->data;
+        if (pExited && taosCheckPthreadValid(pExited->thread)) {
+          taosThreadJoin(pExited->thread, NULL);
+          taosThreadClear(&pExited->thread);
+        }
+        taosMemoryFree(head);
+      }
+      tdListAppendNode(pPool->exitedWorkers, pNode);
+      taosThreadMutexUnlock(&pPool->poolLock);
+      return false;
+    }
+
+    // put back to backup pool
+    tdListAppendNode(pPool->backupWorkers, pNode);
+    taosThreadMutexUnlock(&pPool->poolLock);
+
+    // start to wait at backup cond
+    taosThreadMutexLock(&pPool->backupLock);
+    atomic_fetch_add_32(&pPool->backupNum, 1);
+    if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
+    taosThreadMutexUnlock(&pPool->backupLock);
+
+    // recovered from backup
+    taosThreadMutexLock(&pPool->poolLock);
+    if (pPool->exit) {
+      taosThreadMutexUnlock(&pPool->poolLock);
+      return false;
+    }
+    tdListPopNode(pPool->backupWorkers, pNode);
+    tdListAppendNode(pPool->workers, pNode);
+    taosThreadMutexUnlock(&pPool->poolLock);
+
+    return true;
+  } else {
+    return true;
+  }
+}
+
+// Initializes the pool: opens the qset, creates the three worker lists, sets maxInUse, initializes the mutexes and
+// condition variables, and installs the default blocking callbacks when pool->pCb is not already set.
+// Returns TSDB_CODE_SUCCESS, terrno if the qset cannot be opened, or TSDB_CODE_OUT_OF_MEMORY on allocation failure.
+int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
+  pool->qset = taosOpenQset();
+  if (!pool->qset) return terrno;
+  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
+  if (!pool->workers) return TSDB_CODE_OUT_OF_MEMORY;
+  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
+  if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY;
+  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
+  if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY;
+  pool->maxInUse = pool->max * 2 + 2;
+
+  (void)taosThreadMutexInit(&pool->poolLock, NULL);
+  (void)taosThreadMutexInit(&pool->backupLock, NULL);
+  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
+  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
+
+  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
+  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
+  (void)taosThreadCondInit(&pool->backupCond, NULL);
+
+  if (!pool->pCb) {
+    pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
+    if (!pool->pCb) return TSDB_CODE_OUT_OF_MEMORY;
+    pool->pCb->pPool = pool;
+    pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
+    pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;
+  }
+  return TSDB_CODE_SUCCESS;
+}
+
+// Shuts the pool down: sets exit, resumes the qset once per listed worker, broadcasts on every condition variable,
+// joins and frees the workers in all three lists, then releases the lists, callbacks, locks, conds and the qset.
+void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
+  taosThreadMutexLock(&pPool->poolLock);
+  pPool->exit = true;
+  int32_t size = listNEles(pPool->workers);
+  for (int32_t i = 0; i < size; ++i) {
+    taosQsetThreadResume(pPool->qset);
+  }
+  size = listNEles(pPool->backupWorkers);
+  for (int32_t i = 0; i < size; ++i) {
+    taosQsetThreadResume(pPool->qset);
+  }
+  taosThreadMutexUnlock(&pPool->poolLock);
+
+  taosThreadMutexLock(&pPool->backupLock);
+  taosThreadCondBroadcast(&pPool->backupCond);
+  taosThreadMutexUnlock(&pPool->backupLock);
+
+  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
+  taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
+  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
+
+  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
+  taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
+  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
+
+  SQueryAutoQWorker* worker = NULL;
+  while (true) {
+    // take poolLock only to pop; a worker being joined may itself need poolLock on its way out
+    taosThreadMutexLock(&pPool->poolLock);
+    if (listNEles(pPool->workers) == 0) {
+      taosThreadMutexUnlock(&pPool->poolLock);
+      break;
+    }
+    SListNode* pNode = tdListPopHead(pPool->workers);
+    worker = (SQueryAutoQWorker*)pNode->data;
+    taosThreadMutexUnlock(&pPool->poolLock);
+    if (worker && taosCheckPthreadValid(worker->thread)) {
+      taosThreadJoin(worker->thread, NULL);
+      taosThreadClear(&worker->thread);
+    }
+    taosMemoryFree(pNode);
+  }
+
+  while (listNEles(pPool->backupWorkers) > 0) {
+    SListNode* pNode = tdListPopHead(pPool->backupWorkers);
+    worker = (SQueryAutoQWorker*)pNode->data;
+    if (worker && taosCheckPthreadValid(worker->thread)) {
+      taosThreadJoin(worker->thread, NULL);
+      taosThreadClear(&worker->thread);
+    }
+    taosMemoryFree(pNode);
+  }
+
+  while (listNEles(pPool->exitedWorkers) > 0) {
+    SListNode* pNode = tdListPopHead(pPool->exitedWorkers);
+    worker = (SQueryAutoQWorker*)pNode->data;
+    if (worker && taosCheckPthreadValid(worker->thread)) {
+      taosThreadJoin(worker->thread, NULL);
+      taosThreadClear(&worker->thread);
+    }
+    taosMemoryFree(pNode);
+  }
+
+  tdListFree(pPool->workers);
+  tdListFree(pPool->backupWorkers);
+  tdListFree(pPool->exitedWorkers);
+  taosMemoryFree(pPool->pCb);
+
+  taosThreadMutexDestroy(&pPool->poolLock);
+  taosThreadMutexDestroy(&pPool->backupLock);
+  taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
+  taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
+
+  taosThreadCondDestroy(&pPool->backupCond);
+  taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
+  taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
+  taosCloseQset(pPool->qset);
+}
+
+// Opens a new queue bound to fp, adds it to the pool's qset with ahandle, and, while pool->num is below pool->max,
+// launches worker threads until pool->num reaches pool->min (at least one is launched per call in that case).
+// Returns the queue; NULL if it cannot be opened, or if a worker node or thread cannot be created (terrno is set).
+STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
+  STaosQueue *queue = taosOpenQueue();
+  if (queue == NULL) return NULL;
+
+  taosThreadMutexLock(&pool->poolLock);
+  taosSetQueueFp(queue, fp, NULL);
+  taosAddIntoQset(pool->qset, queue, ahandle);
+  SQueryAutoQWorker  worker = {0};
+  SQueryAutoQWorker* pWorker = NULL;
+
+  // spawn a thread to process queue
+  if (pool->num < pool->max) {
+    do {
+      worker.id = listNEles(pool->workers);
+      worker.backupIdx = -1;
+      worker.pool = pool;
+      SListNode* pNode = tdListAdd(pool->workers, &worker);
+      if (!pNode) {
+        taosCloseQueue(queue);
+        queue = NULL;
+        terrno = TSDB_CODE_OUT_OF_MEMORY;
+        break;
+      }
+      pWorker = (SQueryAutoQWorker*)pNode->data;
+
+      TdThreadAttr thAttr;
+      taosThreadAttrInit(&thAttr);
+      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+      int32_t createRet = taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker);
+      // the attribute object is no longer needed once creation was attempted, whether or not it succeeded
+      taosThreadAttrDestroy(&thAttr);
+      if (createRet != 0) {
+        taosCloseQueue(queue);
+        terrno = TSDB_CODE_OUT_OF_MEMORY;
+        queue = NULL;
+        break;
+      }
+
+      pool->num++;
+      atomicFetchAddActive(&pool->activeRunningN, 1);
+      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
+    } while (pool->num < pool->min);
+  }
+
+  taosThreadMutexUnlock(&pool->poolLock);
+  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
+
+  return queue;
+}
+
+// Closes pQ. pPool is not used.
+void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) {
+  taosCloseQueue(pQ);
+}
+
+// Makes one more worker available: wakes a parked backup worker if backupNum can be decremented, otherwise adds a
+// new worker node to pool->workers and starts a thread for it.
+// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also set in terrno) if node or thread creation fails.
+static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool) {
+  // try backup pool
+  int32_t backup = pool->backupNum;
+  while (backup > 0) {
+    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
+    if (backupNew == backup) {
+      // Signal under backupLock: a backup worker bumps backupNum while holding this lock and only releases it
+      // inside the cond wait, so taking the lock here guarantees the claimed worker is already waiting and the
+      // wakeup cannot be lost.
+      taosThreadMutexLock(&pool->backupLock);
+      taosThreadCondSignal(&pool->backupCond);
+      taosThreadMutexUnlock(&pool->backupLock);
+      return TSDB_CODE_SUCCESS;
+    }
+    backup = backupNew;
+  }
+  // backup pool is empty, create new
+  SQueryAutoQWorker* pWorker = NULL;
+  SQueryAutoQWorker  worker = {0};
+  worker.pool = pool;
+  worker.backupIdx = -1;
+  taosThreadMutexLock(&pool->poolLock);
+  worker.id = listNEles(pool->workers);
+  SListNode* pNode = tdListAdd(pool->workers, &worker);
+  if (!pNode) {
+    taosThreadMutexUnlock(&pool->poolLock);
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return terrno;
+  }
+  taosThreadMutexUnlock(&pool->poolLock);
+  pWorker = (SQueryAutoQWorker*)pNode->data;
+
+  TdThreadAttr thAttr;
+  taosThreadAttrInit(&thAttr);
+  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+  int32_t createRet = taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker);
+  // release the attribute object on both the success and the failure path
+  taosThreadAttrDestroy(&thAttr);
+  if (createRet != 0) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return terrno;
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Installed as the pool's beforeBlocking callback: hands the caller's slot to a parked thread, or lowers the
+// active/running counts, and only if neither is possible makes another worker available.
+// Returns TSDB_CODE_SUCCESS, or the error from tQueryAutoQWorkerAddWorker().
+static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
+  SQueryAutoQWorkerPool* pPool = p;
+  if (!tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) && !tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) &&
+      !tQueryAutoQWorkerTryDecActive(p, 1)) {
+    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
+    if (code != TSDB_CODE_SUCCESS) {
+      return code;
+    }
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Installed as the pool's afterRecoverFromBlocking callback: re-acquires an active and a running slot, or, when
+// running has reached pool->num, parks on waitingAfterBlockCond until signalled.
+// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_QWORKER_QUIT if the pool is exiting after the wait path.
+static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
+  SQueryAutoQWorkerPool* pPool = p;
+  int64_t                val64 = pPool->activeRunningN;
+  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
+  while (running < pPool->num) {
+    if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
+      return TSDB_CODE_SUCCESS;
+    }
+  }
+  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
+  atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
+  if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
+  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
+  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
+  return TSDB_CODE_SUCCESS;
+}