diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index 4cf4800472..1e1ed3bddc 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -230,7 +230,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo);
 int32_t syncStart(int64_t rid);
 void syncStop(int64_t rid);
 void syncPreStop(int64_t rid);
-int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
+int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
 int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
 int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
 int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
@@ -240,6 +240,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
 bool syncIsReadyForRead(int64_t rid);
 bool syncSnapshotSending(int64_t rid);
 bool syncSnapshotRecving(int64_t rid);
+void syncSendTimeoutRsp(int64_t rid, int64_t seq);
 
 SSyncState syncGetState(int64_t rid);
 void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
diff --git a/include/util/tdef.h b/include/util/tdef.h
index e0089014aa..47435a2322 100644
--- a/include/util/tdef.h
+++ b/include/util/tdef.h
@@ -497,6 +497,9 @@ enum {
 // sort page size by default
 #define DEFAULT_PAGESIZE 4096
 
+#define VNODE_TIMEOUT_SEC 60
+#define MNODE_TIMEOUT_SEC 10
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
index 56a9a0e22b..2345c752a9 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c
@@ -334,7 +334,21 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
   taosMemoryFree(pMgmt);
 }
 
-static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {}
+// Walk every vnode in pMgmt->hash and run its sync-timeout check, holding pMgmt->lock for reading meanwhile.
+static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
+  taosThreadRwlockRdlock(&pMgmt->lock);
+  void *pIter = taosHashIterate(pMgmt->hash, NULL);
+  while (pIter != NULL) {
+    SVnodeObj **ppVnode = pIter;
+    // Skip empty slots, but still fall through to advance the iterator (a bare 'continue' would spin forever).
+    if (*ppVnode != NULL) {
+      vnodeSyncCheckTimeout((*ppVnode)->pImpl);
+    }
+    pIter = taosHashIterate(pMgmt->hash, pIter);
+  }
+  // Release the read lock taken at the top so writers of pMgmt->lock are not blocked indefinitely.
+  taosThreadRwlockUnlock(&pMgmt->lock);
+}
 
 static void *vmThreadFp(void *param) {
   SVnodeMgmt *pMgmt = param;
@@ -348,7 +362,7 @@ static void *vmThreadFp(void *param) {
     if (lastTime % 10 != 0) continue;
 
     int64_t sec = lastTime / 10;
-    if (sec % (tsStatusInterval * 5) == 0) {
+    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
       vmCheckSyncTimeout(pMgmt);
     }
   }
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index 99260ffefd..540f0c3127 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -54,6 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
 void vnodeDestroy(const char *path, STfs *pTfs);
 SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
 void vnodePreClose(SVnode *pVnode);
+void vnodeSyncCheckTimeout(SVnode* pVnode);
 void vnodeClose(SVnode *pVnode);
 
 int32_t vnodeStart(SVnode *pVnode);
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 8cf212cb1d..e025626ecf 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -344,6 +344,8 @@ struct SVnode {
   bool blocked;
   bool restored;
   tsem_t syncSem;
+  int32_t blockSec;
+  int64_t blockSeq;
   SQHandle* pQuery;
 };
 
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 5a0556ba27..36eb6b293a 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -22,7 +22,8 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
 
 static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
   const STraceId *trace = &pMsg->info.traceId;
-  vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
+  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
+          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
   tsem_wait(&pVnode->syncSem);
 }
 
@@ -202,12 +203,16 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
 #else
 
 static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
+  int64_t seq = 0;
+
   taosThreadMutexLock(&pVnode->lock);
-  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak);
+  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
   bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
   if (wait) {
     ASSERT(!pVnode->blocked);
     pVnode->blocked = true;
+    pVnode->blockSec = taosGetTimestampSec();
+    pVnode->blockSeq = seq;
   }
   taosThreadMutexUnlock(&pVnode->lock);
 
@@ -606,6 +611,25 @@ void vnodeSyncClose(SVnode *pVnode) {
   syncStop(pVnode->sync);
 }
 
+void vnodeSyncCheckTimeout(SVnode *pVnode) {
+  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
+  taosThreadMutexLock(&pVnode->lock);
+  if (pVnode->blocked) {
+    int32_t curSec = taosGetTimestampSec();
+    int32_t delta = curSec - pVnode->blockSec;
+    if (delta > VNODE_TIMEOUT_SEC) {
+      syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq);
+      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
+             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
+      pVnode->blocked = false;
+      pVnode->blockSec = 0;
+      pVnode->blockSeq = 0;
+      tsem_post(&pVnode->syncSem);
+    }
+  }
+  taosThreadMutexUnlock(&pVnode->lock);
+}
+
 bool vnodeIsRoleLeader(SVnode *pVnode) {
   SSyncState state = syncGetState(pVnode->sync);
   return state.state == TAOS_SYNC_STATE_LEADER;
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index dee0bf95c7..a5524ffbde 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -215,7 +215,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode);
 int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
 void syncNodeClose(SSyncNode* pSyncNode);
 void syncNodePreClose(SSyncNode* pSyncNode);
-int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
+int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t *seq);
 int32_t syncNodeRestore(SSyncNode* pSyncNode);
 void syncHbTimerDataFree(SSyncHbTimerData* pData);
 
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 6fabab18cb..f4a6050171 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -151,7 +151,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
     }
 
     syncNodeStartHeartbeatTimer(pSyncNode);
-    //syncNodeReplicate(pSyncNode);
+    // syncNodeReplicate(pSyncNode);
   }
 
   syncNodeRelease(pSyncNode);
@@ -218,6 +218,18 @@ int32_t syncLeaderTransfer(int64_t rid) {
   return ret;
 }
 
+void syncSendTimeoutRsp(int64_t rid, int64_t seq) {
+  SSyncNode* pNode = syncNodeAcquire(rid);
+  if (pNode == NULL) return;
+
+  SRpcMsg rpcMsg = {0};
+  (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info);
+  rpcMsg.code = TSDB_CODE_SYN_TIMEOUT;
+
+  syncNodeRelease(pNode);
+  rpcSendResponse(&rpcMsg);
+}
+
 SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
   SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
 
@@ -538,7 +550,7 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
   pMsg->newLeaderId.vgId = pSyncNode->vgId;
   pMsg->newNodeInfo = newLeader;
 
-  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
+  int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false, NULL);
   rpcFreeCont(rpcMsg.pCont);
   return ret;
 }
@@ -670,19 +682,19 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
   syncNodeRelease(pSyncNode);
 }
 
-int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
+int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
   SSyncNode* pSyncNode = syncNodeAcquire(rid);
   if (pSyncNode == NULL) {
     sError("sync propose error");
     return -1;
   }
 
-  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
+  int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq);
   syncNodeRelease(pSyncNode);
   return ret;
 }
 
-int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
+int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
   if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
     terrno = TSDB_CODE_SYN_NOT_LEADER;
     sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
@@ -739,6 +751,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
       (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
     }
 
+    if (seq != NULL) *seq = seqNum;
     return code;
   }
 }
diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
index 057f2ea6dd..7a5d0777bb 100644
--- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
@@ -337,7 +337,7 @@ int main(int argc, char** argv) {
 
     if (alreadySend < writeRecordNum) {
       SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
-      int32_t ret = syncPropose(rid, pRpcMsg, false);
+      int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
       if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
         sTrace("%s value%d write not leader", s, alreadySend);
       } else {
diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp
index bab3d2236f..f35a23b15b 100644
--- a/source/libs/sync/test/syncConfigChangeTest.cpp
+++ b/source/libs/sync/test/syncConfigChangeTest.cpp
@@ -249,7 +249,7 @@ int main(int argc, char** argv) {
 
     if (alreadySend < writeRecordNum) {
       SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
-      int32_t ret = syncPropose(rid, pRpcMsg, false);
+      int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
       if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
         sTrace("%s value%d write not leader", s, alreadySend);
       } else {
diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp
index 4a82bba15d..22132eb01f 100644
--- a/source/libs/sync/test/syncReplicateTest.cpp
+++ b/source/libs/sync/test/syncReplicateTest.cpp
@@ -189,7 +189,7 @@ int main(int argc, char** argv) {
 
     if (alreadySend < writeRecordNum) {
       SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
-      int32_t ret = syncPropose(rid, pRpcMsg, false);
+      int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
       if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
         sTrace("%s value%d write not leader", s, alreadySend);
       } else {
diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp
index 8c486df118..4bc2e92d0c 100644
--- a/source/libs/sync/test/syncTestTool.cpp
+++ b/source/libs/sync/test/syncTestTool.cpp
@@ -396,7 +396,7 @@ int main(int argc, char** argv) {
 
     if (alreadySend < writeRecordNum) {
       SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
-      int32_t ret = syncPropose(rid, pRpcMsg, false);
+      int32_t ret = syncPropose(rid, pRpcMsg, false, NULL);
       if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
         sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
       } else {