refact: update sync log

This commit is contained in:
Shengliang Guan 2022-12-20 21:30:46 +08:00
parent 5d49e4f555
commit d674c8370b
8 changed files with 392 additions and 316 deletions

View File

@ -227,7 +227,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);

View File

@ -86,7 +86,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive
// on message // on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, const SRpcMsg *pMsg); int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex);

View File

@ -100,12 +100,6 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);

View File

@ -194,7 +194,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code = syncNodeOnSnapshot(pSyncNode, pMsg); code = syncNodeOnSnapshot(pSyncNode, pMsg);
break; break;
case TDMT_SYNC_SNAPSHOT_RSP: case TDMT_SYNC_SNAPSHOT_RSP:
code = syncNodeOnSnapshotReply(pSyncNode, pMsg); code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
break; break;
case TDMT_SYNC_LOCAL_CMD: case TDMT_SYNC_LOCAL_CMD:
code = syncNodeOnLocalCmd(pSyncNode, pMsg); code = syncNodeOnLocalCmd(pSyncNode, pMsg);
@ -705,7 +705,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
return -1; return -1;
} }
@ -890,10 +890,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init by SSyncInfo // init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncInfo->syncCfg; SSyncCfg* pCfg = &pSyncInfo->syncCfg;
sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) { for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo* pNode = &pCfg->nodeInfo[i]; SNodeInfo* pNode = &pCfg->nodeInfo[i];
sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort); sInfo("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
} }
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
@ -1086,7 +1086,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
// ASSERT(pSender != NULL); // ASSERT(pSender != NULL);
(pSyncNode->senders)[i] = pSender; (pSyncNode->senders)[i] = pSender;
sSTrace(pSender, "snapshot sender create new while open, data:%p", pSender); sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender);
} }
// snapshot receivers // snapshot receivers

View File

@ -36,14 +36,15 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
taosThreadMutexInit(&(pObj->mutex), NULL); taosThreadMutexInit(&(pObj->mutex), NULL);
SSyncNode *pNode = pObj->data; SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, create resp manager", pNode->vgId); sDebug("vgId:%d, resp manager create", pNode->vgId);
return pObj; return pObj;
} }
void syncRespMgrDestroy(SSyncRespMgr *pObj) { void syncRespMgrDestroy(SSyncRespMgr *pObj) {
if (pObj != NULL) { if (pObj == NULL) return;
SSyncNode *pNode = pObj->data; SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, destroy resp manager", pNode->vgId); sDebug("vgId:%d, resp manager destroy", pNode->vgId);
taosThreadMutexLock(&pObj->mutex); taosThreadMutexLock(&pObj->mutex);
taosHashCleanup(pObj->pRespHash); taosHashCleanup(pObj->pRespHash);
@ -51,7 +52,6 @@ void syncRespMgrDestroy(SSyncRespMgr *pObj) {
taosThreadMutexDestroy(&(pObj->mutex)); taosThreadMutexDestroy(&(pObj->mutex));
taosMemoryFree(pObj); taosMemoryFree(pObj);
} }
}
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) { uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) {
taosThreadMutexLock(&pObj->mutex); taosThreadMutexLock(&pObj->mutex);
@ -174,7 +174,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
void syncRespCleanRsp(SSyncRespMgr *pObj) { void syncRespCleanRsp(SSyncRespMgr *pObj) {
SSyncNode *pNode = pObj->data; SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean all rsp", pNode->vgId); sTrace("vgId:%d, clean all resp", pNode->vgId);
taosThreadMutexLock(&pObj->mutex); taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, -1, true); syncRespCleanByTTL(pObj, -1, true);
@ -183,7 +183,7 @@ void syncRespCleanRsp(SSyncRespMgr *pObj) {
void syncRespClean(SSyncRespMgr *pObj) { void syncRespClean(SSyncRespMgr *pObj) {
SSyncNode *pNode = pObj->data; SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean rsp by ttl", pNode->vgId); sTrace("vgId:%d, clean resp by ttl", pNode->vgId);
taosThreadMutexLock(&pObj->mutex); taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, pObj->ttl, false); syncRespCleanByTTL(pObj, pObj->ttl, false);

View File

@ -26,10 +26,9 @@
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL); (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
if (!condition) return NULL;
SSyncSnapshotSender *pSender = NULL; SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
if (condition) {
pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
if (pSender == NULL) { if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -49,15 +48,15 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->endTime = 0; pSender->endTime = 0;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false; pSender->finish = false;
} else {
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
}
sDebug("vgId:%d, snapshot sender create", pSender->pSyncNode->vgId);
return pSender; return pSender;
} }
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender != NULL) { if (pSender == NULL) return;
sDebug("vgId:%d, snapshot sender destroy", pSender->pSyncNode->vgId);
// free current block // free current block
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
@ -73,12 +72,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
// free sender // free sender
taosMemoryFree(pSender); taosMemoryFree(pSender);
} }
}
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
ASSERT(!snapshotSenderIsStart(pSender)); if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "vgId:%d, snapshot sender is already start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
pSender->start = true; pSender->start = true;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
@ -86,10 +88,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->pReader = NULL; pSender->pReader = NULL;
pSender->pCurrentBlock = NULL; pSender->pCurrentBlock = NULL;
pSender->blockLen = 0; pSender->blockLen = 0;
pSender->snapshotParam.start = SYNC_INDEX_INVALID; pSender->snapshotParam.start = SYNC_INDEX_INVALID;
pSender->snapshotParam.end = SYNC_INDEX_INVALID; pSender->snapshotParam.end = SYNC_INDEX_INVALID;
pSender->snapshot.data = NULL; pSender->snapshot.data = NULL;
pSender->snapshotParam.end = SYNC_INDEX_INVALID; pSender->snapshotParam.end = SYNC_INDEX_INVALID;
pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
@ -105,7 +105,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
// build begin msg // build begin msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId); if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
@ -120,15 +123,20 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
// send msg // send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
}
// event log // event log
sSTrace(pSender, "snapshot sender start"); sSDebug(pSender, "snapshot sender start");
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
return 0; return 0;
} }
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
// update flag // update flag
pSender->start = false; pSender->start = false;
pSender->finish = finish; pSender->finish = finish;
@ -147,8 +155,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->blockLen = 0; pSender->blockLen = 0;
} }
// event log
sSTrace(pSender, "snapshot sender stop");
return 0; return 0;
} }
@ -164,18 +170,27 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// read data // read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&(pSender->pCurrentBlock), &(pSender->blockLen)); &pSender->pCurrentBlock, &pSender->blockLen);
ASSERT(ret == 0); if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr());
return -1;
}
if (pSender->blockLen > 0) { if (pSender->blockLen > 0) {
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
// has read data // has read data
} else { } else {
// read finish, update seq to end // read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END; pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
} }
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
return -1;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
@ -187,7 +202,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig; pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq; pMsg->seq = pSender->seq;
// pMsg->privateTerm = pSender->privateTerm; // pMsg->privateTerm = pSender->privateTerm;
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
@ -195,27 +209,32 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
} }
// send msg // send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs(); pSender->lastSendTime = taosGetTimestampMs();
// event log // event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sSTrace(pSender, "snapshot sender finish"); sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
} else { } else {
sSTrace(pSender, "snapshot sender sending"); sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
} }
return 0; return 0;
} }
// send snapshot data from cache // send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) { int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
// send current block data
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
@ -234,56 +253,63 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
} }
// send msg // send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs(); pSender->lastSendTime = taosGetTimestampMs();
// event log // event log
sSTrace(pSender, "snapshot sender resend"); sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
return 0; return 0;
} }
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->ack == pSender->seq); if (pMsg->ack != pSender->seq) {
sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
pSender->ack = pMsg->ack; pSender->ack = pMsg->ack;
++(pSender->seq); pSender->seq++;
sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
return 0;
} }
// return 0, start ok // return 0, start ok
// return 1, last snapshot finish ok // return 1, last snapshot finish ok
// return -1, error // return -1, error
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
sNTrace(pSyncNode, "starting snapshot ..."); sNInfo(pSyncNode, "snapshot sender starting ...");
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) { if (pSender == NULL) {
sNError(pSyncNode, "start snapshot error, sender is null"); sNError(pSyncNode, "snapshot sender start error since get failed");
return -1; return -1;
} }
int32_t code = 0;
if (snapshotSenderIsStart(pSender)) { if (snapshotSenderIsStart(pSender)) {
sNTrace(pSyncNode, "snapshot sender already start, ignore"); sSError(pSender, "snapshot sender already start, ignore");
return 0; return 0;
} }
if (!snapshotSenderIsStart(pSender) && pSender->finish && if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { sSInfo(pSender, "snapshot sender start too frequently, ignore");
sNTrace(pSyncNode, "snapshot sender too frequently, ignore");
return 1; return 1;
} }
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sInfo("vgId:%d, start snapshot for peer: %s:%d", pSyncNode->vgId, host, port); sSInfo(pSender, "snapshot sender start for peer:%s:%u", host, port);
code = snapshotSenderStart(pSender); int32_t code = snapshotSenderStart(pSender);
if (code != 0) { if (code != 0) {
sNError(pSyncNode, "snapshot sender start error"); sSError(pSender, "snapshot sender start error since %s", terrstr());
return -1; return -1;
} }
@ -293,10 +319,9 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL); (pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
if (!condition) return NULL;
SSyncSnapshotReceiver *pReceiver = NULL; SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
if (condition) {
pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
if (pReceiver == NULL) { if (pReceiver == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -313,50 +338,60 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
} else { sDebug("vgId:%d, snapshot receiver create", pSyncNode->vgId);
sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
}
return pReceiver; return pReceiver;
} }
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver != NULL) { if (pReceiver == NULL) return;
sDebug("vgId:%d, snapshot receiver destroy", pReceiver->pSyncNode->vgId);
// close writer // close writer
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
false, &(pReceiver->snapshot)); &pReceiver->snapshot);
ASSERT(ret == 0); if (ret != 0) {
sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
}
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} }
// free receiver // free receiver
taosMemoryFree(pReceiver); taosMemoryFree(pReceiver);
} }
}
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// force stop // force stop
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver force stop, writer:%p");
// force close, abandon incomplete data // force close, abandon incomplete data
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
// event log // event log
sRTrace(pReceiver, "snapshot receiver force stop");
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&(pReceiver->snapshot)); &pReceiver->snapshot);
ASSERT(ret == 0); if (ret != 0) {
sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
}
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} }
pReceiver->start = false; pReceiver->start = false;
// event log
// sRTrace(pReceiver, "snapshot receiver force stop");
} }
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
ASSERT(snapshotReceiverIsStart(pReceiver)); if (!snapshotReceiverIsStart(pReceiver)) {
sRError(pReceiver, "snapshot receiver is not start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
// update ack // update ack
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
@ -365,25 +400,25 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
pReceiver->snapshotParam.start = pBeginMsg->beginIndex; pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
pReceiver->snapshotParam.end = pBeginMsg->lastIndex; pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
// start writer // start writer
ASSERT(pReceiver->pWriter == NULL); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->pWriter);
&(pReceiver->snapshotParam), &(pReceiver->pWriter)); if (ret != 0) {
ASSERT(ret == 0); sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
return -1;
}
// event log // event log
sRTrace(pReceiver, "snapshot receiver start writer"); sRInfo(pReceiver, "snapshot receiver start write");
return 0; return 0;
} }
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) { int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
if (snapshotReceiverIsStart(pReceiver)) { if (snapshotReceiverIsStart(pReceiver)) {
sWarn("vgId:%d, snapshot receiver has started.", pReceiver->pSyncNode->vgId); sRInfo(pReceiver, "snapshot receiver has started");
return 0; return 0;
} }
@ -394,49 +429,57 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
// event log // event log
sRTrace(pReceiver, "snapshot receiver start"); sRInfo(pReceiver, "snapshot receiver is start");
return 0; return 0;
} }
// just set start = false // just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL // FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&(pReceiver->snapshot)); &pReceiver->snapshot);
ASSERT(ret == 0); if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
}
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
} }
pReceiver->start = false; pReceiver->start = false;
// event log
sRTrace(pReceiver, "snapshot receiver stop");
return 0; return 0;
} }
// when recv last snapshot block, apply data into snapshot // when recv last snapshot block, apply data into snapshot
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END); if (pMsg->seq != SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snapshot receiver seq:%d is invalid", pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
int32_t code = 0; int32_t code = 0;
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
// write data // write data
sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
if (pMsg->dataLen > 0) { if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen); pMsg->dataLen);
if (code != 0) { if (code != 0) {
sNError(pReceiver->pSyncNode, "snapshot write error"); sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
return -1; return -1;
} }
} }
// reset wal // reset wal
sRInfo(pReceiver, "snapshot receiver log restore");
code = code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
if (code != 0) { if (code != 0) {
sNError(pReceiver->pSyncNode, "wal restore from snapshot error"); sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
return -1; return -1;
} }
@ -452,10 +495,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
// stop writer, apply data // stop writer, apply data
sRInfo(pReceiver, "snapshot receiver apply write");
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&(pReceiver->snapshot)); &pReceiver->snapshot);
if (code != 0) { if (code != 0) {
sNError(pReceiver->pSyncNode, "snapshot stop writer true error"); sRError(pReceiver, "snapshot receiver apply failed since %s", terrstr());
return -1; return -1;
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
@ -464,34 +508,48 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
} else { } else {
sNError(pReceiver->pSyncNode, "snapshot stop writer true error"); sRError(pReceiver, "snapshot receiver finish error since writer is null");
return -1; return -1;
} }
// event log // event log
sRTrace(pReceiver, "snapshot receiver got last data, finish, apply snapshot"); sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
return 0; return 0;
} }
// apply data block // apply data block
// update progress // update progress
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == pReceiver->ack + 1); if (pMsg->seq != pReceiver->ack + 1) {
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pReceiver->pWriter == NULL) {
sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
if (pReceiver->pWriter != NULL) {
if (pMsg->dataLen > 0) { if (pMsg->dataLen > 0) {
// apply data block // apply data block
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen); pMsg->data, pMsg->dataLen);
ASSERT(code == 0); if (code != 0) {
sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
return -1;
}
} }
// update progress // update progress
pReceiver->ack = pMsg->seq; pReceiver->ack = pMsg->seq;
// event log // event log
sRTrace(pReceiver, "snapshot receiver receiving"); sRDebug(pReceiver, "snapshot receiver continue to write finish");
} return 0;
} }
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
@ -499,7 +557,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
if (syncNodeIsMnode(ths)) { if (syncNodeIsMnode(ths)) {
snapStart = SYNC_INDEX_BEGIN; snapStart = SYNC_INDEX_BEGIN;
sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
} else { } else {
SSyncLogStoreData *pData = ths->pLogStore->data; SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal; SWal *pWal = pData->pWal;
@ -514,6 +572,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
} else { } else {
snapStart = ths->commitIndex + 1; snapStart = ths->commitIndex + 1;
} }
sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
} }
return snapStart; return snapStart;
@ -521,41 +581,48 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
if (snapshotReceiverIsStart(pReceiver)) { if (snapshotReceiverIsStart(pReceiver)) {
// already start // already start
if (pMsg->startTime > pReceiver->startTime) { if (pMsg->startTime > pReceiver->startTime) {
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " start receiver",
pReceiver->startTime, pMsg->startTime);
goto _START_RECEIVER; goto _START_RECEIVER;
} else if (pMsg->startTime == pReceiver->startTime) { } else if (pMsg->startTime == pReceiver->startTime) {
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
pReceiver->startTime, pMsg->startTime);
goto _SEND_REPLY; goto _SEND_REPLY;
} else { } else {
// ignore // ignore
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
return 0; return 0;
} }
} else { } else {
// start new // start new
sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
goto _START_RECEIVER; goto _START_RECEIVER;
} }
_START_RECEIVER: _START_RECEIVER:
if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sNError(pSyncNode, "snapshot receiver time skew too much"); sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
pMsg->startTime);
return -1; return -1;
} else { } else {
// waiting for clock match // waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) { while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime); pMsg->startTime);
taosMsleep(10); taosMsleep(10);
timeNow = taosGetTimestampMs(); timeNow = taosGetTimestampMs();
} }
if (snapshotReceiverIsStart(pReceiver)) { if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverForceStop(pReceiver); snapshotReceiverForceStop(pReceiver);
} }
@ -567,7 +634,10 @@ _SEND_REPLY:
; // make complier happy ; // make complier happy
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
@ -581,8 +651,12 @@ _SEND_REPLY:
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
// send msg // send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
return -1;
}
return 0; return 0;
} }
@ -591,12 +665,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
if (!snapshotReceiverIsStart(pReceiver)) { if (!snapshotReceiverIsStart(pReceiver)) {
sNError(pSyncNode, "snapshot receiver not start"); sRError(pReceiver, "snapshot receiver not start");
return -1; return -1;
} }
if (pReceiver->startTime != pMsg->startTime) { if (pReceiver->startTime != pMsg->startTime) {
sNError(pSyncNode, "snapshot receiver time not equal"); sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
return -1; return -1;
} }
@ -605,7 +680,10 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
@ -619,8 +697,12 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg // send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
return -1;
}
return 0; return 0;
} }
@ -632,18 +714,22 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
// waiting for clock match // waiting for clock match
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) { while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver transfering waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime); pMsg->startTime);
taosMsleep(10); taosMsleep(10);
timeNow = taosGetTimestampMs();
} }
if (pMsg->seq == pReceiver->ack + 1) { if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
snapshotReceiverGotData(pReceiver, pMsg); return -1;
} }
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) {
sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
@ -657,8 +743,11 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg // send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver receiving");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
return -1;
}
return 0; return 0;
} }
@ -670,7 +759,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// waiting for clock match // waiting for clock match
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) { while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime); pMsg->startTime);
taosMsleep(10); taosMsleep(10);
timeNow = taosGetTimestampMs(); timeNow = taosGetTimestampMs();
@ -683,7 +772,10 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId); if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->srcId = pSyncNode->myRaftId;
@ -697,8 +789,12 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg // send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg); syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
return -1;
}
return 0; return 0;
} }
@ -724,15 +820,16 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// //
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotSend *pMsg = pRpcMsg->pCont; SyncSnapshotSend *pMsg = pRpcMsg->pCont;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
return 0; return 0;
} }
if (pMsg->term < pSyncNode->pRaftStore->currentTerm) { if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject, small term"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
return 0; return 0;
} }
@ -741,45 +838,42 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} }
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
int32_t code = 0;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "");
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotPre(pSyncNode, pMsg); syncNodeOnSnapshotPre(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
syncNodeOnSnapshotBegin(pSyncNode, pMsg); syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
syncNodeOnSnapshotEnd(pSyncNode, pMsg); syncNodeOnSnapshotEnd(pSyncNode, pMsg);
(void)syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode); if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
return -1;
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response // force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverForceStop(pReceiver); snapshotReceiverForceStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq");
syncNodeOnSnapshotTransfering(pSyncNode, pMsg); syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
} else { } else {
// error log // error log
sRTrace(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack); sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
return -1;
}
} else {
// error log
sRTrace(pReceiver, "snapshot receiver term not equal");
return -1; return -1;
} }
} else { } else {
// error log // error log
sRTrace(pReceiver, "snapshot receiver not follower"); sRError(pReceiver, "snapshot receiver term not equal");
return -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver not follower");
return -1; return -1;
} }
@ -789,20 +883,26 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// get sender // get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
ASSERT(pSender != NULL); if (pSender == NULL) {
sNError(pSyncNode, "prepare snapshot error since sender is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SSnapshot snapshot; SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
// prepare <begin, end> // prepare <begin, end>
pSender->snapshotParam.start = pMsg->snapBeginIndex; pSender->snapshotParam.start = pMsg->snapBeginIndex;
pSender->snapshotParam.end = snapshot.lastApplyIndex; pSender->snapshotParam.end = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64, sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) { if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
sNError(pSyncNode, "snapshot last index too small"); sSError(pSender, "prepare snapshot failed since beginIndex:%d larger than applyIndex:%d", pMsg->snapBeginIndex,
snapshot.lastApplyIndex);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
@ -812,7 +912,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
// start reader // start reader
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader)); int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
if (code != 0) { if (code != 0) {
sNError(pSyncNode, "create snapshot reader error"); sSError(pSender, "prepare snapshot failed since %s", terrstr());
return -1; return -1;
} }
@ -824,7 +924,10 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
// build begin msg // build begin msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId); if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "prepare snapshot failed since build msg error");
return -1;
}
SyncSnapshotSend *pSendMsg = rpcMsg.pCont; SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->srcId = pSender->pSyncNode->myRaftId;
@ -839,8 +942,11 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// send msg // send msg
syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg); syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, ""); if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "prepare snapshot failed since send msg error");
return -1;
}
return 0; return 0;
} }
@ -851,7 +957,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq // condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log // condition 3 sender receives error msg, just print error log
// //
int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotRsp *pMsg = pRpcMsg->pCont; SyncSnapshotRsp *pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
@ -861,36 +967,47 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} }
// get sender // get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
ASSERT(pSender != NULL); if (pSender == NULL) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
if (pMsg->startTime != pSender->startTime) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender/receiver start time not match");
return -1; return -1;
} }
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, ""); if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender:% " PRId64 " receiver:%" PRId64 " time not match");
return -1;
}
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// prepare <begin, end>, send begin msg // prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg); syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
return 0; return 0;
} }
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
snapshotSenderUpdateProgress(pSender, pMsg); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
snapshotSend(pSender); if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
return -1;
}
if (snapshotSend(pSender) != 0) {
return -1;
}
return 0; return 0;
} }
// receive ack is finish, close sender // receive ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) { if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr); syncLogReplMgrReset(pMgr);
} }
return 0; return 0;
@ -898,12 +1015,18 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// send next msg // send next msg
if (pMsg->ack == pSender->seq) { if (pMsg->ack == pSender->seq) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq");
// update sender ack // update sender ack
snapshotSenderUpdateProgress(pSender, pMsg); if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
snapshotSend(pSender); return -1;
}
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) { } else if (pMsg->ack == pSender->seq - 1) {
// maybe resend // maybe resend
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
snapshotReSend(pSender); snapshotReSend(pSender);
} else { } else {

View File

@ -277,14 +277,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
if (pNode != NULL && pNode->pRaftCfg != NULL) { if (pNode != NULL && pNode->pRaftCfg != NULL) {
taosPrintLog(flags, level, dflag, taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s " "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
"%s" ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64
", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, " ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum,
pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
@ -330,13 +328,13 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
va_end(argpointer); va_end(argpointer);
taosPrintLog(flags, level, dflag, taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s " "vgId:%d, %s, sync:%s, {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64
"%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 " lcindex:%" PRId64
" seq:%d ack:%d finish:%d replica-index:%d %s:%d}" " seq:%d ack:%d finish:%d replica-index:%d %s:%d}"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pSender, pSender->snapshotParam.start, pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
host, port, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, host, port, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
@ -382,14 +380,14 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
va_end(argpointer); va_end(argpointer);
taosPrintLog(flags, level, dflag, taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s " "vgId:%d, %s, sync:%s,"
"%s {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64 " {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64
" e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
"}" "}"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pReceiver, pReceiver->start, pReceiver->ack, pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack,
pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start, pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start,
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex,
@ -520,95 +518,56 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
port, pMsg->term, pMsg->timeStamp, s, timeDiff); port, pMsg->term, pMsg->timeStamp, s, timeDiff);
} }
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
}
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return; if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNDebug(pSyncNode,
"send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 "send sync-snapshot-send to %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s", ", lterm:%" PRId64 ", stime:%" PRId64,
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); host, port, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
} }
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return; if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNDebug(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 "recv sync-snapshot-send from %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s", ", lterm:%" PRId64 ", stime:%" PRId64 ", len:%u",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, host, port, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime,
pMsg->dataLen, s); pMsg->dataLen);
} }
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return; if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNDebug(pSyncNode,
"send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 "send sync-snapshot-rsp to %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s", ", lterm:%" PRId64 ", stime:%" PRId64,
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); host, port, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
} }
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return; if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNDebug(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 "recv sync-snapshot-rsp from %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s", ", lterm:%" PRId64 ", stime:%" PRId64,
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); host, port, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
} }
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {

View File

@ -132,7 +132,7 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
} }
int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
syncLogRecvSyncPreSnapshot(ths, pMsg, ""); // syncLogRecvSyncPreSnapshot(ths, pMsg, "");
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
pMsgReply->srcId = ths->myRaftId; pMsgReply->srcId = ths->myRaftId;
@ -181,7 +181,7 @@ _IGNORE:
} }
int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) {
syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); // syncLogRecvSyncPreSnapshotReply(ths, pMsg, "");
// start snapshot // start snapshot