diff --git a/include/util/tworker.h b/include/util/tworker.h index 92d474c885..3545aeed89 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -70,8 +70,8 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); typedef struct { const char *name; - int32_t minNum; - int32_t maxNum; + int32_t min; + int32_t max; FItem fp; void *param; } SSingleWorkerCfg; @@ -84,7 +84,7 @@ typedef struct { typedef struct { const char *name; - int32_t maxNum; + int32_t max; FItems fp; void *param; } SMultiWorkerCfg; diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 7698aa9dbd..1ed9b0d931 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -71,7 +71,7 @@ int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index a7b8ca288b..cc1d8e5f24 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -108,21 +108,22 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { } static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { - dndCleanupServer(pDnode); + // dndCleanupServer(pDnode); for (ENodeType n = 0; n < NODE_MAX; ++n) { if (except == n) continue; SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - dndCloseNode(pWrapper); + pWrapper->required = false; } } static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { - dTrace("msg:%p, get from child queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont; + dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, + pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pWrapper, pMsg); + int32_t code = (*msgFp)(pWrapper->pMgmt, pMsg); if (code != 0) { if (pRpc->msgType & 1U) { @@ -136,11 +137,13 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t } } -static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) { - dTrace("msg:%p, get from parent queue", pRsp); - pRsp->pCont = pCont; - dndSendRsp(pWrapper, pRsp); - taosMemoryFree(pRsp); +static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) { + pRpc->pCont = pCont; + dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType), pRpc->handle, + pRpc->ahandle); + + dndSendRsp(pWrapper, pRpc); + taosMemoryFree(pRpc); } static int32_t dndRunInMultiProcess(SDnode *pDnode) { diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index d34a26436c..8e560503d0 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -101,14 +101,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) { SSingleWorkerCfg mgmtCfg = { - .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + .min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } SSingleWorkerCfg statusCfg = { - .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + .min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) { dError("failed to start dnode status worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 27489b45d0..d07313410a 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -123,27 +123,27 @@ int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc); } - int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; - SSingleWorkerCfg queryCfg = {.minNum = 0, .maxNum = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt}; - - if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { + SSingleWorkerCfg qCfg = {.min = 0, .max = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { dError("failed to start mnode-query worker since %s", terrstr()); return -1; } - if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) { + SSingleWorkerCfg rCfg = {.min = 0, .max = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { + SSingleWorkerCfg wCfg = {.min = 0, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) { + SSingleWorkerCfg sCfg = {.min = 0, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { dError("failed to start mnode sync-worker since %s", terrstr()); return -1; } diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index aa4da82790..5923f5c2f4 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -110,8 +110,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t maxQueryThreads = minQueryThreads; - SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads, - .maxNum = maxQueryThreads, + SSingleWorkerCfg queryCfg = {.min = minQueryThreads, + .max = maxQueryThreads, .name = "qnode-query", .fp = (FItem)qmProcessQueryQueue, .param = pMgmt}; @@ -121,8 +121,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads, - .maxNum = maxFetchThreads, + SSingleWorkerCfg fetchCfg = {.min = minFetchThreads, + .max = maxFetchThreads, .name = "qnode-fetch", .fp = (FItem)qmProcessFetchQueue, .param = pMgmt}; diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 5913713ff3..4e46ad5818 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -57,7 +57,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); @@ -69,8 +69,8 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, - .maxNum = SND_SHARED_THREAD_NUM, + SSingleWorkerCfg cfg = {.min = SND_SHARED_THREAD_NUM, + .max = SND_SHARED_THREAD_NUM, .name = "snode-shared", .fp = (FItem)smProcessSharedQueue, .param = pMgmt}; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 7b6d78a60c..99274ac99b 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -394,7 +394,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { if (tWWorkerInit(pWPool) != 0) return -1; SSingleWorkerCfg cfg = { - .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index f5ce88179b..d06aab6fda 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -264,7 +264,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); - uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue); + uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, + headLen, pHead, bodyLen, pBody); return 0; } @@ -277,7 +278,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); terrno = TSDB_CODE_OUT_OF_SHM_MEM; - return -1; + return 0; } int32_t headLen = 0; @@ -341,8 +342,9 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea *pHeadLen = headLen; *pBodyLen = bodyLen; - uTrace("proc:%s, get msg:%p:%d cont:%p:%d from queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue); - return 0; + uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, + headLen, pHead, bodyLen, pBody); + return 1; } SProcObj *taosProcInit(const SProcCfg *pCfg) { @@ -396,15 +398,15 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { void *pHead, *pBody; int32_t headLen, bodyLen; - uDebug("proc:%s, start to get message from queue:%p", pQueue->name, pQueue); + uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue); while (1) { - int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); - if (code < 0) { - uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue); + int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); + if (numOfMsgs == 0) { + uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue); break; - } else if (code == 0) { - uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr()); + } else if (numOfMsgs < 0) { + uTrace("proc:%s, get no msg from queue:%p since %s", pQueue->name, pQueue, terrstr()); taosMsleep(1); continue; } else { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 992ec74b5b..7fc390e70b 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -287,8 +287,8 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { SQWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; - pPool->min = pCfg->minNum; - pPool->max = pCfg->maxNum; + pPool->min = pCfg->min; + pPool->max = pCfg->max; if (tQWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -316,7 +316,7 @@ void tSingleWorkerCleanup(SSingleWorker *pWorker) { int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { SWWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; - pPool->max = pCfg->maxNum; + pPool->max = pCfg->max; if (tWWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1;