enh: adjust vnode fetch queue number
This commit is contained in:
parent
6bc68021ba
commit
943de8bc15
|
@ -412,7 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
|
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1);
|
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
||||||
|
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||||
|
|
|
@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
|
||||||
const char *path;
|
const char *path;
|
||||||
const char *name;
|
const char *name;
|
||||||
SQWorkerPool queryPool;
|
SQWorkerPool queryPool;
|
||||||
SQWorkerPool fetchPool;
|
SWWorkerPool fetchPool;
|
||||||
SWWorkerPool syncPool;
|
SWWorkerPool syncPool;
|
||||||
SWWorkerPool writePool;
|
SWWorkerPool writePool;
|
||||||
SWWorkerPool applyPool;
|
SWWorkerPool applyPool;
|
||||||
|
|
|
@ -81,21 +81,26 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnodeObj *pVnode = pInfo->ahandle;
|
SVnodeObj *pVnode = pInfo->ahandle;
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
SRpcMsg *pMsg = NULL;
|
||||||
|
|
||||||
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||||
if (code != 0) {
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
if (terrno != 0) code = terrno;
|
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
|
||||||
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
|
|
||||||
vmSendRsp(pMsg, code);
|
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||||
|
if (code != 0) {
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
|
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
|
||||||
|
vmSendRsp(pMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
|
@ -242,7 +247,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
|
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
|
||||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||||
|
|
||||||
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
|
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
|
||||||
pVnode->pFetchQ == NULL) {
|
pVnode->pFetchQ == NULL) {
|
||||||
|
@ -259,7 +264,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
|
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||||
pVnode->pWriteQ = NULL;
|
pVnode->pWriteQ = NULL;
|
||||||
pVnode->pSyncQ = NULL;
|
pVnode->pSyncQ = NULL;
|
||||||
pVnode->pApplyQ = NULL;
|
pVnode->pApplyQ = NULL;
|
||||||
|
@ -275,11 +280,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
pQPool->max = tsNumOfVnodeQueryThreads;
|
pQPool->max = tsNumOfVnodeQueryThreads;
|
||||||
if (tQWorkerInit(pQPool) != 0) return -1;
|
if (tQWorkerInit(pQPool) != 0) return -1;
|
||||||
|
|
||||||
SQWorkerPool *pFPool = &pMgmt->fetchPool;
|
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||||
pFPool->name = "vnode-fetch";
|
pFPool->name = "vnode-fetch";
|
||||||
pFPool->min = tsNumOfVnodeFetchThreads;
|
|
||||||
pFPool->max = tsNumOfVnodeFetchThreads;
|
pFPool->max = tsNumOfVnodeFetchThreads;
|
||||||
if (tQWorkerInit(pFPool) != 0) return -1;
|
if (tWWorkerInit(pFPool) != 0) return -1;
|
||||||
|
|
||||||
SWWorkerPool *pWPool = &pMgmt->writePool;
|
SWWorkerPool *pWPool = &pMgmt->writePool;
|
||||||
pWPool->name = "vnode-write";
|
pWPool->name = "vnode-write";
|
||||||
|
@ -325,6 +329,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
|
||||||
tWWorkerCleanup(&pMgmt->applyPool);
|
tWWorkerCleanup(&pMgmt->applyPool);
|
||||||
tWWorkerCleanup(&pMgmt->syncPool);
|
tWWorkerCleanup(&pMgmt->syncPool);
|
||||||
tQWorkerCleanup(&pMgmt->queryPool);
|
tQWorkerCleanup(&pMgmt->queryPool);
|
||||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||||
dDebug("vnode workers are closed");
|
dDebug("vnode workers are closed");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue