From edc9fe9705123357da84a6441d5238d13fe2a0b0 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 11 Apr 2023 14:19:36 +0800 Subject: [PATCH 1/4] enh: change the error msg of INVALID_VGROUP_ID to Vnode is closed or removed --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 4 ++-- source/libs/sync/src/syncMain.c | 2 +- source/util/src/terror.c | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e4e0d608de..cd42a214cd 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -163,8 +163,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, - terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); + dGWarn("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, + terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); terrno = (terrno != 0) ? terrno : -1; return terrno; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c25ea24249..5e0c75cf2b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -599,7 +599,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType)); code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr()); + sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr()); (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 31ffbbf177..19ebe4b7aa 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -319,7 +319,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_ALREADY_DEPLOYED, "Snode already deploye TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_DEPLOYED, "Snode not deployed") // vnode -TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode moved to another dnode or was deleted") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist") From f6be5f2c0edb4f0a4d79f825dbcca8baf5c2042d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 11 Apr 2023 15:15:01 +0800 Subject: [PATCH 2/4] enh: change sync log repl mgr to sync log repl in logging msg --- source/libs/sync/src/syncPipeline.c | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 04e52b3f49..22c87343ef 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -633,7 +633,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { syncLogReplReset(pMgr); - sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, + sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, pDestId->addr); return -1; } @@ -658,8 +658,8 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->states[pos].acked) { if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { syncLogReplReset(pMgr); - sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, - index, pDestId->addr); + sWarn("vgId:%d, reset sync log repl since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index, + pDestId->addr); goto _out; } continue; @@ -708,7 +708,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 + sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -725,7 +725,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->matchIndex = pMsg->matchIndex; pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 + sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -781,7 +781,7 @@ int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", + sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplReset(pMgr); pMgr->peerStartTime = pMsg->startTime; @@ -794,8 +794,7 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 - ", old:%" PRId64, + sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplReset(pMgr); pMgr->peerStartTime = pMsg->startTime; @@ -1141,8 +1140,8 @@ int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); if (pMgr) { - sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, - pDestId->addr, terrstr(), index); + sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr, + terrstr(), index); (void)syncLogReplReset(pMgr); } } From 09786a127d4fd0a3abc960a05b0f43a6651be223 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 11 Apr 2023 17:57:24 +0800 Subject: [PATCH 3/4] enh: refactor func names doOnce, attempt, probe, and sendTo of syncLogRepl --- source/libs/sync/inc/syncPipeline.h | 14 ++++++++------ source/libs/sync/src/syncPipeline.c | 24 ++++++++++++------------ source/libs/sync/src/syncReplication.c | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 68db811b12..d709e33cd4 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -77,18 +77,19 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); -int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); + +int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, + bool* pBarrier); int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); -int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); // SSyncLogBuffer SSyncLogBuffer* syncLogBufferCreate(); @@ -100,6 +101,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf); + int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 22c87343ef..519a19e8c7 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -666,7 +666,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } bool barrier = false; - if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; @@ -774,7 +774,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod // attempt to replicate the raft log at index (void)syncLogReplReset(pMgr); - return syncLogReplReplicateProbe(pMgr, pNode, index); + return syncLogReplProbe(pMgr, pNode, index); } int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { @@ -809,16 +809,16 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp return 0; } -int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { - (void)syncLogReplReplicateAttempt(pMgr, pNode); + (void)syncLogReplAttempt(pMgr, pNode); } else { - (void)syncLogReplReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); + (void)syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); } return 0; } -int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); ASSERT(pMgr->startIndex >= 0); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); @@ -833,7 +833,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -856,7 +856,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI return 0; } -int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; @@ -878,7 +878,7 @@ int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -931,7 +931,7 @@ int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, pMgr->startIndex = pMgr->matchIndex; } - return syncLogReplReplicateAttempt(pMgr, pNode); + return syncLogReplAttempt(pMgr, pNode); } SSyncLogReplMgr* syncLogReplCreate() { @@ -1126,8 +1126,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, return pEntry; } -int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier) { +int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, + bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; SRpcMsg msgOut = {0}; bool inBuf = false; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 43d2bc839b..8ac9a860e3 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { continue; } SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - (void)syncLogReplReplicateOnce(pMgr, pNode); + (void)syncLogReplDoOnce(pMgr, pNode); } return 0; } From 3efa36ca51f3ecdc64b3b1156906ec4eb52c905d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 11 Apr 2023 18:22:59 +0800 Subject: [PATCH 4/4] enh: refactor func name syncLogIsReplicationBarrier to syncLogReplBarrier --- source/libs/sync/inc/syncRaftEntry.h | 2 +- source/libs/sync/src/syncPipeline.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index a39e043c52..f9447e0168 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -45,7 +45,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) void syncEntryDestroy(SSyncRaftEntry* pEntry); void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7 -static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) { +static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) { return pEntry->originalRpcType == TDMT_SYNC_NOOP; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 519a19e8c7..69888ed8ea 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -1147,7 +1147,7 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind } goto _err; } - *pBarrier = syncLogIsReplicationBarrier(pEntry); + *pBarrier = syncLogReplBarrier(pEntry); prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index); if (prevLogTerm < 0) {