Merge pull request #20678 from taosdata/FIX/TD-22975-main
enh: separate sync read and write
This commit is contained in:
commit
eaf56916fe
|
@ -34,7 +34,7 @@ typedef enum {
|
|||
WRITE_QUEUE,
|
||||
APPLY_QUEUE,
|
||||
SYNC_QUEUE,
|
||||
SYNC_CTRL_QUEUE,
|
||||
SYNC_RD_QUEUE,
|
||||
STREAM_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
|
|
@ -259,7 +259,7 @@ enum {
|
|||
|
||||
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT_ELECTION, "sync-elect", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL)
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef struct SMnodeMgmt {
|
|||
SSingleWorker readWorker;
|
||||
SSingleWorker writeWorker;
|
||||
SSingleWorker syncWorker;
|
||||
SSingleWorker syncCtrlWorker;
|
||||
SSingleWorker syncRdWorker;
|
||||
bool stopped;
|
||||
int32_t refCount;
|
||||
TdThreadRwlock lock;
|
||||
|
@ -54,7 +54,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt);
|
|||
void mmStopWorker(SMnodeMgmt *pMgmt);
|
||||
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -188,7 +188,8 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
|
@ -198,11 +199,12 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -111,8 +111,8 @@ int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutMsgToWorker(pMgmt, &pMgmt->syncCtrlWorker, pMsg);
|
||||
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
|
@ -151,8 +151,8 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
case SYNC_QUEUE:
|
||||
pWorker = &pMgmt->syncWorker;
|
||||
break;
|
||||
case SYNC_CTRL_QUEUE:
|
||||
pWorker = &pMgmt->syncCtrlWorker;
|
||||
case SYNC_RD_QUEUE:
|
||||
pWorker = &pMgmt->syncRdWorker;
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
|
@ -238,12 +238,12 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
SSingleWorkerCfg scCfg = {
|
||||
.min = 1,
|
||||
.max = 1,
|
||||
.name = "mnode-sync-ctrl",
|
||||
.name = "mnode-sync-rd",
|
||||
.fp = (FItem)mmProcessSyncMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) {
|
||||
dError("failed to start mnode mnode-sync-ctrl worker since %s", terrstr());
|
||||
if (tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg) != 0) {
|
||||
dError("failed to start mnode mnode-sync-rd worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -259,6 +259,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
|
|||
tSingleWorkerCleanup(&pMgmt->readWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->writeWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->syncWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->syncCtrlWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->syncRdWorker);
|
||||
dDebug("mnode workers are closed");
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ typedef struct {
|
|||
SVnode *pImpl;
|
||||
SMultiWorker pWriteW;
|
||||
SMultiWorker pSyncW;
|
||||
SMultiWorker pSyncCtrlW;
|
||||
SMultiWorker pSyncRdW;
|
||||
SMultiWorker pApplyW;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
|
@ -107,7 +107,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
|
|||
|
||||
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -549,7 +549,8 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
@ -559,12 +560,12 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -98,9 +98,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
|||
pVnode->pSyncW.queue->threadId);
|
||||
tMultiWorkerCleanup(&pVnode->pSyncW);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId,
|
||||
pVnode->pSyncCtrlW.queue, pVnode->pSyncCtrlW.queue->threadId);
|
||||
tMultiWorkerCleanup(&pVnode->pSyncCtrlW);
|
||||
dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
|
||||
pVnode->pSyncRdW.queue->threadId);
|
||||
tMultiWorkerCleanup(&pVnode->pSyncRdW);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
|
||||
pVnode->pApplyW.queue->threadId);
|
||||
|
|
|
@ -216,9 +216,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncW.queue, pMsg);
|
||||
break;
|
||||
case SYNC_CTRL_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncCtrlW.queue, pMsg);
|
||||
case SYNC_RD_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync-rd queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncRdW.queue, pMsg);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||
|
@ -234,9 +234,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE);
|
||||
}
|
||||
int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_RD_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
|
||||
|
||||
|
@ -327,18 +325,18 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
|
||||
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
|
||||
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
|
||||
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
|
||||
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
|
||||
(void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg);
|
||||
(void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg);
|
||||
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
||||
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -348,8 +346,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pWriteW.queue->threadId);
|
||||
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
|
||||
pVnode->pSyncW.queue->threadId);
|
||||
dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlW.queue,
|
||||
pVnode->pSyncCtrlW.queue->threadId);
|
||||
dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
|
||||
pVnode->pSyncRdW.queue->threadId);
|
||||
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
|
||||
pVnode->pApplyW.queue->threadId);
|
||||
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
|
||||
|
|
|
@ -33,7 +33,7 @@ static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
|
|
@ -378,7 +378,7 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
|
|
@ -105,6 +105,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
SRpcMsg rpcRsp = {0};
|
||||
bool accepted = false;
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
bool resetElect = false;
|
||||
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||
|
@ -137,7 +138,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeResetElectTimer(ths);
|
||||
resetElect = true;
|
||||
|
||||
if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
|
||||
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
|
||||
|
@ -184,10 +185,9 @@ _SEND_RESPONSE:
|
|||
// commit index, i.e. leader notice me
|
||||
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
|
||||
sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
||||
_out:
|
||||
if (resetElect) syncNodeResetElectTimer(ths);
|
||||
return 0;
|
||||
|
||||
_IGNORE:
|
||||
|
|
|
@ -115,6 +115,5 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
|||
ASSERT(ret == 0);
|
||||
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -182,6 +182,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
|||
case TDMT_SYNC_TIMEOUT:
|
||||
code = syncNodeOnTimeout(pSyncNode, pMsg);
|
||||
break;
|
||||
case TDMT_SYNC_TIMEOUT_ELECTION:
|
||||
code = syncNodeOnTimeout(pSyncNode, pMsg);
|
||||
break;
|
||||
case TDMT_SYNC_CLIENT_REQUEST:
|
||||
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
|
||||
break;
|
||||
|
@ -1593,8 +1596,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
// reset elect timer
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
// trace log
|
||||
sNTrace(pSyncNode, "become follower %s", debugStr);
|
||||
|
||||
// send rsp to client
|
||||
syncNodeLeaderChangeRsp(pSyncNode);
|
||||
|
@ -1610,8 +1613,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// reset log buffer
|
||||
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||
|
||||
// trace log
|
||||
sNTrace(pSyncNode, "become follower %s", debugStr);
|
||||
// reset elect timer
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
}
|
||||
|
||||
// TLA+ Spec
|
||||
|
@ -2277,6 +2280,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
|
|||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeat* pMsg = pRpcMsg->pCont;
|
||||
bool resetElect = false;
|
||||
|
||||
const STraceId* trace = &pRpcMsg->info.traceId;
|
||||
char tbuf[40] = {0};
|
||||
|
@ -2300,12 +2304,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
|
||||
resetElect = true;
|
||||
|
||||
syncNodeResetElectTimer(ths);
|
||||
ths->minMatchIndex = pMsg->minMatchIndex;
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
|
||||
SRpcMsg rpcMsgLocalCmd = {0};
|
||||
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||
|
||||
|
@ -2328,7 +2331,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
// syncNodeStepDown(ths, pMsg->term);
|
||||
SRpcMsg rpcMsgLocalCmd = {0};
|
||||
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||
|
||||
|
@ -2348,15 +2350,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// htonl
|
||||
SMsgHead* pHead = rpcMsg.pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
*/
|
||||
|
||||
// reply
|
||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||
|
||||
if (resetElect) syncNodeResetElectTimer(ths);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
|
|||
SSyncNode* pNode) {
|
||||
int32_t bytes = sizeof(SyncTimeout);
|
||||
pMsg->pCont = rpcMallocCont(bytes);
|
||||
pMsg->msgType = TDMT_SYNC_TIMEOUT;
|
||||
pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
|
||||
pMsg->contLen = bytes;
|
||||
if (pMsg->pCont == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -31,7 +31,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
|
|||
|
||||
SyncTimeout* pTimeout = pMsg->pCont;
|
||||
pTimeout->bytes = bytes;
|
||||
pTimeout->msgType = TDMT_SYNC_TIMEOUT;
|
||||
pTimeout->msgType = pMsg->msgType;
|
||||
pTimeout->vgId = pNode->vgId;
|
||||
pTimeout->timeoutType = timeoutType;
|
||||
pTimeout->logicClock = logicClock;
|
||||
|
|
|
@ -89,6 +89,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
int32_t ret = 0;
|
||||
SyncRequestVote* pMsg = pRpcMsg->pCont;
|
||||
bool resetElect = false;
|
||||
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
|
||||
|
@ -115,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
syncNodeStepDown(ths, currentTerm);
|
||||
|
||||
// forbid elect for this round
|
||||
syncNodeResetElectTimer(ths);
|
||||
resetElect = true;
|
||||
}
|
||||
|
||||
// send msg
|
||||
|
@ -134,5 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
|
||||
syncLogSendRequestVoteReply(ths, pReply, "");
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
|
||||
if (resetElect) syncNodeResetElectTimer(ths);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -798,7 +798,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
|
||||
syncNodeStepDown(pSyncNode, pMsg->term);
|
||||
}
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
|
||||
// state, term, seq/ack
|
||||
int32_t code = 0;
|
||||
|
@ -840,6 +839,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
code = -1;
|
||||
}
|
||||
|
||||
syncNodeResetElectTimer(pSyncNode);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -120,9 +120,6 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
|
|||
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
|
||||
++(ths->pingTimerCounter);
|
||||
|
||||
// syncNodePingAll(ths);
|
||||
// syncNodePingPeers(ths);
|
||||
|
||||
syncNodeTimerRoutine(ths);
|
||||
}
|
||||
|
||||
|
@ -138,8 +135,6 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
|
|||
++(ths->heartbeatTimerCounter);
|
||||
sTrace("vgId:%d, sync timer, type:replicate count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId,
|
||||
ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
|
||||
|
||||
// syncNodeReplicate(ths, true);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue