Skip invalid sdb raw.

This commit is contained in:
xiao-77 2024-12-06 11:15:28 +08:00
parent ea96288654
commit 053ad5557f
2 changed files with 26 additions and 13 deletions

View File

@ -14,13 +14,13 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTrans.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndSubscribe.h" #include "mndSubscribe.h"
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define TRANS_VER1_NUMBER 1 #define TRANS_VER1_NUMBER 1
@ -216,7 +216,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0; terrno = 0;
_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
@ -287,8 +287,8 @@ _OVER:
SSdbRow *mndTransDecode(SSdbRaw *pRaw) { SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSdbRow *pRow = NULL; SSdbRow *pRow = NULL;
STrans *pTrans = NULL; STrans *pTrans = NULL;
char *pData = NULL; char *pData = NULL;
@ -890,7 +890,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
void* pGidIter = taosHashIterate(pNew->arbGroupIds, NULL); void *pGidIter = taosHashIterate(pNew->arbGroupIds, NULL);
while (pGidIter != NULL) { while (pGidIter != NULL) {
int32_t groupId = *(int32_t *)pGidIter; int32_t groupId = *(int32_t *)pGidIter;
if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) { if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {
@ -1033,7 +1033,7 @@ static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0; int32_t code = 0;
if (pTrans == NULL) { if (pTrans == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
@ -1305,6 +1305,14 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH); TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
} }
if (pAction->pRaw->type >= SDB_MAX) {
pAction->rawWritten = true;
pAction->errCode = 0;
mndSetTransLastAction(pTrans, pAction);
mInfo("skip sdb raw type:%d since it is not supported", pAction->pRaw->type);
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
pAction->rawWritten = true; pAction->rawWritten = true;
@ -1348,10 +1356,10 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
char detail[1024] = {0}; char detail[1024] = {0};
int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", TMSG_INFO(pAction->msgType), int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", TMSG_INFO(pAction->msgType),
pAction->epSet.numOfEps, pAction->epSet.inUse); pAction->epSet.numOfEps, pAction->epSet.inUse);
for (int32_t i = 0; i < pAction->epSet.numOfEps; ++i) { for (int32_t i = 0; i < pAction->epSet.numOfEps; ++i) {
len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, pAction->epSet.eps[i].fqdn, len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, pAction->epSet.eps[i].fqdn,
pAction->epSet.eps[i].port); pAction->epSet.eps[i].port);
} }
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg); int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
@ -1454,9 +1462,9 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x", mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x", pTrans->id,
pTrans->id, mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived, mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived, pAction->errCode,
pAction->errCode, pAction->acceptableCode, pAction->retryCode); pAction->acceptableCode, pAction->retryCode);
if (pAction->msgSent) { if (pAction->msgSent) {
if (pAction->msgReceived) { if (pAction->msgReceived) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
@ -2038,11 +2046,11 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
char detail[TSDB_TRANS_ERROR_LEN + 1] = {0}; char detail[TSDB_TRANS_ERROR_LEN + 1] = {0};
int32_t len = tsnprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction, int32_t len = tsnprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction,
pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo)); pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo));
SEpSet epset = pTrans->lastEpset; SEpSet epset = pTrans->lastEpset;
if (epset.numOfEps > 0) { if (epset.numOfEps > 0) {
len += tsnprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ", len += tsnprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ",
TMSG_INFO(pTrans->lastMsgType), epset.numOfEps, epset.inUse); TMSG_INFO(pTrans->lastMsgType), epset.numOfEps, epset.inUse);
for (int32_t i = 0; i < pTrans->lastEpset.numOfEps; ++i) { for (int32_t i = 0; i < pTrans->lastEpset.numOfEps; ++i) {
len += tsnprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port); len += tsnprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
} }

View File

@ -388,6 +388,11 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
goto _OVER; goto _OVER;
} }
if (pRaw->type >= SDB_MAX) {
mInfo("skip sdb raw type:%d since it is not supported", pRaw->type);
continue;
}
code = sdbWriteWithoutFree(pSdb, pRaw); code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("failed to read sdb file:%s since %s", file, terrstr()); mError("failed to read sdb file:%s since %s", file, terrstr());