Merge pull request #18769 from taosdata/FIX/TD-21043-3.0
enh: check contiguousness of indexes applied to vnode
This commit is contained in:
commit
b286959aed
|
@ -70,6 +70,11 @@ static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
||||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||||
(type == TDMT_VND_UPDATE_TAG_VAL);
|
(type == TDMT_VND_UPDATE_TAG_VAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline bool syncUtilUserCommit(tmsg_t msgType) {
|
||||||
|
return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER;
|
||||||
|
}
|
||||||
|
|
||||||
/* ------------------------ OTHER DEFINITIONS ------------------------ */
|
/* ------------------------ OTHER DEFINITIONS ------------------------ */
|
||||||
// IE type
|
// IE type
|
||||||
#define TSDB_IE_TYPE_SEC 1
|
#define TSDB_IE_TYPE_SEC 1
|
||||||
|
|
|
@ -119,7 +119,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
int32_t code = 0;
|
||||||
|
if (!syncUtilUserCommit(pMsg->msgType)) {
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
||||||
|
|
||||||
|
_out:
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -190,9 +190,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
version);
|
version);
|
||||||
|
|
||||||
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||||
|
ASSERT(pVnode->state.applied + 1 == version);
|
||||||
|
|
||||||
pVnode->state.applied = version;
|
pVnode->state.applied = version;
|
||||||
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
|
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
|
||||||
|
|
||||||
|
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
|
||||||
|
|
||||||
// skip header
|
// skip header
|
||||||
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
len = pMsg->contLen - sizeof(SMsgHead);
|
len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
|
@ -79,7 +79,6 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len);
|
||||||
void syncUtilMsgHtoN(void* msg);
|
void syncUtilMsgHtoN(void* msg);
|
||||||
void syncUtilMsgNtoH(void* msg);
|
void syncUtilMsgNtoH(void* msg);
|
||||||
bool syncUtilUserPreCommit(tmsg_t msgType);
|
bool syncUtilUserPreCommit(tmsg_t msgType);
|
||||||
bool syncUtilUserCommit(tmsg_t msgType);
|
|
||||||
bool syncUtilUserRollback(tmsg_t msgType);
|
bool syncUtilUserRollback(tmsg_t msgType);
|
||||||
|
|
||||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
|
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
|
||||||
|
|
|
@ -513,13 +513,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
|
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
|
sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
|
||||||
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||||
pBuf->commitIndex = index;
|
|
||||||
if (!inBuf) {
|
|
||||||
syncEntryDestroy(pEntry);
|
|
||||||
pEntry = NULL;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
|
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
|
||||||
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||||
", role: %d, current term: %" PRId64,
|
", role: %d, current term: %" PRId64,
|
||||||
|
|
|
@ -160,8 +160,6 @@ void syncUtilMsgNtoH(void* msg) {
|
||||||
|
|
||||||
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||||
|
|
||||||
bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
|
||||||
|
|
||||||
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
||||||
|
|
||||||
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
||||||
|
|
Loading…
Reference in New Issue