fix: handle error if sync buffer is full
This commit is contained in:
parent
d892299078
commit
794fb5d1b5
|
@ -520,6 +520,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
|
||||
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
|
||||
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
|
||||
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) //
|
||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||
|
||||
// tq
|
||||
|
|
|
@ -391,9 +391,9 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
|
|||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
||||
", weak:%d, code:%d, state:%d %s, type:%s",
|
||||
", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
|
||||
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
||||
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
||||
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
|
||||
|
||||
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
|
||||
}
|
||||
|
|
|
@ -109,6 +109,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
|
|||
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
|
||||
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
|
||||
|
||||
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
|
||||
int32_t applyCode);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -2386,7 +2386,11 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
|
|||
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||
// append to log buffer
|
||||
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
|
||||
sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index);
|
||||
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
||||
terrno = TSDB_CODE_SYN_BUFFER_FULL;
|
||||
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry,
|
||||
TSDB_CODE_SYN_BUFFER_FULL);
|
||||
syncEntryDestroy(pEntry);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -2685,8 +2689,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
}
|
||||
|
||||
int32_t code = syncNodeAppend(ths, pEntry);
|
||||
if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
|
||||
ASSERTS(false, "failed to append blocking msg");
|
||||
if (code < 0) {
|
||||
sNError(ths, "failed to append blocking msg");
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,11 @@
|
|||
#include "syncSnapshot.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
static bool syncIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
||||
}
|
||||
|
||||
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
int64_t index = pBuf->endIndex;
|
||||
|
@ -441,26 +446,25 @@ _out:
|
|||
return matchIndex;
|
||||
}
|
||||
|
||||
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
|
||||
ASSERTS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM");
|
||||
|
||||
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
|
||||
int32_t applyCode) {
|
||||
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pNode->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) {
|
||||
sTrace("vgId:%d, blocking msg ready to execute. index:%" PRId64 ", term: %" PRId64 ", type: %s", pNode->vgId,
|
||||
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||
if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
|
||||
sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
|
||||
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
SRpcMsg rpcMsg = {.code = applyCode};
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.code = 0;
|
||||
cbMeta.code = applyCode;
|
||||
cbMeta.state = role;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
|
@ -469,7 +473,6 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
|
|||
|
||||
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
||||
ASSERT(rpcMsg.pCont == NULL);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -520,7 +523,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||
}
|
||||
|
||||
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
|
||||
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) {
|
||||
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||
", role: %d, current term: %" PRId64,
|
||||
vgId, pEntry->index, pEntry->term, role, term);
|
||||
|
|
|
@ -407,6 +407,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for st
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||
|
||||
//tq
|
||||
|
|
Loading…
Reference in New Issue