enh: adjust the number of vnode threads so that one vnode has one write thread
This commit is contained in:
parent
4fc0e3bd55
commit
2e6f75107d
|
@ -299,7 +299,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
|
SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
|
||||||
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||||
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||||
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode};
|
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
|
||||||
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
|
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
|
||||||
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
|
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
|
||||||
(void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg);
|
(void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg);
|
||||||
|
|
|
@ -244,6 +244,16 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
|
if (pMsg == NULL || pMsg->pCont == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
|
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
@ -253,6 +263,16 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
|
if (pMsg == NULL || pMsg->pCont == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
Loading…
Reference in New Issue