diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 3ff86bea3e..9373e258be 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -22,121 +22,70 @@ extern "C" { #endif -#define SDB_GET_INT64(pData, pRow, dataPos, val) \ - { \ - if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \ - tfree(pRow); \ - return NULL; \ - } \ - dataPos += sizeof(int64_t); \ +#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \ + { \ + if (func(pRaw, dataPos, val) != 0) { \ + goto pos; \ + } \ + dataPos += sizeof(type); \ } -#define SDB_GET_INT32(pData, pRow, dataPos, val) \ - { \ - if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \ - tfree(pRow); \ - return NULL; \ - } \ - dataPos += sizeof(int32_t); \ - } - -#define SDB_GET_INT16(pData, pRow, dataPos, val) \ - { \ - if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \ - tfree(pRow); \ - return NULL; \ - } \ - dataPos += sizeof(int16_t); \ - } - -#define SDB_GET_INT8(pData, pRow, dataPos, val) \ - { \ - if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \ - tfree(pRow); \ - return NULL; \ - } \ - dataPos += sizeof(int8_t); \ - } - -#define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \ +#define SDB_GET_BINARY(pRaw, dataPos, val, valLen, pos) \ { \ if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ - tfree(pRow); \ - return NULL; \ + goto pos; \ } \ dataPos += valLen; \ } -#define SDB_GET_RESERVE(pRaw, pRow, dataPos, valLen) \ - { \ - char val[valLen] = {0}; \ - if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ - tfree(pRow); \ - return NULL; \ - } \ - dataPos += valLen; \ +#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t) + +#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t) + +#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) + +#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) + +#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ + { \ + char val[valLen] = {0}; \ + SDB_GET_BINARY(pRaw, dataPos, val, valLen, pos) \ } -#define SDB_SET_INT64(pRaw, dataPos, val) \ - { \ - if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ - } \ - dataPos += sizeof(int64_t); \ +#define SDB_SET_VAL(pRaw, dataPos, val, pos, func, type) \ + { \ + if (func(pRaw, dataPos, val) != 0) { \ + goto pos; \ + } \ + dataPos += sizeof(type); \ } -#define SDB_SET_INT32(pRaw, dataPos, val) \ - { \ - if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ - } \ - dataPos += sizeof(int32_t); \ - } +#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t) -#define SDB_SET_INT16(pRaw, dataPos, val) \ - { \ - if (sdbSetRawInt16(pRaw, dataPos, val) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ - } \ - dataPos += sizeof(int16_t); \ - } +#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t) -#define SDB_SET_INT8(pRaw, dataPos, val) \ - { \ - if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ - } \ - dataPos += sizeof(int8_t); \ - } +#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) -#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \ +#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) + +#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ { \ if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ + goto pos; \ } \ dataPos += valLen; \ } -#define SDB_SET_RESERVE(pRaw, dataPos, valLen) \ - { \ - char val[valLen] = {0}; \ - if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ - } \ - dataPos += valLen; \ +#define SDB_SET_RESERVE(pRaw, dataPos, valLen, pos) \ + { \ + char val[valLen] = {0}; \ + SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ } -#define SDB_SET_DATALEN(pRaw, dataLen) \ +#define SDB_SET_DATALEN(pRaw, dataLen, pos) \ { \ if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \ - sdbFreeRaw(pRaw); \ - return NULL; \ + goto pos; \ } \ } diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index 5c67c10817..7ba19677fd 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -115,8 +115,8 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 2); - CheckInt32(1); CheckInt32(2); + CheckInt32(3); CheckInt32(0); CheckInt32(0); CheckInt16(1); diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 0c1ae1bffa..ae8ffea4ea 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -66,75 +66,100 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("acct:%s, will be created while deploy sdb", acctObj.acct); + mDebug("acct:%s, will be created while deploy sdb, raw:%p", acctObj.acct, pRaw); return sdbWrite(pMnode->pSdb, pRaw); } static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, TSDB_ACCT_VER_NUMBER, sizeof(SAcctObj) + TSDB_ACCT_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto ACCT_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN) - SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime) - SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime) - SDB_SET_INT32(pRaw, dataPos, pAcct->acctId) - SDB_SET_INT32(pRaw, dataPos, pAcct->status) - SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers) - SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs) - SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries) - SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams) - SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage) - SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN, ACCT_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime, ACCT_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->acctId, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->status, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams, ACCT_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage, ACCT_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState, ACCT_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, ACCT_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, ACCT_ENCODE_OVER) + terrno = 0; + +ACCT_ENCODE_OVER: + if (terrno != 0) { + mError("acct:%s, failed to encode to raw:%p since %s", pAcct->acct, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("acct:%s, encode to raw:%p, row:%p", pAcct->acct, pRaw, pAcct); return pRaw; } static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto ACCT_DECODE_OVER; if (sver != TSDB_ACCT_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode acct since %s", terrstr()); + goto ACCT_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); + if (pRow == NULL) goto ACCT_DECODE_OVER; + + SAcctObj *pAcct = sdbGetRowObj(pRow); + if (pAcct == NULL) goto ACCT_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN, ACCT_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pAcct->createdTime, ACCT_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pAcct->updateTime, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->acctId, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->status, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxUsers, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxDbs, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxTimeSeries, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxStreams, ACCT_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pAcct->cfg.maxStorage, ACCT_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.accessState, ACCT_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, ACCT_DECODE_OVER) + + terrno = 0; + +ACCT_DECODE_OVER: + if (terrno != 0) { + mError("acct:%s, failed to decode from raw:%p since %s", pAcct->acct, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); - SAcctObj *pAcct = sdbGetRowObj(pRow); - if (pAcct == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pAcct->acct, TSDB_USER_LEN) - SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->updateTime) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->acctId) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->status) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxUsers) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxDbs) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxTimeSeries) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxStreams) - SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->cfg.maxStorage) - SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.accessState) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_ACCT_RESERVE_SIZE) - + mTrace("acct:%s, decode from raw:%p, row:%p", pAcct->acct, pRaw, pAcct); return pRow; } static int32_t mndAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { - mTrace("acct:%s, perform insert action", pAcct->acct); + mTrace("acct:%s, perform insert action, row:%p", pAcct->acct, pAcct); return 0; } static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { - mTrace("acct:%s, perform delete action", pAcct->acct); + mTrace("acct:%s, perform delete action, row:%p", pAcct->acct, pAcct); return 0; } static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOldAcct, SAcctObj *pNewAcct) { - mTrace("acct:%s, perform update action", pOldAcct->acct); + mTrace("acct:%s, perform update action, old_row:%p new_row:%p", pOldAcct->acct, pOldAcct, pNewAcct); pOldAcct->updateTime = pNewAcct->updateTime; pOldAcct->status = pNewAcct->status; diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index 2d14e0f92e..ceaebe3f6d 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -58,9 +58,9 @@ int32_t mndInitBnode(SMnode *pMnode) { void mndCleanupBnode(SMnode *pMnode) {} -static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t snodeId) { +static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) { SSdb *pSdb = pMnode->pSdb; - SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &snodeId); + SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &bnodeId); if (pObj == NULL) { terrno = TSDB_CODE_MND_BNODE_NOT_EXIST; } @@ -73,47 +73,72 @@ static void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) { } static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, TSDB_BNODE_VER_NUMBER, sizeof(SBnodeObj) + TSDB_BNODE_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto BNODE_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pObj->id); - SDB_SET_INT64(pRaw, dataPos, pObj->createdTime) - SDB_SET_INT64(pRaw, dataPos, pObj->updateTime) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE) + SDB_SET_INT32(pRaw, dataPos, pObj->id, BNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, BNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, BNODE_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_ENCODE_OVER) + terrno = 0; + +BNODE_ENCODE_OVER: + if (terrno != 0) { + mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("bnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj); return pRaw; } static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto BNODE_DECODE_OVER; if (sver != TSDB_BNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode snode since %s", terrstr()); + goto BNODE_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj)); + if (pRow == NULL) goto BNODE_DECODE_OVER; + + SBnodeObj *pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto BNODE_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &pObj->id, BNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, BNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, BNODE_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_DECODE_OVER) + + terrno = 0; + +BNODE_DECODE_OVER: + if (terrno != 0) { + mError("bnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj)); - SBnodeObj *pObj = sdbGetRowObj(pRow); - if (pObj == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_BNODE_RESERVE_SIZE) - + mTrace("bnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj); return pRow; } static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj) { - mTrace("snode:%d, perform insert action", pObj->id); + mTrace("bnode:%d, perform insert action, row:%p", pObj->id, pObj); pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); if (pObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - mError("snode:%d, failed to perform insert action since %s", pObj->id, terrstr()); + mError("bnode:%d, failed to perform insert action since %s", pObj->id, terrstr()); return -1; } @@ -121,7 +146,7 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj) { } static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) { - mTrace("snode:%d, perform delete action", pObj->id); + mTrace("bnode:%d, perform delete action, row:%p", pObj->id, pObj); if (pObj->pDnode != NULL) { sdbRelease(pSdb, pObj->pDnode); pObj->pDnode = NULL; @@ -131,7 +156,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) { } static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOldBnode, SBnodeObj *pNewBnode) { - mTrace("snode:%d, perform update action", pOldBnode->id); + mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOldBnode->id, pOldBnode, pNewBnode); pOldBnode->updateTime = pNewBnode->updateTime; return 0; } @@ -175,30 +200,30 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S } static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateBnodeMsg *pCreate) { - SBnodeObj snodeObj = {0}; - snodeObj.id = pDnode->id; - snodeObj.createdTime = taosGetTimestampMs(); - snodeObj.updateTime = snodeObj.createdTime; + SBnodeObj bnodeObj = {0}; + bnodeObj.id = pDnode->id; + bnodeObj.createdTime = taosGetTimestampMs(); + bnodeObj.updateTime = bnodeObj.createdTime; int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { - mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); + mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); goto CREATE_BNODE_OVER; } - mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId); + mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId); - if (mndSetCreateBnodeRedoLogs(pTrans, &snodeObj) != 0) { + if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); goto CREATE_BNODE_OVER; } - if (mndSetCreateBnodeCommitLogs(pTrans, &snodeObj) != 0) { + if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto CREATE_BNODE_OVER; } - if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) { + if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto CREATE_BNODE_OVER; } @@ -221,18 +246,18 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) { pCreate->dnodeId = htonl(pCreate->dnodeId); - mDebug("snode:%d, start to create", pCreate->dnodeId); + mDebug("bnode:%d, start to create", pCreate->dnodeId); SBnodeObj *pObj = mndAcquireBnode(pMnode, pCreate->dnodeId); if (pObj != NULL) { - mError("snode:%d, snode already exist", pObj->id); + mError("bnode:%d, bnode already exist", pObj->id); mndReleaseBnode(pMnode, pObj); return -1; } SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); if (pDnode == NULL) { - mError("snode:%d, dnode not exist", pCreate->dnodeId); + mError("bnode:%d, dnode not exist", pCreate->dnodeId); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return -1; } @@ -241,7 +266,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) { mndReleaseDnode(pMnode, pDnode); if (code != 0) { - mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); + mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); return -1; } @@ -290,11 +315,11 @@ static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pMsg, SBnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { - mError("snode:%d, failed to drop since %s", pObj->id, terrstr()); + mError("bnode:%d, failed to drop since %s", pObj->id, terrstr()); goto DROP_BNODE_OVER; } - mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); + mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); @@ -328,24 +353,24 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) { SMDropBnodeMsg *pDrop = pMsg->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); - mDebug("snode:%d, start to drop", pDrop->dnodeId); + mDebug("bnode:%d, start to drop", pDrop->dnodeId); if (pDrop->dnodeId <= 0) { terrno = TSDB_CODE_SDB_APP_ERROR; - mError("snode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); + mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); return -1; } SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { - mError("snode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + mError("bnode:%d, not exist", pDrop->dnodeId); + terrno = TSDB_CODE_MND_BNODE_NOT_EXIST; return -1; } int32_t code = mndDropBnode(pMnode, pMsg, pObj); if (code != 0) { - mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); + mError("bnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 1c12109f38..00cfa6b413 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -63,55 +63,80 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) { } static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto CLUSTER_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT64(pRaw, dataPos, pCluster->id); - SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) - SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) - SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE) + SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER) + terrno = 0; + +CLUSTER_ENCODE_OVER: + if (terrno != 0) { + mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("cluster:%" PRId64 ", encode to raw:%p, row:%p", pCluster->id, pRaw, pCluster); return pRaw; } static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER; if (sver != TSDB_CLUSTER_VER_NUMBE) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode cluster since %s", terrstr()); + goto CLUSTER_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj)); + if (pRow == NULL) goto CLUSTER_DECODE_OVER; + + SClusterObj *pCluster = sdbGetRowObj(pRow); + if (pCluster == NULL) goto CLUSTER_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER) + + terrno = 0; + +CLUSTER_DECODE_OVER: + if (terrno != 0) { + mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj)); - SClusterObj *pCluster = sdbGetRowObj(pRow); - if (pCluster == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) - SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_CLUSTER_RESERVE_SIZE) - + mTrace("cluster:%" PRId64 ", decode from raw:%p, row:%p", pCluster->id, pRaw, pCluster); return pRow; } static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { - mTrace("cluster:%" PRId64 ", perform insert action", pCluster->id); + mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster); return 0; } static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { - mTrace("cluster:%" PRId64 ", perform delete action", pCluster->id); + mTrace("cluster:%" PRId64 ", perform delete action, row:%p", pCluster->id, pCluster); return 0; } -static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) { - mTrace("cluster:%" PRId64 ", perform update action", pOldCluster->id); +static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) { + mTrace("cluster:%" PRId64 ", perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); return 0; } @@ -135,7 +160,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("cluster:%" PRId64 ", will be created while deploy sdb", clusterObj.id); + mDebug("cluster:%" PRId64 ", will be created while deploy sdb, raw:%p", clusterObj.id, pRaw); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9e7cdbf09e..32a044fe09 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -67,66 +67,87 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto CM_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); - SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); - SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime); - SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime); - SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); + SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN, CM_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN, CM_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime, CM_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime, CM_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pConsumer->uid, CM_ENCODE_OVER) /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ - SDB_SET_INT32(pRaw, dataPos, pConsumer->version); + SDB_SET_INT32(pRaw, dataPos, pConsumer->version, CM_ENCODE_OVER) - SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER) +CM_ENCODE_OVER: + if (terrno != 0) { + mError("consumer:%s, failed to encode to raw:%p since %s", pConsumer->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("consumer:%s, encode to raw:%p, row:%p", pConsumer->name, pRaw, pConsumer); return pRaw; } static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CONSUME_DECODE_OVER; if (sver != MND_CONSUMER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode consumer since %s", terrstr()); + goto CONSUME_DECODE_OVER; + } + + int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto CONSUME_DECODE_OVER; + + SConsumerObj *pConsumer = sdbGetRowObj(pRow); + if (pConsumer == NULL) goto CONSUME_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN, CONSUME_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN, CONSUME_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pConsumer->createTime, CONSUME_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pConsumer->updateTime, CONSUME_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pConsumer->uid, CONSUME_DECODE_OVER) + /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ + SDB_GET_INT32(pRaw, dataPos, &pConsumer->version, CONSUME_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER) + terrno = 0; + +CONSUME_DECODE_OVER: + if (terrno != 0) { + mError("consumer:%s, failed to decode from raw:%p since %s", pConsumer->name, pRaw, terrstr()); + tfree(pRow); return NULL; } - int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); - SSdbRow *pRow = sdbAllocRow(size); - SConsumerObj *pConsumer = sdbGetRowObj(pRow); - if (pConsumer == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); - SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); - /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ - SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version); - - SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); - + mTrace("consumer:%s, decode from raw:%p, row:%p", pConsumer->name, pRaw, pConsumer); return pRow; } static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { - mTrace("consumer:%s, perform insert action", pConsumer->name); + mTrace("consumer:%s, perform insert action, row:%p", pConsumer->name, pConsumer); return 0; } static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { - mTrace("consumer:%s, perform delete action", pConsumer->name); + mTrace("consumer:%s, perform delete action, row:%p", pConsumer->name, pConsumer); return 0; } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { - mTrace("consumer:%s, perform update action", pOldConsumer->name); + mTrace("consumer:%s, perform update action, old_row:%p new_row:%p", pOldConsumer->name, pOldConsumer, pNewConsumer); atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime); atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index fe49ad99fe..410368f130 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -65,100 +65,125 @@ int32_t mndInitDb(SMnode *pMnode) { void mndCleanupDb(SMnode *pMnode) {} static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_DB_VER_NUMBER, sizeof(SDbObj) + TSDB_DB_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto DB_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN) - SDB_SET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN) - SDB_SET_INT64(pRaw, dataPos, pDb->createdTime) - SDB_SET_INT64(pRaw, dataPos, pDb->updateTime) - SDB_SET_INT64(pRaw, dataPos, pDb->uid) - SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion) - SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion) - SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfVgroups) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRows) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRows) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime) - SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.precision) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update) - SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN, DB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN, DB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pDb->createdTime, DB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pDb->updateTime, DB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pDb->uid, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfVgroups, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRows, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRows, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.precision, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, DB_ENCODE_OVER) + terrno = 0; + +DB_ENCODE_OVER: + if (terrno != 0) { + mError("db:%s, failed to encode to raw:%p since %s", pDb->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("db:%s, encode to raw:%p, row:%p", pDb->name, pRaw, pDb); return pRaw; } static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto DB_DECODE_OVER; if (sver != TSDB_DB_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode db since %s", terrstr()); - return NULL; + goto DB_DECODE_OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SDbObj)); - SDbObj *pDb = sdbGetRowObj(pRow); - if (pDb == NULL) return NULL; + if (pRow == NULL) goto DB_DECODE_OVER; + + SDbObj *pDb = sdbGetRowObj(pRow); + if (pDb == NULL) goto DB_DECODE_OVER; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->name, TSDB_DB_FNAME_LEN) - SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->acct, TSDB_USER_LEN) - SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfgVersion) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->vgVersion) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->hashMethod) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.numOfVgroups) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep0) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep1) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep2) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.minRows) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.maxRows) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.commitTime) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.fsyncPeriod) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.walLevel) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.precision) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.compression) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.replications) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.quorum) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.update) - SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.cacheLastRow) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_DB_RESERVE_SIZE) + SDB_GET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN, DB_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN, DB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pDb->createdTime, DB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pDb->updateTime, DB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pDb->uid, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfgVersion, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->vgVersion, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->hashMethod, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfVgroups, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.cacheBlockSize, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.totalBlocks, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysPerFile, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep0, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep1, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep2, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.minRows, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.maxRows, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.commitTime, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.fsyncPeriod, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.walLevel, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.precision, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.compression, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.replications, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.quorum, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.update, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, DB_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_DECODE_OVER) + terrno = 0; + +DB_DECODE_OVER: + if (terrno != 0) { + mError("db:%s, failed to decode from raw:%p since %s", pDb->name, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("db:%s, decode from raw:%p, row:%p", pDb->name, pRaw, pDb); return pRow; } static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { - mTrace("db:%s, perform insert action", pDb->name); + mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb); return 0; } static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { - mTrace("db:%s, perform delete action", pDb->name); + mTrace("db:%s, perform delete action, row:%p", pDb->name, pDb); return 0; } static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { - mTrace("db:%s, perform update action", pOldDb->name); + mTrace("db:%s, perform update action, old_row:%p new_row:%p", pOldDb->name, pOldDb, pNewDb); pOldDb->updateTime = pNewDb->updateTime; pOldDb->cfgVersion = pNewDb->cfgVersion; pOldDb->vgVersion = pNewDb->vgVersion; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1c7f8544e9..d110969025 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -100,65 +100,90 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { if (pRaw == NULL) return -1; if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1; - mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id); + mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw); return sdbWrite(pMnode->pSdb, pRaw); } static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto DNODE_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pDnode->id); - SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime) - SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime) - SDB_SET_INT16(pRaw, dataPos, pDnode->port) - SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_INT32(pRaw, dataPos, pDnode->id, DNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, DNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, DNODE_ENCODE_OVER) + SDB_SET_INT16(pRaw, dataPos, pDnode->port, DNODE_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, DNODE_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, DNODE_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, DNODE_ENCODE_OVER); + terrno = 0; + +DNODE_ENCODE_OVER: + if (terrno != 0) { + mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode); return pRaw; } static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto DNODE_DECODE_OVER; if (sver != TSDB_DNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode dnode since %s", terrstr()); + goto DNODE_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj)); + if (pRow == NULL) goto DNODE_DECODE_OVER; + + SDnodeObj *pDnode = sdbGetRowObj(pRow); + if (pDnode == NULL) goto DNODE_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &pDnode->id, DNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, DNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, DNODE_DECODE_OVER) + SDB_GET_INT16(pRaw, dataPos, &pDnode->port, DNODE_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, DNODE_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, DNODE_DECODE_OVER) + + terrno = 0; + +DNODE_DECODE_OVER: + if (terrno != 0) { + mError("dnode:%d, failed to decode from raw:%p since %s", pDnode->id, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj)); - SDnodeObj *pDnode = sdbGetRowObj(pRow); - if (pDnode == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pDnode->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->updateTime) - SDB_GET_INT16(pRaw, pRow, dataPos, &pDnode->port) - SDB_GET_BINARY(pRaw, pRow, dataPos, pDnode->fqdn, TSDB_FQDN_LEN) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_DNODE_RESERVE_SIZE) - + mTrace("dnode:%d, decode from raw:%p, row:%p", pDnode->id, pRaw, pDnode); return pRow; } static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) { - mTrace("dnode:%d, perform insert action", pDnode->id); + mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode); pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED; snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port); return 0; } static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { - mTrace("dnode:%d, perform delete action", pDnode->id); + mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode); return 0; } static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode) { - mTrace("dnode:%d, perform update action", pOldDnode->id); + mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOldDnode->id, pOldDnode, pNewDnode); pOldDnode->updateTime = pNewDnode->updateTime; return 0; } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 59d04ab4a1..57c94fe5db 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -58,76 +58,101 @@ int32_t mndInitFunc(SMnode *pMnode) { void mndCleanupFunc(SMnode *pMnode) {} static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t size = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj); SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, size); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto FUNC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN) - SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime) - SDB_SET_INT8(pRaw, dataPos, pFunc->funcType) - SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType) - SDB_SET_INT8(pRaw, dataPos, pFunc->align) - SDB_SET_INT8(pRaw, dataPos, pFunc->outputType) - SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen) - SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize) - SDB_SET_INT64(pRaw, dataPos, pFunc->sigature) - SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize) - SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize) - SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize) - SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, FUNC_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, FUNC_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, FUNC_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pFunc->align, FUNC_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pFunc->sigature, FUNC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, FUNC_ENCODE_OVER); + terrno = 0; + +FUNC_ENCODE_OVER: + if (terrno != 0) { + mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("func:%d, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc); return pRaw; } static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto FUNC_DECODE_OVER; if (sver != SDB_FUNC_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode func since %s", terrstr()); - return NULL; + goto FUNC_DECODE_OVER; } - int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN; - SSdbRow *pRow = sdbAllocRow(size); + int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN; + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto FUNC_DECODE_OVER; + SFuncObj *pFunc = sdbGetRowObj(pRow); - if (pFunc == NULL) return NULL; + if (pFunc == NULL) goto FUNC_DECODE_OVER; char *tmp = (char *)pFunc + sizeof(SFuncObj); int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN) - SDB_GET_INT64(pRaw, pRow, dataPos, &pFunc->createdTime) - SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->funcType) - SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->scriptType) - SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->align) - SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->outputType) - SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->outputLen) - SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->bufSize) - SDB_GET_INT64(pRaw, pRow, dataPos, &pFunc->sigature) - SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->commentSize) - SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->codeSize) - SDB_GET_BINARY(pRaw, pRow, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize) + SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, FUNC_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, FUNC_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, FUNC_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pFunc->align, FUNC_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pFunc->sigature, FUNC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize, FUNC_DECODE_OVER) pFunc->pComment = pFunc->pData; pFunc->pCode = (pFunc->pData + pFunc->commentSize); + terrno = 0; + +FUNC_DECODE_OVER: + if (terrno != 0) { + mError("func:%s, failed to decode from raw:%p since %s", pFunc->name, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc); return pRow; } static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) { - mTrace("func:%s, perform insert action", pFunc->name); + mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc); return 0; } static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { - mTrace("func:%s, perform delete action", pFunc->name); + mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc); return 0; } static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) { - mTrace("func:%s, perform update action", pOldFunc->name); + mTrace("func:%s, perform update action, old_row:%p new_row:%p", pOldFunc->name, pOldFunc, pNewFunc); return 0; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index ad3d5e1cf6..df1848f2f1 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -117,50 +117,75 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("mnode:%d, will be created while deploy sdb", mnodeObj.id); + mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw); return sdbWrite(pMnode->pSdb, pRaw); } static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, TSDB_MNODE_VER_NUMBER, sizeof(SMnodeObj) + TSDB_MNODE_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto MNODE_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pObj->id); - SDB_SET_INT64(pRaw, dataPos, pObj->createdTime) - SDB_SET_INT64(pRaw, dataPos, pObj->updateTime) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE) + SDB_SET_INT32(pRaw, dataPos, pObj->id, MNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, MNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, MNODE_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE, MNODE_ENCODE_OVER) + terrno = 0; + +MNODE_ENCODE_OVER: + if (terrno != 0) { + mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj); return pRaw; } static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sver != TSDB_MNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode mnode since %s", terrstr()); + goto MNODE_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); + if (pRow == NULL) goto MNODE_DECODE_OVER; + + SMnodeObj *pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto MNODE_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &pObj->id, MNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, MNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, MNODE_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE, MNODE_DECODE_OVER) + + terrno = 0; + +MNODE_DECODE_OVER: + if (terrno != 0) { + mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); - SMnodeObj *pObj = sdbGetRowObj(pRow); - if (pObj == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_MNODE_RESERVE_SIZE) - + mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj); return pRow; } static void mnodeResetMnode(SMnodeObj *pObj) { pObj->role = TAOS_SYNC_STATE_FOLLOWER; } static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { - mTrace("mnode:%d, perform insert action", pObj->id); + mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj); pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); if (pObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -173,7 +198,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { } static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { - mTrace("mnode:%d, perform delete action", pObj->id); + mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj); if (pObj->pDnode != NULL) { sdbRelease(pSdb, pObj->pDnode); pObj->pDnode = NULL; @@ -183,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { } static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode) { - mTrace("mnode:%d, perform update action", pOldMnode->id); + mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOldMnode->id, pOldMnode, pNewMnode); pOldMnode->updateTime = pNewMnode->updateTime; return 0; } @@ -370,7 +395,7 @@ CREATE_MNODE_OVER: } static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; pCreate->dnodeId = htonl(pCreate->dnodeId); @@ -537,7 +562,7 @@ DROP_MNODE_OVER: } static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 3b27764c7c..a0c4ff8218 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -73,43 +73,68 @@ static void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) { } static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_QNODE, TSDB_QNODE_VER_NUMBER, sizeof(SQnodeObj) + TSDB_QNODE_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto QNODE_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pObj->id); - SDB_SET_INT64(pRaw, dataPos, pObj->createdTime) - SDB_SET_INT64(pRaw, dataPos, pObj->updateTime) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE) + SDB_SET_INT32(pRaw, dataPos, pObj->id, QNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, QNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, QNODE_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE, QNODE_ENCODE_OVER) + terrno = 0; + +QNODE_ENCODE_OVER: + if (terrno != 0) { + mError("qnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("qnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj); return pRaw; } static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto QNODE_DECODE_OVER; if (sver != TSDB_QNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode qnode since %s", terrstr()); + goto QNODE_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj)); + if (pRow == NULL) goto QNODE_DECODE_OVER; + + SQnodeObj *pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto QNODE_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &pObj->id, QNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, QNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, QNODE_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE, QNODE_DECODE_OVER) + + terrno = 0; + +QNODE_DECODE_OVER: + if (terrno != 0) { + mError("qnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj)); - SQnodeObj *pObj = sdbGetRowObj(pRow); - if (pObj == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_QNODE_RESERVE_SIZE) - + mTrace("qnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj); return pRow; } static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) { - mTrace("qnode:%d, perform insert action", pObj->id); + mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj); pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); if (pObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -121,7 +146,7 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) { } static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) { - mTrace("qnode:%d, perform delete action", pObj->id); + mTrace("qnode:%d, perform delete action, row:%p", pObj->id, pObj); if (pObj->pDnode != NULL) { sdbRelease(pSdb, pObj->pDnode); pObj->pDnode = NULL; @@ -131,7 +156,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) { } static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOldQnode, SQnodeObj *pNewQnode) { - mTrace("qnode:%d, perform update action", pOldQnode->id); + mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOldQnode->id, pOldQnode, pNewQnode); pOldQnode->updateTime = pNewQnode->updateTime; return 0; } @@ -339,7 +364,7 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) { SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { mError("qnode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + terrno = TSDB_CODE_MND_QNODE_NOT_EXIST; return -1; } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index a4825c0e82..ac03e1659c 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -73,43 +73,68 @@ static void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) { } static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, TSDB_SNODE_VER_NUMBER, sizeof(SSnodeObj) + TSDB_SNODE_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto SNODE_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pObj->id); - SDB_SET_INT64(pRaw, dataPos, pObj->createdTime) - SDB_SET_INT64(pRaw, dataPos, pObj->updateTime) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE) + SDB_SET_INT32(pRaw, dataPos, pObj->id, SNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, SNODE_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, SNODE_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_ENCODE_OVER) + terrno = 0; + +SNODE_ENCODE_OVER: + if (terrno != 0) { + mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("snode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj); return pRaw; } static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SNODE_DECODE_OVER; if (sver != TSDB_SNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode snode since %s", terrstr()); + goto SNODE_DECODE_OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj)); + if (pRow == NULL) goto SNODE_DECODE_OVER; + + SSnodeObj *pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto SNODE_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &pObj->id, SNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, SNODE_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, SNODE_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_DECODE_OVER) + + terrno = 0; + +SNODE_DECODE_OVER: + if (terrno != 0) { + mError("snode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); + tfree(pRow); return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj)); - SSnodeObj *pObj = sdbGetRowObj(pRow); - if (pObj == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_SNODE_RESERVE_SIZE) - + mTrace("snode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj); return pRow; } static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) { - mTrace("snode:%d, perform insert action", pObj->id); + mTrace("snode:%d, perform insert action, row:%p", pObj->id, pObj); pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); if (pObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -121,7 +146,7 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) { } static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) { - mTrace("snode:%d, perform delete action", pObj->id); + mTrace("snode:%d, perform delete action, row:%p", pObj->id, pObj); if (pObj->pDnode != NULL) { sdbRelease(pSdb, pObj->pDnode); pObj->pDnode = NULL; @@ -131,7 +156,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) { } static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOldSnode, SSnodeObj *pNewSnode) { - mTrace("snode:%d, perform update action", pOldSnode->id); + mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOldSnode->id, pOldSnode, pNewSnode); pOldSnode->updateTime = pNewSnode->updateTime; return 0; } @@ -339,7 +364,7 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) { SSnodeObj *pObj = mndAcquireSnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { mError("snode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + terrno = TSDB_CODE_MND_SNODE_NOT_EXIST; return -1; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3454ea1884..8f18308374 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -70,90 +70,115 @@ int32_t mndInitStb(SMnode *pMnode) { void mndCleanupStb(SMnode *pMnode) {} static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto STB_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN) - SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN) - SDB_SET_INT64(pRaw, dataPos, pStb->createdTime) - SDB_SET_INT64(pRaw, dataPos, pStb->updateTime) - SDB_SET_INT64(pRaw, dataPos, pStb->uid) - SDB_SET_INT64(pRaw, dataPos, pStb->dbUid) - SDB_SET_INT32(pRaw, dataPos, pStb->version) - SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns) - SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags) + SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->createdTime, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->updateTime, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->uid, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; for (int32_t i = 0; i < totalCols; ++i) { SSchema *pSchema = &pStb->pSchema[i]; - SDB_SET_INT8(pRaw, dataPos, pSchema->type); - SDB_SET_INT32(pRaw, dataPos, pSchema->colId); - SDB_SET_INT32(pRaw, dataPos, pSchema->bytes); - SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN); + SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) } - SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER) + terrno = 0; + +STB_ENCODE_OVER: + if (terrno != 0) { + mError("stb:%s, failed to encode to raw:%p since %s", pStb->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("stb:%s, encode to raw:%p, row:%p", pStb->name, pRaw, pStb); return pRaw; } static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STB_DECODE_OVER; if (sver != TSDB_STB_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode stable since %s", terrstr()); - return NULL; + goto STB_DECODE_OVER; } int32_t size = sizeof(SStbObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto STB_DECODE_OVER; + SStbObj *pStb = sdbGetRowObj(pRow); - if (pStb == NULL) return NULL; + if (pStb == NULL) goto STB_DECODE_OVER; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN) - SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->db, TSDB_DB_FNAME_LEN) - SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid) - SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->dbUid) - SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->version) - SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfColumns) - SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfTags) + SDB_GET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, STB_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->createdTime, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->updateTime, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->uid, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; pStb->pSchema = calloc(totalCols, sizeof(SSchema)); for (int32_t i = 0; i < totalCols; ++i) { SSchema *pSchema = &pStb->pSchema[i]; - SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type); - SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId); - SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes); - SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN); + SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) } - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_STB_RESERVE_SIZE) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER) + terrno = 0; + +STB_DECODE_OVER: + if (terrno != 0) { + mError("stb:%s, failed to decode from raw:%p since %s", pStb->name, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("stb:%s, decode from raw:%p, row:%p", pStb->name, pRaw, pStb); return pRow; } static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { - mTrace("stb:%s, perform insert action", pStb->name); + mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb); return 0; } static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { - mTrace("stb:%s, perform delete action", pStb->name); + mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); return 0; } static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) { - mTrace("stb:%s, perform update action", pOldStb->name); + mTrace("stb:%s, perform update action, old_row:%p new_row:%p", pOldStb->name, pOldStb, pNewStb); atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime); atomic_exchange_32(&pOldStb->version, pNewStb->version); @@ -177,7 +202,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb } SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName); if (pStb == NULL) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; @@ -202,9 +227,9 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) { SVCreateTbReq req; - void * buf; + void *buf; int bsize; - SMsgHead * pMsgHead; + SMsgHead *pMsgHead; req.ver = 0; req.name = pStb->name; @@ -325,9 +350,9 @@ static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj } static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; - void * pIter = NULL; + void *pIter = NULL; int contLen; while (1) { @@ -361,9 +386,9 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj } static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; - void * pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -467,7 +492,7 @@ CREATE_STB_OVER: } static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { - SMnode * pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to create", pCreate->name); @@ -551,7 +576,7 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; } static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { - SMnode * pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to alter", pAlter->name); @@ -665,7 +690,7 @@ DROP_STB_OVER: } static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { - SMnode * pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SDropStbMsg *pDrop = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to drop", pDrop->name); @@ -700,7 +725,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { } static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { - SMnode * pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); @@ -772,7 +797,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs } int32_t numOfStbs = 0; - void * pIter = NULL; + void *pIter = NULL; while (1) { SStbObj *pStb = NULL; pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); @@ -791,7 +816,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) { return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 24e32e07b4..9fb63f6f58 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -59,80 +59,105 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto TOPIC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN); - SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN); - SDB_SET_INT64(pRaw, dataPos, pTopic->createTime); - SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime); - SDB_SET_INT64(pRaw, dataPos, pTopic->uid); - SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid); - SDB_SET_INT32(pRaw, dataPos, pTopic->version); - SDB_SET_INT32(pRaw, dataPos, pTopic->execLen); - SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen); - SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen); - SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen); + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTopic->execLen, TOPIC_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen, TOPIC_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER) - SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE); - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER) + terrno = 0; + +TOPIC_ENCODE_OVER: + if (terrno != 0) { + mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic); return pRaw; } static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER; if (sver != MND_TOPIC_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode topic since %s", terrstr()); + goto TOPIC_DECODE_OVER; + } + + int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto TOPIC_DECODE_OVER; + + STopicObj *pTopic = sdbGetRowObj(pRow); + if (pTopic == NULL) goto TOPIC_DECODE_OVER; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTopic->execLen, TOPIC_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen, TOPIC_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER) + + SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) + + terrno = 0; + +TOPIC_DECODE_OVER: + if (terrno != 0) { + mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr()); + tfree(pRow); return NULL; } - int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); - SSdbRow *pRow = sdbAllocRow(size); - STopicObj *pTopic = sdbGetRowObj(pRow); - if (pTopic == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN); - SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_DB_FNAME_LEN); - SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->createTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->updateTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid); - SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->dbUid); - SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->version); - SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->execLen); - SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->executor, pTopic->execLen); - SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen); - SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen); - - SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TOPIC_RESERVE_SIZE); - + mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic); return pRow; } static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic) { - mTrace("topic:%s, perform insert action", pTopic->name); + mTrace("topic:%s, perform insert action, row:%p", pTopic->name, pTopic); return 0; } static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic) { - mTrace("topic:%s, perform delete action", pTopic->name); + mTrace("topic:%s, perform delete action, row:%p", pTopic->name, pTopic); return 0; } static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj *pNewTopic) { - mTrace("topic:%s, perform update action", pOldTopic->name); + mTrace("topic:%s, perform update action, old_row:%p new_row:%p", pOldTopic->name, pOldTopic, pNewTopic); atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); taosWLockLatch(&pOldTopic->lock); - - //TODO handle update + + // TODO handle update taosWUnLockLatch(&pOldTopic->lock); return 0; @@ -201,9 +226,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq } static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - char *msgStr = pMsg->rpcMsg.pCont; - SCMCreateTopicReq* pCreate; + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMCreateTopicReq *pCreate; tDeserializeSCMCreateTopicReq(msgStr, pCreate); mDebug("topic:%s, start to create", pCreate->name); @@ -245,9 +270,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { - return 0; -} +static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { return 0; } static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index b0d799d21b..5062048d6d 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -31,6 +31,7 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); +static void mndTransDropData(STrans *pTrans); static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); @@ -70,6 +71,8 @@ int32_t mndInitTrans(SMnode *pMnode) { void mndCleanupTrans(SMnode *pMnode) {} static SSdbRaw *mndTransActionEncode(STrans *pTrans) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t rawDataLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE; int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); @@ -109,75 +112,95 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { } int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pTrans->id) - SDB_SET_INT8(pRaw, dataPos, pTrans->policy) - SDB_SET_INT32(pRaw, dataPos, redoLogNum) - SDB_SET_INT32(pRaw, dataPos, undoLogNum) - SDB_SET_INT32(pRaw, dataPos, commitLogNum) - SDB_SET_INT32(pRaw, dataPos, redoActionNum) - SDB_SET_INT32(pRaw, dataPos, undoActionNum) + SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER) for (int32_t i = 0; i < redoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) + SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) } for (int32_t i = 0; i < undoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) + SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) } for (int32_t i = 0; i < commitLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) + SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) } for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT16(pRaw, dataPos, pAction->msgType) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen); + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT16(pRaw, dataPos, pAction->msgType) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen); + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) + } + + SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER) + + terrno = 0; + +TRANS_ENCODE_OVER: + if (terrno != 0) { + mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr()); + sdbFreeRaw(pRaw); + return NULL; } - SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); return pRaw; } static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { - int32_t code = 0; + terrno = TSDB_CODE_OUT_OF_MEMORY; - int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + SSdbRow *pRow = NULL; + STrans *pTrans = NULL; + char *pData = NULL; + int32_t dataLen = 0; + int8_t sver = 0; + int32_t redoLogNum = 0; + int32_t undoLogNum = 0; + int32_t commitLogNum = 0; + int32_t redoActionNum = 0; + int32_t undoActionNum = 0; + int32_t dataPos = 0; + STransAction action = {0}; + + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TRANS_DECODE_OVER; if (sver != MND_TRANS_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode trans since %s", terrstr()); - return NULL; + goto TRANS_DECODE_OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); - STrans *pTrans = sdbGetRowObj(pRow); - if (pTrans == NULL) { - mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr()); - return NULL; - } + pRow = sdbAllocRow(sizeof(STrans)); + if (pRow == NULL) goto TRANS_DECODE_OVER; + + pTrans = sdbGetRowObj(pRow); + if (pTrans == NULL) goto TRANS_DECODE_OVER; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -185,112 +208,79 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); - if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || - pTrans->redoActions == NULL || pTrans->undoActions == NULL) { - mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw); - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } + if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER; + if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER; + if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER; + if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER; + if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER; - int32_t redoLogNum = 0; - int32_t undoLogNum = 0; - int32_t commitLogNum = 0; - int32_t redoActionNum = 0; - int32_t undoActionNum = 0; - - int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id) - SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy) - SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum) - SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum) - SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum) - SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum) - SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum) + SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER) for (int32_t i = 0; i < redoLogNum; ++i) { - int32_t dataLen = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); - SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); - - void *ret = taosArrayPush(pTrans->redoLogs, &pData); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } + SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) + pData = malloc(dataLen); + if (pData == NULL) goto TRANS_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER); + if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER; + pData = NULL; } for (int32_t i = 0; i < undoLogNum; ++i) { - int32_t dataLen = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); - SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); - - void *ret = taosArrayPush(pTrans->undoLogs, &pData); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } + SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) + pData = malloc(dataLen); + if (pData == NULL) goto TRANS_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER); + if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER; + pData = NULL; } for (int32_t i = 0; i < commitLogNum; ++i) { - int32_t dataLen = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); - SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); - - void *ret = taosArrayPush(pTrans->commitLogs, &pData); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } + SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) + pData = malloc(dataLen); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER); + if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER; + pData = NULL; } for (int32_t i = 0; i < redoActionNum; ++i) { - STransAction action = {0}; - SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType) - SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER) action.pCont = malloc(action.contLen); - if (action.pCont == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } - SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen); - - void *ret = taosArrayPush(pTrans->redoActions, &action); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } + if (action.pCont == NULL) goto TRANS_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER); + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto TRANS_DECODE_OVER; + action.pCont = NULL; } for (int32_t i = 0; i < undoActionNum; ++i) { - STransAction action = {0}; - SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType) - SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER) action.pCont = malloc(action.contLen); - if (action.pCont == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } - SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen); - - void *ret = taosArrayPush(pTrans->undoActions, &action); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto TRANS_DECODE_OVER; - } + if (action.pCont == NULL) goto TRANS_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER); + if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto TRANS_DECODE_OVER; + action.pCont = NULL; } - SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TRANS_RESERVE_SIZE) + SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER) + + terrno = 0; TRANS_DECODE_OVER: - if (code != 0) { - mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno)); - mndTransDrop(pTrans); - terrno = code; + if (terrno != 0) { + mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr()); + mndTransDropData(pTrans); + tfree(pRow); + tfree(pData); + tfree(action.pCont); return NULL; } @@ -304,15 +294,17 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { return 0; } -static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans); - +static void mndTransDropData(STrans *pTrans) { mndTransDropLogs(pTrans->redoLogs); mndTransDropLogs(pTrans->undoLogs); mndTransDropLogs(pTrans->commitLogs); mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); +} +static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { + mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans); + mndTransDropData(pTrans); return 0; } @@ -367,6 +359,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { } static void mndTransDropLogs(SArray *pArray) { + if (pArray == NULL) return; for (int32_t i = 0; i < pArray->size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); sdbFreeRaw(pRaw); @@ -376,6 +369,7 @@ static void mndTransDropLogs(SArray *pArray) { } static void mndTransDropActions(SArray *pArray) { + if (pArray == NULL) return; for (int32_t i = 0; i < pArray->size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); free(pAction->pCont); @@ -385,12 +379,7 @@ static void mndTransDropActions(SArray *pArray) { } void mndTransDrop(STrans *pTrans) { - mndTransDropLogs(pTrans->redoLogs); - mndTransDropLogs(pTrans->undoLogs); - mndTransDropLogs(pTrans->commitLogs); - mndTransDropActions(pTrans->redoActions); - mndTransDropActions(pTrans->undoActions); - + mndTransDropData(pTrans); mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); tfree(pTrans); } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 8b891ddb92..6311d3e8da 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -94,45 +94,68 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) { } static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, TSDB_USER_VER_NUMBER, sizeof(SUserObj) + TSDB_USER_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto USER_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN) - SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN) - SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN) - SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) - SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) - SDB_SET_INT8(pRaw, dataPos, pUser->superUser) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pUser->createdTime, USER_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pUser->updateTime, USER_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pUser->superUser, USER_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER) + + terrno = 0; + +USER_ENCODE_OVER: + if (terrno != 0) { + mError("user:%s, failed to encode to raw:%p since %s", pUser->user, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } mTrace("user:%s, encode to raw:%p, row:%p", pUser->user, pRaw, pUser); return pRaw; } static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER; if (sver != TSDB_USER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode user since %s", terrstr()); - return NULL; + goto USER_DECODE_OVER; } - SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); + SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); + if (pRow == NULL) goto USER_DECODE_OVER; + SUserObj *pUser = sdbGetRowObj(pRow); - if (pUser == NULL) return NULL; + if (pUser == NULL) goto USER_DECODE_OVER; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN) - SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_PASSWORD_LEN) - SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN) - SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime) - SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superUser) - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_USER_RESERVE_SIZE) + SDB_GET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pUser->createdTime, USER_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pUser->updateTime, USER_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, USER_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_DECODE_OVER) + + terrno = 0; + +USER_DECODE_OVER: + if (terrno != 0) { + mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr()); + tfree(pRow); + return NULL; + } mTrace("user:%s, decode from raw:%p, row:%p", pUser->user, pRaw, pUser); return pRow; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3090f1e645..bd17c6d150 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -70,77 +70,102 @@ int32_t mndInitVgroup(SMnode *pMnode) { void mndCleanupVgroup(SMnode *pMnode) {} SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); - if (pRaw == NULL) return NULL; + if (pRaw == NULL) goto VG_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId) - SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime) - SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime) - SDB_SET_INT32(pRaw, dataPos, pVgroup->version) - SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin) - SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd) - SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN) - SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid) - SDB_SET_INT8(pRaw, dataPos, pVgroup->replica) + SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, VG_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, VG_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, VG_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->version, VG_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, VG_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, VG_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, VG_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, VG_ENCODE_OVER) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; - SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId) + SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, VG_ENCODE_OVER) } - SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE) - SDB_SET_DATALEN(pRaw, dataPos); + SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_ENCODE_OVER) + SDB_SET_DATALEN(pRaw, dataPos, VG_ENCODE_OVER) + terrno = 0; + +VG_ENCODE_OVER: + if (terrno != 0) { + mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup); return pRaw; } SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER; if (sver != TSDB_VGROUP_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode vgroup since %s", terrstr()); - return NULL; + goto VG_DECODE_OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj)); - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) return NULL; + if (pRow == NULL) goto VG_DECODE_OVER; + + SVgObj *pVgroup = sdbGetRowObj(pRow); + if (pVgroup == NULL) goto VG_DECODE_OVER; int32_t dataPos = 0; - SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->vgId) - SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->createdTime) - SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->updateTime) - SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version) - SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin) - SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd) - SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN) - SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->dbUid) - SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; - SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId) + SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER) if (pVgroup->replica == 1) { pVgid->role = TAOS_SYNC_STATE_LEADER; } } - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE) + SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER) + terrno = 0; + +VG_DECODE_OVER: + if (terrno != 0) { + mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr()); + tfree(pRow); + return NULL; + } + + mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup); return pRow; } static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { - mTrace("vgId:%d, perform insert action", pVgroup->vgId); + mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup); return 0; } static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) { - mTrace("vgId:%d, perform delete action", pVgroup->vgId); + mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup); return 0; } static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) { - mTrace("vgId:%d, perform update action", pOldVgroup->vgId); + mTrace("vgId:%d, perform update action, old_row:%p new_row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup); pOldVgroup->updateTime = pNewVgroup->updateTime; pOldVgroup->version = pNewVgroup->version; pOldVgroup->hashBegin = pNewVgroup->hashBegin;