From 943de8bc15dc000b851f7d99b995b63e6ac1b446 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 11 Jul 2022 17:52:30 +0800 Subject: [PATCH 1/7] enh: adjust vnode fetch queue number --- source/common/src/tglobal.c | 3 +- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 42 +++++++++++---------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f19d17d034..7947624451 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -412,7 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1); + tsNumOfVnodeFetchThreads = tsNumOfCores / 4; + tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 6f00767eb0..6fc0ab4e5d 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -31,7 +31,7 @@ typedef struct SVnodeMgmt { const char *path; const char *name; SQWorkerPool queryPool; - SQWorkerPool fetchPool; + SWWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; SWWorkerPool applyPool; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 1d795c74f2..e5b268a6a2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -81,21 +81,26 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SVnodeObj *pVnode = pInfo->ahandle; - const STraceId *trace = &pMsg->info.traceId; +static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; - dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); - if (code != 0) { - if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); - vmSendRsp(pMsg, code); + for (int32_t i = 0; i < numOfMsgs; ++i) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + const STraceId *trace = &pMsg->info.traceId; + dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); + + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + if (terrno != 0) code = terrno; + dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } - - dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); } static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { @@ -242,7 +247,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) { @@ -259,7 +264,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pWriteQ = NULL; pVnode->pSyncQ = NULL; pVnode->pApplyQ = NULL; @@ -275,11 +280,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pQPool->max = tsNumOfVnodeQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; - SQWorkerPool *pFPool = &pMgmt->fetchPool; + SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; - pFPool->min = tsNumOfVnodeFetchThreads; pFPool->max = tsNumOfVnodeFetchThreads; - if (tQWorkerInit(pFPool) != 0) return -1; + if (tWWorkerInit(pFPool) != 0) return -1; SWWorkerPool *pWPool = &pMgmt->writePool; pWPool->name = "vnode-write"; @@ -325,6 +329,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->applyPool); tWWorkerCleanup(&pMgmt->syncPool); tQWorkerCleanup(&pMgmt->queryPool); - tQWorkerCleanup(&pMgmt->fetchPool); + tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); } From c1498e4a853fe290ee06222df2679310c4acf0f7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 10:31:25 +0800 Subject: [PATCH 2/7] fix: invalid vnode ref while drop stream --- source/common/src/tglobal.c | 4 ++++ source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 3 +++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 10 +++++++--- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7947624451..34d37981ef 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -412,8 +412,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; +#if 0 tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); +#else + tsNumOfVnodeFetchThreads = 1; +#endif if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 90f852eed1..cbcb541200 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { SVnodeObj *pVnode = *ppVnode; if (pVnode && num < size) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - // dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + // dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount); pVnodes[num++] = (*ppVnode); pIter = taosHashIterate(pMgmt->hash, pIter); } else { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 051e5defb0..037086ac02 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL || pVnode->dropped) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + pVnode = NULL; } else { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); // dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount); @@ -80,6 +81,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); + + dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId); while (pVnode->refCount > 0) taosMsleep(10); dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e5b268a6a2..93f93b1ab7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -206,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); if (code != 0) { dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); rpcFreeCont(pMsg->pCont); pRpc->pCont = NULL; + taosFreeQitem(pMsg); } return code; @@ -237,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { default: break; } + vmReleaseVnode(pMgmt, pVnode); } - vmReleaseVnode(pMgmt, pVnode); return size; } @@ -255,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { return -1; } - dDebug("vgId:%d, queue is alloced", pVnode->vgId); + dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ); + dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ); + dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ); + dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); + dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ); return 0; } From 52b63db905451bb3bd5862ec67b764a1fddf268b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 13:26:55 +0800 Subject: [PATCH 3/7] fix: invalid vnode ref while drop stream --- include/util/tqueue.h | 6 ++++-- source/libs/sync/src/syncIO.c | 10 +++++----- source/util/src/tqueue.c | 20 +++++++++++-------- source/util/src/tworker.c | 37 ++++++++++++++++------------------- 4 files changed, 38 insertions(+), 35 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 466c577c00..2886190997 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; typedef struct { void *ahandle; + void *fp; + void *queue; int32_t workerId; int32_t threadNum; int64_t timestamp; @@ -81,8 +83,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); int32_t taosGetQueueNumber(STaosQset *qset); -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp); -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo); void taosResetQsetThread(STaosQset *qset, void *pItem); extern int64_t tsRpcQueueMemoryAllowed; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 663745a7d7..72d74d7ae5 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; - STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; - qall = taosAllocateQall(); + SSyncIO *io = param; + STaosQall *qall = taosAllocateQall(); + SRpcMsg *pRpcMsg, rpcMsg; + SQueueInfo qinfo = {0}; while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, &qinfo); sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); if (numOfMsgs <= 0) { break; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 94311bc435..1895472472 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) { bool empty = false; taosThreadMutexLock(&queue->mutex); - if (queue->head == NULL && queue->tail == NULL) { + if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { empty = true; } taosThreadMutexUnlock(&queue->mutex); @@ -397,7 +397,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) { +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) { STaosQnode *pNode = NULL; int32_t code = 0; @@ -417,9 +417,10 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void if (queue->head) { pNode = queue->head; *ppItem = pNode->item; - if (ahandle) *ahandle = queue->ahandle; - if (itemFp) *itemFp = queue->itemFp; - if (ts) *ts = pNode->timestamp; + qinfo->ahandle = queue->ahandle; + qinfo->fp = queue->itemFp; + qinfo->queue = queue; + qinfo->timestamp = pNode->timestamp; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; @@ -440,7 +441,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void return code; } -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) { +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) { STaosQueue *queue; int32_t code = 0; @@ -461,13 +462,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand qall->start = queue->head; qall->numOfItems = queue->numOfItems; code = qall->numOfItems; - if (ahandle) *ahandle = queue->ahandle; - if (itemsFp) *itemsFp = queue->itemsFp; + qinfo->ahandle = queue->ahandle; + qinfo->fp = queue->itemsFp; + qinfo->queue = queue; queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; queue->memOfItems = 0; + uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); + atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); for (int32_t j = 1; j < qall->numOfItems; ++j) { tsem_wait(&qset->sem); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 88bd36f0cb..5e3a0dc109 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -70,26 +70,24 @@ void tQWorkerCleanup(SQWorkerPool *pool) { static void *tQWorkerThreadFp(SQWorker *worker) { SQWorkerPool *pool = worker->pool; - FItem fp = NULL; - - void *msg = NULL; - void *ahandle = NULL; - int32_t code = 0; - int64_t ts = 0; + SQueueInfo qinfo = {0}; + void *msg = NULL; + int32_t code = 0; taosBlockSIGPIPE(); setThreadName(pool->name); uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } - if (fp != NULL) { - SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts}; - (*fp)(&info, msg); + if (qinfo.fp != NULL) { + qinfo.workerId = worker->id; + qinfo.threadNum = pool->num; + (*((FItem)qinfo.fp))(&qinfo, msg); } } @@ -195,27 +193,26 @@ void tWWorkerCleanup(SWWorkerPool *pool) { static void *tWWorkerThreadFp(SWWorker *worker) { SWWorkerPool *pool = worker->pool; - FItems fp = NULL; - - void *msg = NULL; - void *ahandle = NULL; - int32_t numOfMsgs = 0; - int32_t qtype = 0; + SQueueInfo qinfo = {0}; + void *msg = NULL; + int32_t code = 0; + int32_t numOfMsgs = 0; taosBlockSIGPIPE(); setThreadName(pool->name); uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); + numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo); if (numOfMsgs == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } - if (fp != NULL) { - SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; - (*fp)(&info, worker->qall, numOfMsgs); + if (qinfo.fp != NULL) { + qinfo.workerId = worker->id; + qinfo.threadNum = pool->num; + (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs); } } From b9165a95039bf3ab8fbb490d5c61ba23fd112024 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 13:56:54 +0800 Subject: [PATCH 4/7] fix: reset queueSize after the queueItem is consumed and executed by the worker --- include/util/tqueue.h | 1 + source/libs/sync/src/syncIO.c | 2 ++ source/libs/transport/test/pushServer.c | 5 ++++- source/util/src/tqueue.c | 17 +++++++++++++---- source/util/src/tworker.c | 5 ++++- 5 files changed, 24 insertions(+), 6 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 2886190997..0f4f1db9ee 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -67,6 +67,7 @@ void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); +void taosUpdateItemSize(STaosQueue *queue, int32_t items); int32_t taosQueueItemSize(STaosQueue *queue); int64_t taosQueueMemorySize(STaosQueue *queue); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 72d74d7ae5..d9f11ba80f 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -369,6 +369,8 @@ static void *syncIOConsumerFunc(void *param) { taosFreeQitem(pRpcMsg); } + + taosUpdateItemSize(qinfo.queue, numOfMsgs); } taosFreeQall(qall); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index 6a4ff213d0..25972c9ec1 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -32,11 +32,12 @@ void processShellMsg() { SRpcMsg * pRpcMsg, rpcMsg; int type; void * pvnode; + SQueueInfo qinfo = {0}; qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL); + int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, &qinfo); tDebug("%d shell msgs are received", numOfMsgs); if (numOfMsgs <= 0) break; @@ -86,6 +87,8 @@ void processShellMsg() { rpcSendResponse(&nRpcMsg); } } + + taosUpdateItemSize(qinfo.queue, numOfMsgs); } taosFreeQall(qall); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 1895472472..50beba8a9b 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -123,6 +123,14 @@ bool taosQueueEmpty(STaosQueue *queue) { return empty; } +void taosUpdateItemSize(STaosQueue *queue, int32_t items) { + if (queue == NULL) return; + + taosThreadMutexLock(&queue->mutex); + queue->numOfItems -= items; + taosThreadMutexUnlock(&queue->mutex); +} + int32_t taosQueueItemSize(STaosQueue *queue) { if (queue == NULL) return 0; @@ -257,6 +265,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { queue->tail = NULL; queue->numOfItems = 0; queue->memOfItems = 0; + uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } @@ -424,11 +433,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; - queue->numOfItems--; + // queue->numOfItems--; queue->memOfItems -= pNode->size; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, + uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1, queue->memOfItems); } @@ -468,9 +477,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * queue->head = NULL; queue->tail = NULL; - queue->numOfItems = 0; + // queue->numOfItems = 0; queue->memOfItems = 0; - uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); + uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); for (int32_t j = 1; j < qall->numOfItems; ++j) { diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 5e3a0dc109..1f0731812c 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -79,7 +79,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } @@ -89,6 +89,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) { qinfo.threadNum = pool->num; (*((FItem)qinfo.fp))(&qinfo, msg); } + + taosUpdateItemSize(qinfo.queue, 1); } return NULL; @@ -214,6 +216,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { qinfo.threadNum = pool->num; (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs); } + taosUpdateItemSize(qinfo.queue, numOfMsgs); } return NULL; From 48dee594d3237546bdd2530cd7468c0c48bf07c0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 14:21:58 +0800 Subject: [PATCH 5/7] refactor: adjust the minimum number of fetch threads to 4 --- source/common/src/tglobal.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 34d37981ef..7947624451 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -412,12 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; -#if 0 tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); -#else - tsNumOfVnodeFetchThreads = 1; -#endif if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; From 825f80629f8b14bc2b54aef51e8df6b39f0cd0e1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 15:14:38 +0800 Subject: [PATCH 6/7] fix: compile error --- source/libs/transport/test/pushServer.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index 25972c9ec1..754433a5e6 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -31,13 +31,12 @@ void processShellMsg() { STaosQall *qall; SRpcMsg * pRpcMsg, rpcMsg; int type; - void * pvnode; SQueueInfo qinfo = {0}; qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, &qinfo); + int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &qinfo); tDebug("%d shell msgs are received", numOfMsgs); if (numOfMsgs <= 0) break; From dad12c24fbdb4a7b54636f26e322960df8a3b28c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jul 2022 19:56:38 +0800 Subject: [PATCH 7/7] refactor: comment out to avoid crash of tmqShow.py --- source/dnode/vnode/src/tq/tq.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fbb972fafe..041984f079 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -244,6 +244,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal fetchOffsetNew; + // todo + workerId = 0; + // 1.find handle STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/