Merge branch main to FIX/TD-21043-main

This commit is contained in:
Benguang Zhao 2022-12-27 14:50:39 +08:00
commit 8916722dd7
25 changed files with 237 additions and 209 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG f0c1753
GIT_TAG 5662a6d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -268,7 +268,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define NEED_REDIRECT_ERROR(_code) \
(NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \

View File

@ -65,6 +65,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) //
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) //
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //
#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
//common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //
@ -518,6 +520,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) //
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq

View File

@ -348,7 +348,8 @@ cd ${release_dir}
# install_dir has been distinguishes cluster from edege, so comments this code
pkg_name=${install_dir}-${osType}-${cpuType}
taostools_pkg_name=${taostools_install_dir}-${osType}-${cpuType}
versionCompFirst=$(echo ${versionComp} | awk -F '.' '{print $1}')
taostools_pkg_name=${taostools_install_dir}-${osType}-${cpuType}-comp${versionCompFirst}
# if [ "$verMode" == "cluster" ]; then
# pkg_name=${install_dir}-${osType}-${cpuType}

View File

@ -1430,6 +1430,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
}
// pMsg is response msg
if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
// restore origin code
if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
}
} else {
// uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
}
}
AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
arg->msg = *pMsg;
arg->pEpset = tEpSet;

View File

@ -233,6 +233,14 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
if (pRpc->contLen < sizeof(SMsgHead)) {
dError("invalid rpc msg since no msg head at pCont. pRpc:%p, type:%s, len:%d", pRpc, TMSG_INFO(pRpc->msgType),
pRpc->contLen);
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
return -1;
}
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);

View File

@ -248,6 +248,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||

View File

@ -957,7 +957,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
for (int32_t i = 0; i < size; ++i) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
if (pInfo->handle != NULL) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
if (code == TSDB_CODE_SYN_TIMEOUT) {

View File

@ -113,7 +113,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);
metaInfo("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
_exit:

View File

@ -257,7 +257,7 @@ _exit:
pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index;
vInfo("vgId:%d, vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", TD_VID(pReader->pVnode),
pReader->index, pHdr->type, *nData);
} else {
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);

View File

@ -425,9 +425,9 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s",
", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}

View File

@ -471,17 +471,20 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con
}
int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
int32_t code = 0;
int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1);
CTG_LOCK(CTG_WRITE, &pJob->taskLock);
CTG_ERR_RET((*gCtgAsyncFps[type].initFp)(pJob, tid, param));
CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);
CTG_ERR_JRET((*gCtgAsyncFps[type].initFp)(pJob, tid, param));
if (taskId) {
*taskId = tid;
}
return TSDB_CODE_SUCCESS;
_return:
CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);
return code;
}
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,

View File

@ -2500,6 +2500,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

View File

@ -907,7 +907,7 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
}
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0);
ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0");
return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
}
@ -1396,7 +1396,6 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
@ -1547,7 +1546,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once");
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
NULL, pOperator);
@ -1767,8 +1766,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
.maxTs = INT64_MIN,
};
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->win = pTaskInfo->window;
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
@ -2252,7 +2249,6 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
return;
}
blockDataEnsureCapacity(pBlock, size - (*pIndex));
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
@ -2359,7 +2355,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
ASSERT(pSDataBlock->pDataBlock != NULL);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
@ -2482,7 +2477,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pPullDataRes;
}
@ -2543,7 +2537,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->numOfDatapack++;
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
@ -2633,7 +2626,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
return pInfo->pPullDataRes;
}
@ -2688,7 +2680,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.deleteMarkSaved = 0,
.calTriggerSaved = 0,
};
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
@ -2713,7 +2705,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
ASSERT(numOfCols > 0);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
@ -2724,6 +2715,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pChildren = NULL;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
if (!pInfo->pChildren) {
goto _error;
}
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) {
@ -2746,7 +2740,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// semi interval operator does not catch result
pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator";
ASSERT(pInfo->aggSup.currentPageId == -1);
}
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
@ -3162,15 +3155,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
if (fp) {
void* ptr = taosArrayGet(pWinInfos, index);
fp(ptr);
}
taosArrayRemove(pWinInfos, index);
}
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
@ -3218,7 +3202,6 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
ASSERT(keyLen == sizeof(SSessionKey));
taosArrayPush(pUpdated, key);
}
taosArraySort(pUpdated, sessionKeyCompareAsc);
@ -3279,7 +3262,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
int32_t numOfOutput = pSup->numOfExprs;
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
ASSERT(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWinKey = taosArrayGet(pWinArray, i);
@ -3380,7 +3362,6 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
pGroupResInfo->pRows = pArrayList;
pGroupResInfo->index = 0;
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
}
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
@ -4811,7 +4792,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
ASSERT(numOfCols > 0);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
@ -4831,7 +4811,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.deleteMark = getDeleteMark(pIntervalPhyNode),
};
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
ASSERTS(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
pOperator->pTaskInfo = pTaskInfo;
pInfo->interval = interval;

View File

@ -606,6 +606,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||

View File

@ -375,7 +375,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
#define SCH_REDIRECT_MSGTYPE(_msgType) \

View File

@ -48,6 +48,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
// exec
while (1) {
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
return 0;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(exec, &output, &ts)) < 0) {

View File

@ -238,7 +238,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
void syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);

View File

@ -61,7 +61,7 @@ typedef struct SSyncLogBuffer {
// SSyncLogRepMgr
SSyncLogReplMgr* syncLogReplMgrCreate();
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode);
void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
@ -109,6 +109,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode);
#ifdef __cplusplus
}
#endif

View File

@ -56,7 +56,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
@ -79,8 +79,8 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);

View File

@ -200,12 +200,15 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code = syncNodeOnLocalCmd(pSyncNode, pMsg);
break;
default:
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg,
TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
}
syncNodeRelease(pSyncNode);
if (code != 0) {
sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
terrno);
}
return code;
}
@ -228,8 +231,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode);
if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle,
rpcMsg.info.ahandle);
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
return 0;
} else {
@ -1084,13 +1086,17 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// snapshot senders
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
// ASSERT(pSender != NULL);
(pSyncNode->senders)[i] = pSender;
sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender);
if (pSender == NULL) return NULL;
pSyncNode->senders[i] = pSender;
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
}
// snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
if (pSyncNode->pNewNodeReceiver == NULL) return NULL;
sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p",
pSyncNode->pNewNodeReceiver);
// is config changing
pSyncNode->changing = false;
@ -1131,10 +1137,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->hbrSlowNum = 0;
pSyncNode->tmrRoutineNum = 0;
sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
tsHeartbeatInterval, tsHeartbeatTimeout);
sNInfo(pSyncNode, "sync open, node:%p electInterval:%d heartbeatInterval:%d heartbeatTimeout:%d", pSyncNode,
tsElectInterval, tsHeartbeatInterval, tsHeartbeatTimeout);
return pSyncNode;
_error:
@ -1251,6 +1255,8 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
@ -1295,15 +1301,15 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] != NULL) {
sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
if (pSyncNode->senders[i] != NULL) {
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", pSyncNode->vgId, pSyncNode->senders[i]);
if (snapshotSenderIsStart((pSyncNode->senders)[i])) {
snapshotSenderStop((pSyncNode->senders)[i], false);
if (snapshotSenderIsStart(pSyncNode->senders[i])) {
snapshotSenderStop(pSyncNode->senders[i], false);
}
snapshotSenderDestroy((pSyncNode->senders)[i]);
(pSyncNode->senders)[i] = NULL;
snapshotSenderDestroy(pSyncNode->senders[i]);
pSyncNode->senders[i] = NULL;
}
}
@ -1312,6 +1318,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
@ -1382,8 +1389,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
return ret;
}
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t electMS;
if (pSyncNode->pRaftCfg->isStandBy) {
@ -1391,11 +1397,11 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
} else {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
}
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
(void)syncNodeRestartElectTimer(pSyncNode, electMS);
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
electMS);
return ret;
}
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
@ -1455,23 +1461,20 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
return 0;
}
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilRaftId2EpSet(destRaftId, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
if (pSyncNode->syncSendMSg != NULL) {
syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->syncSendMSg(&epSet, pMsg);
return pSyncNode->syncSendMSg(&epSet, pMsg);
} else {
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
return 0;
}
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
@ -1586,7 +1589,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
oldSenders[i] = pSyncNode->senders[i];
sSTrace(oldSenders[i], "snapshot sender save old");
}
@ -1625,7 +1628,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// clear new
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
(pSyncNode->senders)[i] = NULL;
pSyncNode->senders[i] = NULL;
}
// reset new
@ -1640,16 +1643,16 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
(pSyncNode->senders)[i] = oldSenders[j];
pSyncNode->senders[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
int32_t oldreplicaIndex = pSyncNode->senders[i]->replicaIndex;
pSyncNode->senders[i]->replicaIndex = i;
sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
i, host, port, (pSyncNode->senders)[i], reset);
i, host, port, pSyncNode->senders[i], reset);
break;
}
@ -1658,18 +1661,23 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// create new
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sSTrace((pSyncNode->senders)[i], "snapshot sender create new while reconfig, data:%p", (pSyncNode->senders)[i]);
if (pSyncNode->senders[i] == NULL) {
pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i);
if (pSyncNode->senders[i] == NULL) {
// will be created later while send snapshot
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
} else {
sSTrace((pSyncNode->senders)[i], "snapshot sender already exist, data:%p", (pSyncNode->senders)[i]);
sSDebug(pSyncNode->senders[i], "snapshot sender create while reconfig, data:%p", pSyncNode->senders[i]);
}
} else {
sSDebug(pSyncNode->senders[i], "snapshot sender already exist, data:%p", pSyncNode->senders[i]);
}
}
// free old
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
sNTrace(pSyncNode, "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
sSDebug(oldSenders[i], "snapshot sender destroy old, data:%p replica-index:%d", oldSenders[i], i);
snapshotSenderDestroy(oldSenders[i]);
oldSenders[i] = NULL;
}
@ -1844,8 +1852,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm;
if (pSyncNode->senders[i]->privateTerm > pMySender->privateTerm) {
pMySender->privateTerm = pSyncNode->senders[i]->privateTerm;
}
}
(pMySender->privateTerm) += 100;
@ -2376,9 +2384,20 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
}
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (pEntry->dataLen < sizeof(SMsgHead)) {
sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
syncEntryDestroy(pEntry);
return -1;
}
// append to log buffer
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index);
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL;
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry,
TSDB_CODE_SYN_BUFFER_FULL);
syncEntryDestroy(pEntry);
return -1;
}
@ -2671,16 +2690,24 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
}
if (pEntry == NULL) {
sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
return -1;
}
if (ths->state == TAOS_SYNC_STATE_LEADER) {
if (pRetIndex) {
(*pRetIndex) = index;
}
int32_t code = syncNodeAppend(ths, pEntry);
if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
ASSERTS(false, "failed to append blocking msg");
if (code < 0) {
sNError(ths, "failed to append blocking msg");
}
return code;
} else {
syncEntryDestroy(pEntry);
pEntry = NULL;
}
return -1;

View File

@ -26,6 +26,11 @@
#include "syncSnapshot.h"
#include "syncUtil.h"
static bool syncIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
}
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
taosThreadMutexLock(&pBuf->mutex);
int64_t index = pBuf->endIndex;
@ -443,16 +448,15 @@ _out:
return matchIndex;
}
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
ASSERTS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM");
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode) {
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
return 0;
}
if (pNode->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
sTrace("vgId:%d, blocking msg ready to execute. index:%" PRId64 ", term: %" PRId64 ", type: %s", pNode->vgId,
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
}
if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
@ -460,14 +464,14 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
pEntry->term);
}
SRpcMsg rpcMsg = {0};
SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.code = applyCode;
cbMeta.state = role;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
@ -476,7 +480,6 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
ASSERT(rpcMsg.pCont == NULL);
return code;
}
@ -527,7 +530,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
}
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role: %d, current term: %" PRId64,
vgId, pEntry->index, pEntry->term, role, term);
@ -574,7 +577,9 @@ _out:
return ret;
}
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) return;
ASSERT(pMgr->startIndex >= 0);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
@ -584,7 +589,6 @@ int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
pMgr->endIndex = 0;
pMgr->restored = false;
pMgr->retryBackoff = 0;
return 0;
}
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {

View File

@ -54,7 +54,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender == NULL) return;
sDebug("vgId:%d, snapshot sender destroy", pSender->pSyncNode->vgId);
// free current block
if (pSender->pCurrentBlock != NULL) {
@ -75,12 +74,6 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "vgId:%d, snapshot sender is already start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
pSender->start = true;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
@ -95,7 +88,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID;
pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
memset(&(pSender->lastConfig), 0, sizeof(pSender->lastConfig));
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->startTime = taosGetTimestampMs();
@ -111,7 +104,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
@ -122,7 +115,6 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
// event log
sSDebug(pSender, "snapshot sender start");
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
// send msg
@ -134,7 +126,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
return 0;
}
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
// update flag
@ -154,8 +146,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
}
return 0;
}
// when sender receive ack, call this function to send msg from seq
@ -177,8 +167,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
if (pSender->blockLen > 0) {
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
// has read data
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
} else {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
@ -194,7 +184,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
@ -202,7 +192,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
// pMsg->privateTerm = pSender->privateTerm;
if (pSender->pCurrentBlock != NULL) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
@ -210,10 +199,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
} else {
sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
}
@ -238,7 +225,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
@ -248,12 +235,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
// pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
}
// event log
sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
// send msg
@ -299,13 +284,10 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sSInfo(pSender, "snapshot sender start too frequently, ignore");
return 1;
return 0;
}
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sSInfo(pSender, "snapshot sender start for peer:%s:%u", host, port);
sSInfo(pSender, "snapshot sender start");
int32_t code = snapshotSenderStart(pSender);
if (code != 0) {
@ -338,13 +320,11 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
sDebug("vgId:%d, snapshot receiver create", pSyncNode->vgId);
return pReceiver;
}
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver == NULL) return;
sDebug("vgId:%d, snapshot receiver destroy", pReceiver->pSyncNode->vgId);
// close writer
if (pReceiver->pWriter != NULL) {
@ -368,7 +348,6 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
// event log
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
@ -380,13 +359,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
pReceiver->start = false;
}
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) {
sRError(pReceiver, "snapshot receiver is not start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
@ -416,10 +389,10 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
return 0;
}
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver has started");
return 0;
return;
}
pReceiver->start = true;
@ -430,12 +403,11 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
// event log
sRInfo(pReceiver, "snapshot receiver is start");
return 0;
}
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
if (pReceiver->pWriter != NULL) {
@ -450,17 +422,10 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->start = false;
return 0;
}
// when recv last snapshot block, apply data into snapshot
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
if (pMsg->seq != SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snapshot receiver seq:%d is invalid", pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
int32_t code = 0;
if (pReceiver->pWriter != NULL) {
// write data
@ -582,6 +547,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
if (snapshotReceiverIsStart(pReceiver)) {
// already start
@ -593,14 +559,14 @@ static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
pReceiver->startTime, pMsg->startTime);
goto _SEND_REPLY;
} else {
// ignore
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
return 0;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
code = terrno;
goto _SEND_REPLY;
}
} else {
// start new
sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
@ -611,7 +577,8 @@ _START_RECEIVER:
if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
pMsg->startTime);
return -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
code = terrno;
} else {
// waiting for clock match
while (timeNow < pMsg->startTime) {
@ -647,7 +614,7 @@ _SEND_REPLY:
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pMsg->seq; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
// send msg
@ -657,26 +624,36 @@ _SEND_REPLY:
return -1;
}
return 0;
return code;
}
static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 1
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;
if (!snapshotReceiverIsStart(pReceiver)) {
sRError(pReceiver, "snapshot receiver not start");
return -1;
sRError(pReceiver, "snapshot receiver begin failed since not start");
goto _SEND_REPLY;
}
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
return -1;
goto _SEND_REPLY;
}
// start writer
snapshotReceiverStartWriter(pReceiver, pMsg);
if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) {
sRError(pReceiver, "snapshot receiver begin failed since start writer failed");
goto _SEND_REPLY;
}
code = 0;
_SEND_REPLY:
if (code != 0 && terrno != 0) {
code = terrno;
}
// build msg
SRpcMsg rpcMsg = {0};
@ -693,7 +670,7 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
@ -703,10 +680,10 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
return -1;
}
return 0;
return code;
}
static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 4
// transfering
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
@ -753,7 +730,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
return -1;
}
return 0;
return code;
}
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
@ -790,7 +767,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
@ -800,7 +777,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
return -1;
}
return 0;
return code;
}
// receiver on message
@ -830,12 +807,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
return 0;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
return 0;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pMsg->term > pSyncNode->pRaftStore->currentTerm) {
@ -844,20 +823,21 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncNodeResetElectTimer(pSyncNode);
// state, term, seq/ack
int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotPre(pSyncNode, pMsg);
code = syncNodeOnSnapshotPre(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
syncNodeOnSnapshotBegin(pSyncNode, pMsg);
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
syncNodeOnSnapshotEnd(pSyncNode, pMsg);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
return -1;
code = -1;
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
@ -865,35 +845,27 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
snapshotReceiverForceStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
} else {
// error log
sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
return -1;
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver term not equal");
return -1;
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver not follower");
return -1;
code = -1;
}
return 0;
return code;
}
int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
if (pSender == NULL) {
sNError(pSyncNode, "prepare snapshot error since sender is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
@ -915,7 +887,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
pSender->snapshot = snapshot;
// start reader
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
if (code != 0) {
sSError(pSender, "prepare snapshot failed since %s", terrstr());
return -1;
@ -936,7 +908,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
@ -966,8 +938,9 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotRsp *pMsg = pRpcMsg->pCont;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
@ -983,6 +956,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
sSError(pSender, "snapshot sender not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER;
goto _ERROR;
}
@ -990,6 +964,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime,
pSender->startTime, pMsg->code);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR;
}
@ -997,20 +972,21 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
pSyncNode->pRaftStore->currentTerm);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR;
}
if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code);
terrno = pMsg->code;
goto _ERROR;
}
// prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
return 0;
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
@ -1030,10 +1006,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
return 0;
}
@ -1047,22 +1020,19 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) {
// maybe resend
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
snapshotReSend(pSender);
if (snapshotReSend(pSender) != 0) {
return -1;
}
} else {
// error log
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogReplMgrReset(pMgr);
}
return -1;
}
@ -1071,10 +1041,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
_ERROR:
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
return -1;
}

View File

@ -1665,11 +1665,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
int32_t code = pResp->code;
// return internal code app
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
pResp->code = pCtx->retryCode;
}
}
// check whole vnodes is offline on this vgroup
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) {
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
} else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
}
}
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {

View File

@ -51,6 +51,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQD
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
@ -406,6 +407,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for st
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq