From d674c8370bb7b363012fbf6d7705d2dce9fd41fa Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 20 Dec 2022 21:30:46 +0800 Subject: [PATCH] refact: update sync log --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/inc/syncSnapshot.h | 2 +- source/libs/sync/inc/syncUtil.h | 6 - source/libs/sync/src/syncMain.c | 10 +- source/libs/sync/src/syncRespMgr.c | 24 +- source/libs/sync/src/syncSnapshot.c | 559 +++++++++++------- source/libs/sync/src/syncUtil.c | 101 +--- .../sync_test_lib/src/syncSnapshotDebug.c | 4 +- 8 files changed, 392 insertions(+), 316 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index a5524ffbde..6af60af43d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -227,7 +227,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index ee83636192..2b6e14a457 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -86,7 +86,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive // on message int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); -int32_t syncNodeOnSnapshotReply(SSyncNode *ths, const SRpcMsg *pMsg); +int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex 
snapshotLastApplyIndex); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index be14ef91a4..7d08585656 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -100,12 +100,6 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64 void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s); -void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); -void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); - -void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); -void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); - void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b3efcb4823..9d749d8e58 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -194,7 +194,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { code = syncNodeOnSnapshot(pSyncNode, pMsg); break; case TDMT_SYNC_SNAPSHOT_RSP: - code = syncNodeOnSnapshotReply(pSyncNode, pMsg); + code = syncNodeOnSnapshotRsp(pSyncNode, pMsg); break; case TDMT_SYNC_LOCAL_CMD: code = syncNodeOnLocalCmd(pSyncNode, pMsg); @@ -705,7 +705,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { terrno = TSDB_CODE_SYN_NOT_LEADER; - sNError(pSyncNode, "sync propose not 
leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); + sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType)); return -1; } @@ -890,10 +890,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; SSyncCfg* pCfg = &pSyncInfo->syncCfg; - sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); + sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); for (int32_t i = 0; i < pCfg->replicaNum; ++i) { SNodeInfo* pNode = &pCfg->nodeInfo[i]; - sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort); + sInfo("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort); } memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); @@ -1086,7 +1086,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); // ASSERT(pSender != NULL); (pSyncNode->senders)[i] = pSender; - sSTrace(pSender, "snapshot sender create new while open, data:%p", pSender); + sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender); } // snapshot receivers diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 79a38cad7a..718e9b1071 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -36,21 +36,21 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { taosThreadMutexInit(&(pObj->mutex), NULL); SSyncNode *pNode = pObj->data; - sTrace("vgId:%d, create resp manager", pNode->vgId); + sDebug("vgId:%d, resp manager create", pNode->vgId); return pObj; } void syncRespMgrDestroy(SSyncRespMgr *pObj) { - if (pObj != NULL) { - SSyncNode *pNode = pObj->data; - sTrace("vgId:%d, destroy resp manager", pNode->vgId); + if (pObj == NULL) return; - taosThreadMutexLock(&pObj->mutex); - 
taosHashCleanup(pObj->pRespHash); - taosThreadMutexUnlock(&pObj->mutex); - taosThreadMutexDestroy(&(pObj->mutex)); - taosMemoryFree(pObj); - } + SSyncNode *pNode = pObj->data; + sDebug("vgId:%d, resp manager destroy", pNode->vgId); + + taosThreadMutexLock(&pObj->mutex); + taosHashCleanup(pObj->pRespHash); + taosThreadMutexUnlock(&pObj->mutex); + taosThreadMutexDestroy(&(pObj->mutex)); + taosMemoryFree(pObj); } uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) { @@ -174,7 +174,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { void syncRespCleanRsp(SSyncRespMgr *pObj) { SSyncNode *pNode = pObj->data; - sTrace("vgId:%d, clean all rsp", pNode->vgId); + sTrace("vgId:%d, clean all resp", pNode->vgId); taosThreadMutexLock(&pObj->mutex); syncRespCleanByTTL(pObj, -1, true); @@ -183,7 +183,7 @@ void syncRespCleanRsp(SSyncRespMgr *pObj) { void syncRespClean(SSyncRespMgr *pObj) { SSyncNode *pNode = pObj->data; - sTrace("vgId:%d, clean rsp by ttl", pNode->vgId); + sTrace("vgId:%d, clean resp by ttl", pNode->vgId); taosThreadMutexLock(&pObj->mutex); syncRespCleanByTTL(pObj, pObj->ttl, false); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 6ec4692307..8d70404032 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -26,59 +26,61 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && (pSyncNode->pFsm->FpSnapshotDoRead != NULL); + if (!condition) return NULL; - SSyncSnapshotSender *pSender = NULL; - if (condition) { - pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender)); - if (pSender == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pSender->start = false; - pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; - pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; - pSender->pReader = NULL; - 
pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; - pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; - pSender->pSyncNode = pSyncNode; - pSender->replicaIndex = replicaIndex; - pSender->term = pSyncNode->pRaftStore->currentTerm; - pSender->startTime = 0; - pSender->endTime = 0; - pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); - pSender->finish = false; - } else { - sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId); + SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender)); + if (pSender == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } + pSender->start = false; + pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->pReader = NULL; + pSender->pCurrentBlock = NULL; + pSender->blockLen = 0; + pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; + pSender->pSyncNode = pSyncNode; + pSender->replicaIndex = replicaIndex; + pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->startTime = 0; + pSender->endTime = 0; + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); + pSender->finish = false; + + sDebug("vgId:%d, snapshot sender create", pSender->pSyncNode->vgId); return pSender; } void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { - if (pSender != NULL) { - // free current block - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - } + if (pSender == NULL) return; + sDebug("vgId:%d, snapshot sender destroy", pSender->pSyncNode->vgId); - // close reader - if (pSender->pReader != NULL) { - pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); - pSender->pReader = NULL; - } - - // free sender - taosMemoryFree(pSender); + // free current block + if (pSender->pCurrentBlock != NULL) { + taosMemoryFree(pSender->pCurrentBlock); + pSender->pCurrentBlock = NULL; } + + // close reader + if 
(pSender->pReader != NULL) { + pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + pSender->pReader = NULL; + } + + // free sender + taosMemoryFree(pSender); } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { - ASSERT(!snapshotSenderIsStart(pSender)); + if (snapshotSenderIsStart(pSender)) { + sSError(pSender, "vgId:%d, snapshot sender is already start"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } pSender->start = true; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; @@ -86,10 +88,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->pReader = NULL; pSender->pCurrentBlock = NULL; pSender->blockLen = 0; - pSender->snapshotParam.start = SYNC_INDEX_INVALID; pSender->snapshotParam.end = SYNC_INDEX_INVALID; - pSender->snapshot.data = NULL; pSender->snapshotParam.end = SYNC_INDEX_INVALID; pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; @@ -105,7 +105,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { // build begin msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId); + if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); + return -1; + } SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; @@ -120,15 +123,20 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; // send msg - syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); + if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); + return -1; + } // event log - sSTrace(pSender, "snapshot sender start"); + sSDebug(pSender, "snapshot sender start"); + 
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start"); return 0; } int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { + sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader); + // update flag pSender->start = false; pSender->finish = finish; @@ -147,8 +155,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { pSender->blockLen = 0; } - // event log - sSTrace(pSender, "snapshot sender stop"); return 0; } @@ -164,18 +170,27 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { // read data int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, - &(pSender->pCurrentBlock), &(pSender->blockLen)); - ASSERT(ret == 0); + &pSender->pCurrentBlock, &pSender->blockLen); + if (ret != 0) { + sSError(pSender, "snapshot sender read failed since %s", terrstr()); + return -1; + } + if (pSender->blockLen > 0) { + sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq); // has read data } else { // read finish, update seq to end pSender->seq = SYNC_SNAPSHOT_SEQ_END; + sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq); } // build msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); + if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr()); + return -1; + } SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; @@ -187,7 +202,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; pMsg->seq = pSender->seq; - // pMsg->privateTerm = pSender->privateTerm; if (pSender->pCurrentBlock != NULL) { @@ -195,27 +209,32 @@ int32_t 
snapshotSend(SSyncSnapshotSender *pSender) { } // send msg - syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); + if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); + return -1; + } pSender->lastSendTime = taosGetTimestampMs(); // event log if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { - sSTrace(pSender, "snapshot sender finish"); + sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq); + syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish"); } else { - sSTrace(pSender, "snapshot sender sending"); + sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq); + syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending"); } return 0; } // send snapshot data from cache int32_t snapshotReSend(SSyncSnapshotSender *pSender) { - // send current block data - // build msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); + if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); + return -1; + } SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; @@ -234,56 +253,63 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { } // send msg - syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); + if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); + return -1; + } pSender->lastSendTime = taosGetTimestampMs(); // event log - sSTrace(pSender, "snapshot sender resend"); - + sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq); + 
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend"); return 0; } -static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { - ASSERT(pMsg->ack == pSender->seq); +static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + if (pMsg->ack != pSender->seq) { + sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + pSender->ack = pMsg->ack; - ++(pSender->seq); + pSender->seq++; + + sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq); + return 0; } // return 0, start ok // return 1, last snapshot finish ok // return -1, error int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { - sNTrace(pSyncNode, "starting snapshot ..."); + sNInfo(pSyncNode, "snapshot sender starting ..."); SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { - sNError(pSyncNode, "start snapshot error, sender is null"); + sNError(pSyncNode, "snapshot sender start error since get failed"); return -1; } - int32_t code = 0; - if (snapshotSenderIsStart(pSender)) { - sNTrace(pSyncNode, "snapshot sender already start, ignore"); + sSError(pSender, "snapshot sender already start, ignore"); return 0; } - if (!snapshotSenderIsStart(pSender) && pSender->finish && - taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { - sNTrace(pSyncNode, "snapshot sender too frequently, ignore"); + if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { + sSInfo(pSender, "snapshot sender start too frequently, ignore"); return 1; } char host[64]; uint16_t port; syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); - sInfo("vgId:%d, start snapshot for peer: %s:%d", pSyncNode->vgId, host, port); + sSInfo(pSender, "snapshot sender start for peer:%s:%u", host, port); - code = snapshotSenderStart(pSender); + 
int32_t code = snapshotSenderStart(pSender); if (code != 0) { - sNError(pSyncNode, "snapshot sender start error"); + sSError(pSender, "snapshot sender start error since %s", terrstr()); return -1; } @@ -293,70 +319,79 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); + if (!condition) return NULL; - SSyncSnapshotReceiver *pReceiver = NULL; - if (condition) { - pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver)); - if (pReceiver == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pReceiver->start = false; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; - pReceiver->pWriter = NULL; - pReceiver->pSyncNode = pSyncNode; - pReceiver->fromId = fromId; - pReceiver->term = pSyncNode->pRaftStore->currentTerm; - pReceiver->snapshot.data = NULL; - pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; - pReceiver->snapshot.lastApplyTerm = 0; - pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; - - } else { - sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId); + SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver)); + if (pReceiver == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } + pReceiver->start = false; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + pReceiver->pWriter = NULL; + pReceiver->pSyncNode = pSyncNode; + pReceiver->fromId = fromId; + pReceiver->term = pSyncNode->pRaftStore->currentTerm; + pReceiver->snapshot.data = NULL; + pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; + pReceiver->snapshot.lastApplyTerm = 0; + pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; + + sDebug("vgId:%d, snapshot receiver create", pSyncNode->vgId); return pReceiver; } void 
snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { - if (pReceiver != NULL) { - // close writer - if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, - false, &(pReceiver->snapshot)); - ASSERT(ret == 0); - pReceiver->pWriter = NULL; - } + if (pReceiver == NULL) return; + sDebug("vgId:%d, snapshot receiver destroy", pReceiver->pSyncNode->vgId); - // free receiver - taosMemoryFree(pReceiver); + // close writer + if (pReceiver->pWriter != NULL) { + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, + &pReceiver->snapshot); + if (ret != 0) { + sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr()); + } + pReceiver->pWriter = NULL; } + + // free receiver + taosMemoryFree(pReceiver); } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } // force stop void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { + sRInfo(pReceiver, "snapshot receiver force stop, writer:%p"); + // force close, abandon incomplete data if (pReceiver->pWriter != NULL) { // event log - sRTrace(pReceiver, "snapshot receiver force stop"); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &(pReceiver->snapshot)); - ASSERT(ret == 0); + &pReceiver->snapshot); + if (ret != 0) { + sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr()); + } pReceiver->pWriter = NULL; } pReceiver->start = false; - - // event log - // sRTrace(pReceiver, "snapshot receiver force stop"); } int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { - ASSERT(snapshotReceiverIsStart(pReceiver)); + if (!snapshotReceiverIsStart(pReceiver)) { + sRError(pReceiver, "snapshot receiver is not start"); + terrno = 
TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + if (pReceiver->pWriter != NULL) { + sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } // update ack pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; @@ -365,25 +400,25 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; - pReceiver->snapshotParam.start = pBeginMsg->beginIndex; pReceiver->snapshotParam.end = pBeginMsg->lastIndex; // start writer - ASSERT(pReceiver->pWriter == NULL); - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, - &(pReceiver->snapshotParam), &(pReceiver->pWriter)); - ASSERT(ret == 0); + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam, + &pReceiver->pWriter); + if (ret != 0) { + sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr()); + return -1; + } // event log - sRTrace(pReceiver, "snapshot receiver start writer"); - + sRInfo(pReceiver, "snapshot receiver start write"); return 0; } int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) { if (snapshotReceiverIsStart(pReceiver)) { - sWarn("vgId:%d, snapshot receiver has started.", pReceiver->pSyncNode->vgId); + sRInfo(pReceiver, "snapshot receiver has started"); return 0; } @@ -394,49 +429,57 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend pReceiver->startTime = pPreMsg->startTime; // event log - sRTrace(pReceiver, "snapshot receiver start"); - + sRInfo(pReceiver, "snapshot receiver is start"); return 0; } // just set start = false // FpSnapshotStopWrite should not be called, assert writer == NULL int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { + 
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); + if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &(pReceiver->snapshot)); - ASSERT(ret == 0); + &pReceiver->snapshot); + if (ret != 0) { + sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr()); + } pReceiver->pWriter = NULL; + } else { + sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } pReceiver->start = false; - - // event log - sRTrace(pReceiver, "snapshot receiver stop"); return 0; } // when recv last snapshot block, apply data into snapshot static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { - ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END); + if (pMsg->seq != SYNC_SNAPSHOT_SEQ_END) { + sRError(pReceiver, "snapshot receiver seq:%d is invalid", pMsg->seq); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } int32_t code = 0; if (pReceiver->pWriter != NULL) { // write data + sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq); if (pMsg->dataLen > 0) { code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); if (code != 0) { - sNError(pReceiver->pSyncNode, "snapshot write error"); + sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr()); return -1; } } // reset wal + sRInfo(pReceiver, "snapshot receiver log restore"); code = pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); if (code != 0) { - sNError(pReceiver->pSyncNode, "wal restore from snapshot error"); + sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr()); return -1; } @@ -452,10 +495,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap } // stop writer, apply data + 
sRInfo(pReceiver, "snapshot receiver apply write"); code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, - &(pReceiver->snapshot)); + &pReceiver->snapshot); if (code != 0) { - sNError(pReceiver->pSyncNode, "snapshot stop writer true error"); + sRError(pReceiver, "snapshot receiver apply failed since %s", terrstr()); return -1; } pReceiver->pWriter = NULL; @@ -464,34 +508,48 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; } else { - sNError(pReceiver->pSyncNode, "snapshot stop writer true error"); + sRError(pReceiver, "snapshot receiver finish error since writer is null"); return -1; } // event log - sRTrace(pReceiver, "snapshot receiver got last data, finish, apply snapshot"); + sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished"); return 0; } // apply data block // update progress -static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { - ASSERT(pMsg->seq == pReceiver->ack + 1); - - if (pReceiver->pWriter != NULL) { - if (pMsg->dataLen > 0) { - // apply data block - int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, - pMsg->data, pMsg->dataLen); - ASSERT(code == 0); - } - - // update progress - pReceiver->ack = pMsg->seq; - - // event log - sRTrace(pReceiver, "snapshot receiver receiving"); +static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { + if (pMsg->seq != pReceiver->ack + 1) { + sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } + + if (pReceiver->pWriter == NULL) { + sRError(pReceiver, "snapshot receiver failed to write data since writer is null"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + sRDebug(pReceiver, "snapshot receiver continue to 
write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq); + + if (pMsg->dataLen > 0) { + // apply data block + int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + pMsg->data, pMsg->dataLen); + if (code != 0) { + sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr()); + return -1; + } + } + + // update progress + pReceiver->ack = pMsg->seq; + + // event log + sRDebug(pReceiver, "snapshot receiver continue to write finish"); + return 0; } SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { @@ -499,7 +557,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { if (syncNodeIsMnode(ths)) { snapStart = SYNC_INDEX_BEGIN; - + sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart); } else { SSyncLogStoreData *pData = ths->pLogStore->data; SWal *pWal = pData->pWal; @@ -514,6 +572,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { } else { snapStart = ths->commitIndex + 1; } + + sNInfo(ths, "snapshot begin index is %" PRId64, snapStart); } return snapStart; @@ -521,41 +581,48 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + int64_t timeNow = taosGetTimestampMs(); if (snapshotReceiverIsStart(pReceiver)) { // already start - if (pMsg->startTime > pReceiver->startTime) { + sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " start receiver", + pReceiver->startTime, pMsg->startTime); goto _START_RECEIVER; - } else if (pMsg->startTime == pReceiver->startTime) { + sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply", + pReceiver->startTime, pMsg->startTime); goto _SEND_REPLY; } else { // ignore + sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore", + pReceiver->startTime, 
pMsg->startTime); return 0; } } else { // start new + sRInfo(pReceiver, "snapshot receiver not start yet so start new one"); goto _START_RECEIVER; } _START_RECEIVER: - if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { - sNError(pSyncNode, "snapshot receiver time skew too much"); + if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { + sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow, + pMsg->startTime); return -1; } else { // waiting for clock match - int64_t timeNow = taosGetTimestampMs(); while (timeNow < pMsg->startTime) { - sNTrace(pSyncNode, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, - pMsg->startTime); + sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, + pMsg->startTime); taosMsleep(10); timeNow = taosGetTimestampMs(); } if (snapshotReceiverIsStart(pReceiver)) { + sRInfo(pReceiver, "snapshot receiver already start and force stop pre one"); snapshotReceiverForceStop(pReceiver); } @@ -567,7 +634,10 @@ _SEND_REPLY: ; // make complier happy SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); + if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { + sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); + return -1; + } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; @@ -581,8 +651,12 @@ _SEND_REPLY: pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); // send msg - syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot"); + if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { + sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); + return -1; + } + return 0; } @@ -591,12 +665,13 @@ 
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; if (!snapshotReceiverIsStart(pReceiver)) { - sNError(pSyncNode, "snapshot receiver not start"); + sRError(pReceiver, "snapshot receiver not start"); return -1; } if (pReceiver->startTime != pMsg->startTime) { - sNError(pSyncNode, "snapshot receiver time not equal"); + sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " not equal to msg startTime:%" PRId64, + pReceiver->startTime, pMsg->startTime); return -1; } @@ -605,7 +680,10 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p // build msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); + if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { + sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr()); + return -1; + } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; @@ -619,8 +697,12 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; // send msg - syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin"); + if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { + sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr()); + return -1; + } + return 0; } @@ -632,18 +714,22 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS // waiting for clock match int64_t timeNow = taosGetTimestampMs(); while (timeNow < pMsg->startTime) { - sNTrace(pSyncNode, "snapshot receiver transfering waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, - pMsg->startTime); + sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" 
PRId64, timeNow, + pMsg->startTime); taosMsleep(10); + timeNow = taosGetTimestampMs(); } - if (pMsg->seq == pReceiver->ack + 1) { - snapshotReceiverGotData(pReceiver, pMsg); + if (snapshotReceiverGotData(pReceiver, pMsg) != 0) { + return -1; } // build msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); + if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) { + sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr()); + return -1; + } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; @@ -657,8 +743,11 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; // send msg - syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver receiving"); + if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { + sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr()); + return -1; + } return 0; } @@ -670,8 +759,8 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // waiting for clock match int64_t timeNow = taosGetTimestampMs(); while (timeNow < pMsg->startTime) { - sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, - pMsg->startTime); + sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, + pMsg->startTime); taosMsleep(10); timeNow = taosGetTimestampMs(); } @@ -683,7 +772,10 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // build msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); + if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { + sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr()); + return 
-1; + } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; @@ -697,8 +789,12 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; // send msg - syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end"); + if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { + sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr()); + return -1; + } + return 0; } @@ -723,16 +819,17 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // condition 5, got data, update ack // int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { - SyncSnapshotSend *pMsg = pRpcMsg->pCont; + SyncSnapshotSend *pMsg = pRpcMsg->pCont; + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; // if already drop replica, do not process - if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) { + if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config"); return 0; } if (pMsg->term < pSyncNode->pRaftStore->currentTerm) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject, small term"); + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); return 0; } @@ -741,45 +838,42 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } syncNodeResetElectTimer(pSyncNode); - int32_t code = 0; - SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, ""); - // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); 
syncNodeOnSnapshotPre(pSyncNode, pMsg); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin"); syncNodeOnSnapshotBegin(pSyncNode, pMsg); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end"); syncNodeOnSnapshotEnd(pSyncNode, pMsg); - (void)syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode); - + if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { + sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); + return -1; + } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { // force close, no response + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); snapshotReceiverForceStop(pReceiver); - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq"); syncNodeOnSnapshotTransfering(pSyncNode, pMsg); - } else { // error log - sRTrace(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack); + sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack); return -1; } - } else { // error log - sRTrace(pReceiver, "snapshot receiver term not equal"); + sRError(pReceiver, "snapshot receiver term not equal"); return -1; } } else { // error log - sRTrace(pReceiver, "snapshot receiver not follower"); + sRError(pReceiver, "snapshot receiver not follower"); return -1; } @@ -789,20 +883,26 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // get sender SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); - ASSERT(pSender != NULL); + if (pSender == NULL) { + sNError(pSyncNode, "prepare snapshot error since sender is null"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } - SSnapshot snapshot; + SSnapshot 
snapshot = {0}; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // prepare pSender->snapshotParam.start = pMsg->snapBeginIndex; pSender->snapshotParam.end = snapshot.lastApplyIndex; - sNTrace(pSyncNode, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64, - pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64, + pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) { - sNError(pSyncNode, "snapshot last index too small"); + sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64, pMsg->snapBeginIndex, + snapshot.lastApplyIndex); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } @@ -812,7 +912,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) // start reader int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader)); if (code != 0) { - sNError(pSyncNode, "create snapshot reader error"); + sSError(pSender, "prepare snapshot failed since %s", terrstr()); return -1; } @@ -824,7 +924,10 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) // build begin msg SRpcMsg rpcMsg = {0}; - (void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId); + if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "prepare snapshot failed since build msg error"); + return -1; + } SyncSnapshotSend *pSendMsg = rpcMsg.pCont; pSendMsg->srcId = pSender->pSyncNode->myRaftId; @@ -839,8 +942,11 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; // send msg - syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg); - syncLogSendSyncSnapshotSend(pSyncNode, 
pSendMsg, ""); + syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre"); + if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "prepare snapshot failed since send msg error"); + return -1; + } return 0; } @@ -851,7 +957,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) // condition 2 sender receives ack, set seq = ack + 1, send msg from seq // condition 3 sender receives error msg, just print error log // -int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { +int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { SyncSnapshotRsp *pMsg = pRpcMsg->pCont; // if already drop replica, do not process @@ -861,36 +967,47 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } // get sender - SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); - ASSERT(pSender != NULL); - - if (pMsg->startTime != pSender->startTime) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender/receiver start time not match"); + SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId); + if (pSender == NULL) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, ""); + if (pMsg->startTime != pSender->startTime) { + sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match", pSender->startTime, pMsg->startTime); + return -1; + } // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { // prepare , send begin msg if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); syncNodeOnSnapshotReplyPre(pSyncNode, pMsg); return 0; } if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { - snapshotSenderUpdateProgress(pSender, 
pMsg); - snapshotSend(pSender); + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); + if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { + return -1; + } + + if (snapshotSend(pSender) != 0) { + return -1; + } return 0; } // receive ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); snapshotSenderStop(pSender, true); SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); if (pMgr) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr"); syncLogReplMgrReset(pMgr); } return 0; @@ -898,12 +1015,18 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { // send next msg if (pMsg->ack == pSender->seq) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq"); // update sender ack - snapshotSenderUpdateProgress(pSender, pMsg); - snapshotSend(pSender); + if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { + return -1; + } + if (snapshotSend(pSender) != 0) { + return -1; + } } else if (pMsg->ack == pSender->seq - 1) { // maybe resend + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend"); snapshotReSend(pSender); } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index a034e6ad83..49a24bebde 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -277,14 +277,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo if (pNode != NULL && pNode->pRaftCfg != NULL) { taosPrintLog(flags, level, dflag, - "vgId:%d, sync %s " - "%s" - ", term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64 - ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 + "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 + ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, 
hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, + pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, @@ -330,13 +328,13 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla va_end(argpointer); taosPrintLog(flags, level, dflag, - "vgId:%d, sync %s " - "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 + "vgId:%d, %s, sync:%s, {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 + " lcindex:%" PRId64 " seq:%d ack:%d finish:%d replica-index:%d %s:%d}" ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, pSender, pSender->snapshotParam.start, + pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, host, port, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, @@ -382,14 +380,14 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df va_end(argpointer); taosPrintLog(flags, level, 
dflag, - "vgId:%d, sync %s " - "%s {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64 + "vgId:%d, %s, sync:%s," + " {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 "}" ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, pReceiver, pReceiver->start, pReceiver->ack, + pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, @@ -520,95 +518,56 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p port, pMsg->term, pMsg->timeStamp, s, timeDiff); } -void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; - - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); -} - -void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; - - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); -} - -void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const 
SyncPreSnapshotReply* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; - - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->snapStart, s); -} - -void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; - - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, - port, pMsg->term, pMsg->snapStart, s); -} - void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; + if (!(sDebugFlag & DEBUG_DEBUG)) return; char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", seq:%d}, %s", - host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); + sNDebug(pSyncNode, + "send sync-snapshot-send to %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 + ", lterm:%" PRId64 ", stime:%" PRId64, + host, port, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime); } void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; + if (!(sDebugFlag & DEBUG_DEBUG)) return; char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", 
lterm:%" PRId64 - ", stime:%" PRId64 ", seq:%d, len:%u}, %s", - host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, - pMsg->dataLen, s); + sNDebug(pSyncNode, + "recv sync-snapshot-send from %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 + ", lterm:%" PRId64 ", stime:%" PRId64 ", len:%u", + host, port, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, + pMsg->dataLen); } void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; - + if (!(sDebugFlag & DEBUG_DEBUG)) return; char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", ack:%d}, %s", - host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); + sNDebug(pSyncNode, + "send sync-snapshot-rsp to %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 + ", lterm:%" PRId64 ", stime:%" PRId64, + host, port, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime); } void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { - if (!(sDebugFlag & DEBUG_TRACE)) return; + if (!(sDebugFlag & DEBUG_DEBUG)) return; char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", ack:%d}, %s", - host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); + sNDebug(pSyncNode, + "recv sync-snapshot-rsp from %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", 
lst:%" PRId64 + ", lterm:%" PRId64 ", stime:%" PRId64, + host, port, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime); } void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c index 82e23b2885..f1237e5282 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c @@ -132,7 +132,7 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { } int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { - syncLogRecvSyncPreSnapshot(ths, pMsg, ""); + // syncLogRecvSyncPreSnapshot(ths, pMsg, ""); SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); pMsgReply->srcId = ths->myRaftId; @@ -181,7 +181,7 @@ _IGNORE: } int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { - syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); + // syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); // start snapshot