enh: refactor return code

This commit is contained in:
kailixu 2024-07-25 19:07:25 +08:00
parent ae2fa9e1ce
commit 73bb3cef13
7 changed files with 200 additions and 212 deletions

View File

@ -58,7 +58,7 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, S
int32_t syncSnapSendMsg(SSyncSnapshotSender* pSender, int32_t seq, void* pBlock, int32_t len, int32_t typ);
int32_t syncSnapSendRsp(SSyncSnapshotReceiver* pReceiver, SyncSnapshotSend* pMsg, void* pBlock, int32_t len,
int32_t typ, int32_t code);
int32_t typ, int32_t rspCode);
#ifdef __cplusplus
}

View File

@ -74,7 +74,7 @@ typedef struct SSyncSnapshotSender {
int32_t replicaIndex;
} SSyncSnapshotSender;
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
@ -101,7 +101,7 @@ typedef struct SSyncSnapshotReceiver {
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);

View File

@ -949,6 +949,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
// open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
int32_t code = 0;
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1229,7 +1230,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
// snapshot senders
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
SSyncSnapshotSender* pSender = NULL;
code = snapshotSenderCreate(pSyncNode, i, &pSender);
if (pSender == NULL) return NULL;
pSyncNode->senders[i] = pSender;
@ -1237,7 +1239,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
}
// snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver);
if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
pSyncNode->pNewNodeReceiver);
@ -1810,7 +1812,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// create new
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
if (pSyncNode->senders[i] == NULL) {
pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]);
if (pSyncNode->senders[i] == NULL) {
// will be created later while send snapshot
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
@ -2840,8 +2842,9 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
}
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i);
if (pSender == NULL) return -1;
SSyncSnapshotSender* pSender = NULL;
int32_t code = snapshotSenderCreate(ths, i, &pSender);
if (pSender == NULL) return terrno = code;
ths->senders[i] = pSender;
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);

View File

@ -1134,12 +1134,9 @@ void syncNodeLogReplDestroy(SSyncNode* pNode) {
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
int32_t code = 0;
*ppBuf = NULL;
SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
if (pBuf == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exit);
}
pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);
@ -1149,25 +1146,25 @@ int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
goto _err;
goto _exit;
}
if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
goto _err;
goto _exit;
}
if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
sError("failed to init log buffer mutex due to %s", tstrerror(code));
goto _err;
goto _exit;
}
_exit:
if (code != 0) {
taosMemoryFreeClear(pBuf);
}
*ppBuf = pBuf;
_err:
taosMemoryFree(pBuf);
TAOS_RETURN(code);
}

View File

@ -50,27 +50,31 @@ static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
return;
}
static SSyncSnapBuffer *syncSnapBufferCreate() {
static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
*ppBuf = NULL;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE);
taosThreadMutexInit(&pBuf->mutex, NULL);
return pBuf;
(void)taosThreadMutexInit(&pBuf->mutex, NULL);
*ppBuf = pBuf;
TAOS_RETURN(0);
}
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) {
int32_t code = 0;
*ppSender = NULL;
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
if (!condition) return NULL;
if (!condition) {
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pSender->start = false;
@ -85,17 +89,19 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false;
SSyncSnapBuffer *pSndBuf = syncSnapBufferCreate();
SSyncSnapBuffer *pSndBuf = NULL;
code = syncSnapBufferCreate(&pSndBuf);
if (pSndBuf == NULL) {
taosMemoryFree(pSender);
pSender = NULL;
return NULL;
TAOS_RETURN(code);
}
pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
pSender->pSndBuf = pSndBuf;
syncSnapBufferReset(pSender->pSndBuf);
return pSender;
*ppSender = pSender;
TAOS_RETURN(code);
}
void syncSnapBlockDestroy(void *ptr) {
@ -144,7 +150,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return atomic_load_8(&pSender->start); }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
int32_t code = -1;
int32_t code = 0;
int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
if (started) return 0;
@ -160,7 +166,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
(void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0;
pSender->term = raftStoreGetTerm(pSender->pSyncNode);
pSender->startTime = taosGetMonoTimestampMs();
@ -170,8 +176,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
// Get snapshot info
SSyncNode *pSyncNode = pSender->pSyncNode;
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) {
sSError(pSender, "snapshot get info failure since %s", terrstr());
if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) {
sSError(pSender, "snapshot get info failure since %s", tstrerror(code));
goto _out;
}
@ -182,25 +188,24 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SSyncTLV *datHead = pData;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
terrno = TSDB_CODE_INVALID_DATA_FMT;
code = TSDB_CODE_INVALID_DATA_FMT;
goto _out;
}
dataLen = sizeof(SSyncTLV) + datHead->len;
}
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) {
goto _out;
}
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
code = 0;
_out:
if (snapInfo.data) {
taosMemoryFree(snapInfo.data);
snapInfo.data = NULL;
}
return code;
TAOS_RETURN(code);
}
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
@ -230,11 +235,11 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
int32_t code = -1;
int32_t code = 0;
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
if ((code = syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId)) != 0) {
sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code));
goto _OUT;
}
@ -256,20 +261,19 @@ int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock,
pMsg->payloadType = typ;
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq);
if ((code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg)) != 0) {
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq);
goto _OUT;
}
code = 0;
_OUT:
return code;
TAOS_RETURN(code);
}
// when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
int32_t code = -1;
int32_t code = 0;
SyncSnapBlock *pBlk = NULL;
if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
@ -278,7 +282,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
if (pBlk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
@ -288,7 +292,6 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
&pBlk->blockLen);
if (code != 0) {
terrno = code;
sSError(pSender, "snapshot sender read failed since %s", tstrerror(code));
goto _OUT;
}
@ -300,7 +303,6 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "snapshot sender read to the end");
code = 0;
goto _OUT;
}
}
@ -311,7 +313,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// send msg
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) {
goto _OUT;
}
@ -325,22 +327,22 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
}
pSender->lastSendTime = nowMs;
code = 0;
_OUT:;
if (pBlk != NULL) {
syncSnapBlockDestroy(pBlk);
pBlk = NULL;
}
return code;
TAOS_RETURN(code);
}
// send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
int32_t code = -1;
int32_t code = 0;
taosThreadMutexLock(&pSndBuf->mutex);
if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
@ -351,7 +353,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) {
goto _out;
}
pBlk->sendTimeMs = nowMs;
@ -364,21 +366,20 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) {
goto _out;
}
}
code = 0;
_out:;
taosThreadMutexUnlock(&pSndBuf->mutex);
return code;
TAOS_RETURN(code);
}
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) {
sNError(pSyncNode, "snapshot sender start error since get failed");
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
if (snapshotSenderIsStart(pSender)) {
@ -390,23 +391,26 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
int32_t code = snapshotSenderStart(pSender);
if (code != 0) {
sSError(pSender, "snapshot sender start error since %s", terrstr());
return -1;
sSError(pSender, "snapshot sender start error since %s", tstrerror(code));
TAOS_RETURN(code);
}
return 0;
}
// receiver
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) {
int32_t code = 0;
*ppReceiver = NULL;
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
if (!condition) return NULL;
if (!condition) {
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
if (pReceiver == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pReceiver->start = false;
@ -421,17 +425,19 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
SSyncSnapBuffer *pRcvBuf = syncSnapBufferCreate();
SSyncSnapBuffer *pRcvBuf = NULL;
code = syncSnapBufferCreate(&pRcvBuf);
if (pRcvBuf == NULL) {
taosMemoryFree(pReceiver);
pReceiver = NULL;
return NULL;
TAOS_RETURN(code);
}
pRcvBuf->entryDeleteCb = rpcFreeCont;
pReceiver->pRcvBuf = pRcvBuf;
syncSnapBufferReset(pReceiver->pRcvBuf);
return pReceiver;
*ppReceiver = pReceiver;
TAOS_RETURN(code);
}
static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
@ -452,10 +458,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
// close writer
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &pReceiver->snapshot);
if (code != 0) {
sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId,
tstrerror(code));
}
pReceiver->pWriter = NULL;
}
@ -486,8 +493,7 @@ static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, Sy
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
// update ack
@ -501,11 +507,11 @@ static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, Syn
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
// start writer
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
&pReceiver->pWriter);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
return -1;
if (code != 0) {
sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code));
TAOS_RETURN(code);
}
// event log
@ -541,10 +547,10 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
{
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
if (code != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code));
}
pReceiver->pWriter = NULL;
} else {
@ -567,8 +573,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen);
if (code != 0) {
sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
return -1;
sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code));
TAOS_RETURN(code);
}
}
@ -586,8 +592,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&pReceiver->snapshot);
if (code != 0) {
sRError(pReceiver, "snapshot receiver apply failed since %s", terrstr());
return -1;
sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
TAOS_RETURN(code);
}
pReceiver->pWriter = NULL;
sRInfo(pReceiver, "snapshot receiver write stopped");
@ -604,13 +610,14 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
if (code != 0) {
sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
return -1;
sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code));
TAOS_RETURN(code);
}
sRInfo(pReceiver, "wal log restored from snapshot");
} else {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
sRError(pReceiver, "snapshot receiver finish error since writer is null");
return -1;
TAOS_RETURN(code);
}
return 0;
@ -619,14 +626,12 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
if (pMsg->seq != pReceiver->ack + 1) {
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
}
if (pReceiver->pWriter == NULL) {
sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
@ -636,8 +641,8 @@ static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSna
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen);
if (code != 0) {
sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
return -1;
sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(code));
TAOS_RETURN(code);
}
}
@ -671,29 +676,27 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT);
int32_t code = 0;
int32_t code = 0, lino = 0;
// copy snap info from leader
void *data = taosMemoryCalloc(1, pMsg->dataLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _out;
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
pInfo->data = data;
data = NULL;
memcpy(pInfo->data, pMsg->data, pMsg->dataLen);
// exchange snap info
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo) != 0) {
if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) {
sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType);
goto _out;
goto _exit;
}
SSyncTLV *datHead = pInfo->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
code = TSDB_CODE_INVALID_DATA_FMT;
goto _out;
goto _exit;
}
int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
@ -701,18 +704,17 @@ static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshot
SSnapshotParam *pParam = &pReceiver->snapshotParam;
data = taosMemoryRealloc(pParam->data, dataLen);
if (data == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
strerror(errno), dataLen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _out;
tstrerror(code), dataLen);
goto _exit;
}
pParam->data = data;
data = NULL;
memcpy(pParam->data, pInfo->data, dataLen);
_out:
return code;
_exit:
TAOS_RETURN(code);
}
static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
@ -741,8 +743,7 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
"received a stale snapshot preparation. ignore."
" msg signature:(%" PRId64 ", %" PRId64 ")",
pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _SEND_REPLY;
}
} else {
@ -764,7 +765,7 @@ _SEND_REPLY:;
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
int32_t dataLen = 0;
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
if (syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) {
if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) {
goto _out;
}
SSyncTLV *datHead = snapInfo.data;
@ -773,8 +774,7 @@ _SEND_REPLY:;
// send response
int32_t type = (snapInfo.data) ? snapInfo.type : 0;
if (syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code) != 0) {
code = terrno;
if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code)) != 0) {
goto _out;
}
@ -783,7 +783,7 @@ _out:
taosMemoryFree(snapInfo.data);
snapInfo.data = NULL;
}
return code;
TAOS_RETURN(code);
}
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
@ -797,14 +797,14 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
}
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to begin snapshot receiver since %s", terrstr());
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to begin snapshot receiver since %s", tstrerror(code));
goto _SEND_REPLY;
}
// start writer
if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
sRError(pReceiver, "failed to start snapshot writer since %s", terrstr());
if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) {
sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code));
goto _SEND_REPLY;
}
@ -817,26 +817,22 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
code = 0;
_SEND_REPLY:
if (code != 0 && terrno != 0) {
code = terrno;
}
// send response
if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) {
return -1;
}
TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code));
return code;
TAOS_RETURN(code);
}
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
int32_t type, int32_t code) {
int32_t type, int32_t rspCode) {
int32_t code = 0;
SSyncNode *pSyncNode = pReceiver->pSyncNode;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) {
sRError(pReceiver, "failed to build snapshot receiver resp since %s", terrstr());
return -1;
if ((code = syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) != 0) {
sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code));
TAOS_RETURN(code);
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
@ -847,7 +843,7 @@ int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pMsg->seq;
pRspMsg->code = code;
pRspMsg->code = rspCode;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
pRspMsg->payloadType = type;
@ -856,9 +852,9 @@ int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg
}
// send msg
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr());
return -1;
if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code));
TAOS_RETURN(code);
}
return 0;
}
@ -867,13 +863,11 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
int32_t code = 0;
SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf;
SyncSnapshotSend *pMsg = ppMsg[0];
terrno = TSDB_CODE_SUCCESS;
taosThreadMutexLock(&pRcvBuf->mutex);
if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
terrno = TSDB_CODE_SYN_BUFFER_FULL;
code = terrno;
code = TSDB_CODE_SYN_BUFFER_FULL;
goto _out;
}
@ -887,7 +881,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
} else if (pMsg->seq < pRcvBuf->start) {
syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
goto _out;
}
@ -900,8 +894,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
}
for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
if (snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size]) != 0) {
code = terrno;
if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) {
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
}
@ -915,7 +908,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
_out:
taosThreadMutexUnlock(&pRcvBuf->mutex);
return code;
TAOS_RETURN(code);
}
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
@ -928,9 +921,9 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
int32_t code = 0;
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr());
return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno);
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to receive snapshot data since %s.", tstrerror(code));
return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
}
return syncSnapBufferRecv(pReceiver, ppMsg);
@ -946,8 +939,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _SEND_REPLY;
}
@ -960,9 +952,9 @@ _SEND_REPLY:;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
return -1;
if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) {
sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code));
TAOS_RETURN(code);
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
@ -978,12 +970,12 @@ _SEND_REPLY:;
// send msg
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
return -1;
if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) {
sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code));
TAOS_RETURN(code);
}
return code;
TAOS_RETURN(code);
}
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
@ -995,35 +987,36 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
TAOS_RETURN(code);
}
if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
pMsg->seq);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno);
return -1;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
TAOS_RETURN(code);
}
if(pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){
if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
syncNodeStepDown(pSyncNode, pMsg->term);
}
}
else{
} else {
syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
}
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
sRError(pReceiver, "snapshot receiver not a follower or learner");
return -1;
code = TSDB_CODE_SYN_INTERNAL_ERROR;
TAOS_RETURN(code);
}
if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
return -1;
code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
TAOS_RETURN(code);
}
// prepare
@ -1060,14 +1053,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
if (code != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code));
}
goto _out;
}
_out:;
syncNodeResetElectTimer(pSyncNode);
return code;
TAOS_RETURN(code);
}
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
@ -1076,16 +1069,14 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe
SSyncTLV *datHead = (void *)pMsg->data;
if (datHead->typ != pMsg->payloadType) {
sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ);
terrno = TSDB_CODE_INVALID_DATA_FMT;
return -1;
TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT);
}
int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
SSnapshotParam *pParam = &pSender->snapshotParam;
void *data = taosMemoryRealloc(pParam->data, dataLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(data, pMsg->data, dataLen);
@ -1097,19 +1088,18 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe
// sender
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
int32_t code = -1;
int32_t code = 0;
SSnapshot snapshot = {0};
if (pMsg->snapBeginIndex > pSyncNode->commitIndex) {
sSError(pSender,
"snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64,
pMsg->snapBeginIndex, pSyncNode->commitIndex);
terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG);
}
taosThreadMutexLock(&pSender->pSndBuf->mutex);
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _out);
// prepare <begin, end>
pSender->snapshotParam.start = pMsg->snapBeginIndex;
@ -1123,14 +1113,12 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
// start reader
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) {
goto _out;
}
TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out);
}
code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
if (code != 0) {
sSError(pSender, "prepare snapshot failed since %s", terrstr());
sSError(pSender, "prepare snapshot failed since %s", tstrerror(code));
goto _out;
}
@ -1141,7 +1129,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
_out:
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
return code;
TAOS_RETURN(code);
}
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
@ -1159,17 +1147,17 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
taosThreadMutexLock(&pSndBuf->mutex);
if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
code = terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _out;
}
if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
code = terrno = TSDB_CODE_SYN_BUFFER_FULL;
code = TSDB_CODE_SYN_BUFFER_FULL;
goto _out;
}
@ -1209,49 +1197,47 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
}
_out:
taosThreadMutexUnlock(&pSndBuf->mutex);
return code;
TAOS_RETURN(code);
}
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
SyncSnapshotRsp *pMsg = ppMsg[0];
int32_t code = 0;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
}
// get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
if (pSender == NULL) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
if (!snapshotSenderIsStart(pSender)) {
sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
pSender->startTime, pMsg->startTime);
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
// check signature
int32_t order = 0;
if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE);
} else if (order < 0) {
sSError(pSender, "snapshot sender is stale. stop");
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _ERROR;
}
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) {
sSError(pSender, "snapshot sender not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER;
code = TSDB_CODE_SYN_NOT_LEADER;
goto _ERROR;
}
@ -1259,29 +1245,29 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
if (pMsg->term != currentTerm) {
sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
currentTerm);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _ERROR;
}
if (pMsg->code != 0) {
sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
terrno = pMsg->code;
code = pMsg->code;
goto _ERROR;
}
// send begin
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
sSInfo(pSender, "process prepare rsp");
if (syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg) != 0) {
if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) {
goto _ERROR;
}
}
// send msg of data or end
if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
if (syncSnapBufferSend(pSender, ppMsg) != 0) {
sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq,
pSender->pReader, pSender->finish);
if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) {
sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code),
pSender->seq, pSender->pReader, pSender->finish);
goto _ERROR;
}
}
@ -1298,5 +1284,5 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
_ERROR:
snapshotSenderStop(pSender, false);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return -1;
TAOS_RETURN(code);
}

View File

@ -42,7 +42,8 @@ SSyncSnapshotReceiver* createReceiver() {
id.addr = syncUtilAddr2U64("1.2.3.4", 99);
id.vgId = 100;
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, id);
SSyncSnapshotReceiver* pReceiver = NULL;
(void)snapshotReceiverCreate(pSyncNode, id, &pReceiver);
pReceiver->start = true;
pReceiver->ack = 20;
pReceiver->pWriter = (void*)0x11;

View File

@ -39,7 +39,8 @@ SSyncSnapshotSender* createSender() {
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot;
#endif
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2);
SSyncSnapshotSender* pSender = NULL;
(void)snapshotSenderCreate(pSyncNode, 2, &pSender);
pSender->start = true;
pSender->seq = 10;
pSender->ack = 20;