Merge pull request #19103 from taosdata/fix/TD-21446
fix: restart snapshot sender on receiver is restart
This commit is contained in:
commit
43561bfb8c
|
@ -517,6 +517,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
|
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
|
||||||
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
|
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
|
||||||
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
|
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
|
||||||
|
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
|
||||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
|
|
|
@ -234,10 +234,11 @@ int vnodeAsyncCommit(SVnode *pVnode) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
|
vError("vgId:%d, vnode async commit failed since %s, commitId:%" PRId64, TD_VID(pVnode), tstrerror(code),
|
||||||
pVnode->state.commitID);
|
pVnode->state.commitID);
|
||||||
} else {
|
} else {
|
||||||
vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
|
vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
|
||||||
|
pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -256,7 +257,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
||||||
char dir[TSDB_FILENAME_LEN] = {0};
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
SVnode *pVnode = pInfo->pVnode;
|
SVnode *pVnode = pInfo->pVnode;
|
||||||
|
|
||||||
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
|
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
|
||||||
pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);
|
pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);
|
||||||
|
|
||||||
// persist wal before starting
|
// persist wal before starting
|
||||||
|
|
|
@ -423,8 +423,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
ASSERT(pHdr->index == pWriter->index + 1);
|
ASSERT(pHdr->index == pWriter->index + 1);
|
||||||
pWriter->index = pHdr->index;
|
pWriter->index = pHdr->index;
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
|
vDebug("vgId:%d, vnode snapshot write data, index:%" PRId64 " type:%d blockLen:%d", TD_VID(pVnode), pHdr->index,
|
||||||
pHdr->type, nData);
|
pHdr->type, nData);
|
||||||
|
|
||||||
switch (pHdr->type) {
|
switch (pHdr->type) {
|
||||||
case SNAP_DATA_CFG: {
|
case SNAP_DATA_CFG: {
|
||||||
|
|
|
@ -465,9 +465,9 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
|
||||||
|
|
||||||
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
|
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
|
vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
|
||||||
int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
|
int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
|
||||||
vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
|
vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
|
||||||
return prevLogTerm;
|
return prevLogTerm;
|
||||||
}
|
}
|
||||||
|
|
||||||
sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex);
|
sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex);
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,8 +115,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
|
|
||||||
sNError(pData->pSyncNode,
|
sNError(pData->pSyncNode,
|
||||||
"wal restore from snapshot error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
"wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
|
||||||
snapshotIndex, err, err, errStr, sysErr, sysErrStr);
|
err, errStr, sysErr, sysErrStr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,8 +212,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
|
|
||||||
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
pEntry->index, err, errStr, sysErr, sysErrStr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,11 +257,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
|
|
||||||
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
|
sNTrace(pData->pSyncNode, "wal read not exist, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index,
|
||||||
err, err, errStr, sysErr, sysErrStr);
|
err, errStr, sysErr, sysErrStr);
|
||||||
} else {
|
} else {
|
||||||
sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
|
sNTrace(pData->pSyncNode, "wal read error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", index, err,
|
||||||
err, err, errStr, sysErr, sysErrStr);
|
errStr, sysErr, sysErrStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -341,8 +341,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);
|
pData->pSyncNode->vgId, fromIndex, err, errStr, sysErr, sysErrStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// event log
|
// event log
|
||||||
|
@ -392,8 +392,8 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr);
|
pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -49,7 +49,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
|
|
||||||
sDebug("vgId:%d, snapshot sender create", pSender->pSyncNode->vgId);
|
|
||||||
return pSender;
|
return pSender;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,7 +293,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (snapshotSenderIsStart(pSender)) {
|
if (snapshotSenderIsStart(pSender)) {
|
||||||
sSError(pSender, "snapshot sender already start, ignore");
|
sSInfo(pSender, "snapshot sender already start, ignore");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -523,7 +522,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
||||||
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
if (pMsg->seq != pReceiver->ack + 1) {
|
if (pMsg->seq != pReceiver->ack + 1) {
|
||||||
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
|
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -721,8 +720,12 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
|
||||||
timeNow = taosGetTimestampMs();
|
timeNow = taosGetTimestampMs();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
|
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
|
||||||
return -1;
|
code = terrno;
|
||||||
|
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// build msg
|
// build msg
|
||||||
|
@ -740,11 +743,11 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
|
||||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||||
pRspMsg->startTime = pReceiver->startTime;
|
pRspMsg->startTime = pReceiver->startTime;
|
||||||
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
|
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
|
||||||
pRspMsg->code = 0;
|
pRspMsg->code = code;
|
||||||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver receiving");
|
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received");
|
||||||
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
|
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
|
||||||
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
|
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -861,7 +864,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
|
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
|
||||||
snapshotReceiverForceStop(pReceiver);
|
snapshotReceiverForceStop(pReceiver);
|
||||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
||||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq");
|
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
|
||||||
syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
|
syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
|
||||||
} else {
|
} else {
|
||||||
// error log
|
// error log
|
||||||
|
@ -976,76 +979,102 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->startTime != pSender->startTime) {
|
// state, term, seq/ack
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender:% " PRId64 " receiver:%" PRId64 " time not match");
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
return -1;
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
|
||||||
|
sSError(pSender, "snapshot sender not leader");
|
||||||
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// state, term, seq/ack
|
if (pMsg->startTime != pSender->startTime) {
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
|
||||||
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime,
|
||||||
// prepare <begin, end>, send begin msg
|
pSender->startTime, pMsg->code);
|
||||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
|
goto _ERROR;
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
|
}
|
||||||
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
if (pMsg->term != pSyncNode->pRaftStore->currentTerm) {
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
|
||||||
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
|
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
|
||||||
return -1;
|
pSyncNode->pRaftStore->currentTerm);
|
||||||
}
|
goto _ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
if (snapshotSend(pSender) != 0) {
|
if (pMsg->code != 0) {
|
||||||
return -1;
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
|
||||||
}
|
sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
|
||||||
return 0;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// receive ack is finish, close sender
|
// prepare <begin, end>, send begin msg
|
||||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
|
||||||
snapshotSenderStop(pSender, true);
|
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
|
||||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
return 0;
|
||||||
if (pMgr) {
|
}
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
|
|
||||||
syncLogReplMgrReset(pMgr);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// send next msg
|
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
||||||
if (pMsg->ack == pSender->seq) {
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq");
|
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
|
||||||
// update sender ack
|
|
||||||
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (snapshotSend(pSender) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pMsg->ack == pSender->seq - 1) {
|
|
||||||
// maybe resend
|
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
|
|
||||||
snapshotReSend(pSender);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// error log
|
|
||||||
sSError(pSender, "snapshot sender recv error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// error log
|
|
||||||
sSError(pSender, "snapshot sender term not equal");
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (snapshotSend(pSender) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive ack is finish, close sender
|
||||||
|
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
|
||||||
|
snapshotSenderStop(pSender, true);
|
||||||
|
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
||||||
|
if (pMgr) {
|
||||||
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
|
||||||
|
syncLogReplMgrReset(pMgr);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// send next msg
|
||||||
|
if (pMsg->ack == pSender->seq) {
|
||||||
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
|
||||||
|
// update sender ack
|
||||||
|
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (snapshotSend(pSender) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (pMsg->ack == pSender->seq - 1) {
|
||||||
|
// maybe resend
|
||||||
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
|
||||||
|
snapshotReSend(pSender);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// error log
|
// error log
|
||||||
sSError(pSender, "snapshot sender not leader");
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
|
||||||
|
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
||||||
|
snapshotSenderStop(pSender, true);
|
||||||
|
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
||||||
|
if (pMgr) {
|
||||||
|
syncLogReplMgrReset(pMgr);
|
||||||
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_ERROR:
|
||||||
|
snapshotSenderStop(pSender, true);
|
||||||
|
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
||||||
|
if (pMgr) {
|
||||||
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
|
||||||
|
syncLogReplMgrReset(pMgr);
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -405,6 +405,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for pr
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||||
|
|
||||||
//tq
|
//tq
|
||||||
|
|
|
@ -134,6 +134,7 @@ echo "mDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "wDebugFlag 143" >> $TAOS_CFG
|
echo "wDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "sDebugFlag 143" >> $TAOS_CFG
|
echo "sDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "tsdbDebugFlag 143" >> $TAOS_CFG
|
echo "tsdbDebugFlag 143" >> $TAOS_CFG
|
||||||
|
echo "tdbDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "tqDebugFlag 143" >> $TAOS_CFG
|
echo "tqDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "fsDebugFlag 143" >> $TAOS_CFG
|
echo "fsDebugFlag 143" >> $TAOS_CFG
|
||||||
echo "idxDebugFlag 143" >> $TAOS_CFG
|
echo "idxDebugFlag 143" >> $TAOS_CFG
|
||||||
|
|
|
@ -10,13 +10,11 @@ set +e
|
||||||
#set -x
|
#set -x
|
||||||
|
|
||||||
FILE_NAME=
|
FILE_NAME=
|
||||||
RELEASE=0
|
|
||||||
ASYNC=0
|
|
||||||
VALGRIND=0
|
VALGRIND=0
|
||||||
UNIQUE=0
|
TEST=0
|
||||||
UNAME_BIN=`which uname`
|
UNAME_BIN=`which uname`
|
||||||
OS_TYPE=`$UNAME_BIN`
|
OS_TYPE=`$UNAME_BIN`
|
||||||
while getopts "f:agvum" arg
|
while getopts "f:tgv" arg
|
||||||
do
|
do
|
||||||
case $arg in
|
case $arg in
|
||||||
f)
|
f)
|
||||||
|
@ -25,8 +23,8 @@ do
|
||||||
v)
|
v)
|
||||||
VALGRIND=1
|
VALGRIND=1
|
||||||
;;
|
;;
|
||||||
u)
|
t)
|
||||||
UNIQUE=1
|
TEST=1
|
||||||
;;
|
;;
|
||||||
g)
|
g)
|
||||||
VALGRIND=2
|
VALGRIND=2
|
||||||
|
@ -140,6 +138,11 @@ if [ -n "$FILE_NAME" ]; then
|
||||||
result=$?
|
result=$?
|
||||||
echo "Execute result:" $result
|
echo "Execute result:" $result
|
||||||
|
|
||||||
|
if [ $TEST -eq 1 ]; then
|
||||||
|
echo "Exit without check asan errors"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
if [ $result -eq 0 ]; then
|
if [ $result -eq 0 ]; then
|
||||||
$CODE_DIR/sh/sigint_stop_dnodes.sh
|
$CODE_DIR/sh/sigint_stop_dnodes.sh
|
||||||
$CODE_DIR/sh/checkAsan.sh
|
$CODE_DIR/sh/checkAsan.sh
|
||||||
|
|
Loading…
Reference in New Issue