This commit is contained in:
Shengliang Guan 2022-03-28 16:43:52 +08:00
parent c3ba1881b1
commit 1e1573359d
10 changed files with 49 additions and 44 deletions

View File

@ -70,8 +70,8 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue);
typedef struct { typedef struct {
const char *name; const char *name;
int32_t minNum; int32_t min;
int32_t maxNum; int32_t max;
FItem fp; FItem fp;
void *param; void *param;
} SSingleWorkerCfg; } SSingleWorkerCfg;
@ -84,7 +84,7 @@ typedef struct {
typedef struct { typedef struct {
const char *name; const char *name;
int32_t maxNum; int32_t max;
FItems fp; FItems fp;
void *param; void *param;
} SMultiWorkerCfg; } SMultiWorkerCfg;

View File

@ -71,7 +71,7 @@ int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
int32_t bmStartWorker(SBnodeMgmt *pMgmt) { int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt};
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode write worker since %s", terrstr()); dError("failed to start bnode write worker since %s", terrstr());
return -1; return -1;

View File

@ -108,21 +108,22 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
} }
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
dndCleanupServer(pDnode); // dndCleanupServer(pDnode);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
if (except == n) continue; if (except == n) continue;
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper); pWrapper->required = false;
} }
} }
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from child queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont; pRpc->pCont = pCont;
dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle,
pRpc->ahandle);
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pWrapper, pMsg); int32_t code = (*msgFp)(pWrapper->pMgmt, pMsg);
if (code != 0) { if (code != 0) {
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
@ -136,11 +137,13 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
} }
} }
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) { static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from parent queue", pRsp); pRpc->pCont = pCont;
pRsp->pCont = pCont; dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType), pRpc->handle,
dndSendRsp(pWrapper, pRsp); pRpc->ahandle);
taosMemoryFree(pRsp);
dndSendRsp(pWrapper, pRpc);
taosMemoryFree(pRpc);
} }
static int32_t dndRunInMultiProcess(SDnode *pDnode) { static int32_t dndRunInMultiProcess(SDnode *pDnode) {

View File

@ -101,14 +101,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t dmStartWorker(SDnodeMgmt *pMgmt) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
SSingleWorkerCfg mgmtCfg = { SSingleWorkerCfg mgmtCfg = {
.minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; .min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start dnode mgmt worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg statusCfg = { SSingleWorkerCfg statusCfg = {
.minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; .min = 1, .max = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) { if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) {
dError("failed to start dnode status worker since %s", terrstr()); dError("failed to start dnode status worker since %s", terrstr());
return -1; return -1;

View File

@ -123,27 +123,27 @@ int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc); return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg qCfg = {.min = 0, .max = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt};
SSingleWorkerCfg queryCfg = {.minNum = 0, .maxNum = 1, .name = "mnode-query", .fp = (FItem)mmProcessQueryQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start mnode-query worker since %s", terrstr()); dError("failed to start mnode-query worker since %s", terrstr());
return -1; return -1;
} }
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) { SSingleWorkerCfg rCfg = {.min = 0, .max = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr()); dError("failed to start mnode-read worker since %s", terrstr());
return -1; return -1;
} }
if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { SSingleWorkerCfg wCfg = {.min = 0, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
dError("failed to start mnode-write worker since %s", terrstr()); dError("failed to start mnode-write worker since %s", terrstr());
return -1; return -1;
} }
if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) { SSingleWorkerCfg sCfg = {.min = 0, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
dError("failed to start mnode sync-worker since %s", terrstr()); dError("failed to start mnode sync-worker since %s", terrstr());
return -1; return -1;
} }

View File

@ -110,8 +110,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads; int32_t maxQueryThreads = minQueryThreads;
SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads, SSingleWorkerCfg queryCfg = {.min = minQueryThreads,
.maxNum = maxQueryThreads, .max = maxQueryThreads,
.name = "qnode-query", .name = "qnode-query",
.fp = (FItem)qmProcessQueryQueue, .fp = (FItem)qmProcessQueryQueue,
.param = pMgmt}; .param = pMgmt};
@ -121,8 +121,8 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1; return -1;
} }
SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads, SSingleWorkerCfg fetchCfg = {.min = minFetchThreads,
.maxNum = maxFetchThreads, .max = maxFetchThreads,
.name = "qnode-fetch", .name = "qnode-fetch",
.fp = (FItem)qmProcessFetchQueue, .fp = (FItem)qmProcessFetchQueue,
.param = pMgmt}; .param = pMgmt};

View File

@ -57,7 +57,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr()); dError("failed to start snode-unique worker since %s", terrstr());
@ -69,8 +69,8 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
} }
} }
SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, SSingleWorkerCfg cfg = {.min = SND_SHARED_THREAD_NUM,
.maxNum = SND_SHARED_THREAD_NUM, .max = SND_SHARED_THREAD_NUM,
.name = "snode-shared", .name = "snode-shared",
.fp = (FItem)smProcessSharedQueue, .fp = (FItem)smProcessSharedQueue,
.param = pMgmt}; .param = pMgmt};

View File

@ -394,7 +394,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
if (tWWorkerInit(pWPool) != 0) return -1; if (tWWorkerInit(pWPool) != 0) return -1;
SSingleWorkerCfg cfg = { SSingleWorkerCfg cfg = {
.minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start vnode-mgmt worker since %s", terrstr()); dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1; return -1;

View File

@ -264,7 +264,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue); uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items,
headLen, pHead, bodyLen, pBody);
return 0; return 0;
} }
@ -277,7 +278,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
terrno = TSDB_CODE_OUT_OF_SHM_MEM; terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return -1; return 0;
} }
int32_t headLen = 0; int32_t headLen = 0;
@ -341,8 +342,9 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
*pHeadLen = headLen; *pHeadLen = headLen;
*pBodyLen = bodyLen; *pBodyLen = bodyLen;
uTrace("proc:%s, get msg:%p:%d cont:%p:%d from queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue); uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items,
return 0; headLen, pHead, bodyLen, pBody);
return 1;
} }
SProcObj *taosProcInit(const SProcCfg *pCfg) { SProcObj *taosProcInit(const SProcCfg *pCfg) {
@ -396,15 +398,15 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
void *pHead, *pBody; void *pHead, *pBody;
int32_t headLen, bodyLen; int32_t headLen, bodyLen;
uDebug("proc:%s, start to get message from queue:%p", pQueue->name, pQueue); uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue);
while (1) { while (1) {
int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen);
if (code < 0) { if (numOfMsgs == 0) {
uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue); uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue);
break; break;
} else if (code == 0) { } else if (numOfMsgs < 0) {
uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr()); uTrace("proc:%s, get no msg from queue:%p since %s", pQueue->name, pQueue, terrstr());
taosMsleep(1); taosMsleep(1);
continue; continue;
} else { } else {

View File

@ -287,8 +287,8 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
SQWorkerPool *pPool = &pWorker->pool; SQWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name; pPool->name = pCfg->name;
pPool->min = pCfg->minNum; pPool->min = pCfg->min;
pPool->max = pCfg->maxNum; pPool->max = pCfg->max;
if (tQWorkerInit(pPool) != 0) { if (tQWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -316,7 +316,7 @@ void tSingleWorkerCleanup(SSingleWorker *pWorker) {
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
SWWorkerPool *pPool = &pWorker->pool; SWWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name; pPool->name = pCfg->name;
pPool->max = pCfg->maxNum; pPool->max = pCfg->max;
if (tWWorkerInit(pPool) != 0) { if (tWWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;