From 9a927afbd976ef63660147fc386f27ed4158dcc3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Feb 2022 17:24:34 +0800 Subject: [PATCH 1/5] auth for stb --- source/dnode/mnode/impl/inc/mndAuth.h | 3 + source/dnode/mnode/impl/src/mndAuth.c | 26 ++++++++ source/dnode/mnode/impl/src/mndStb.c | 93 ++++++++++++++++++++------- 3 files changed, 98 insertions(+), 24 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndAuth.h b/source/dnode/mnode/impl/inc/mndAuth.h index c2c69f000b..7dcb518f9b 100644 --- a/source/dnode/mnode/impl/inc/mndAuth.h +++ b/source/dnode/mnode/impl/inc/mndAuth.h @@ -36,6 +36,9 @@ int32_t mndCheckCreateDbAuth(SUserObj *pOperUser); int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb); int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb); +int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb); +int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index 84c8b7476c..afebb066b3 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -141,3 +141,29 @@ int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb) { } int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb) { return 0; } + +int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb) { + if (pOperUser->superUser || strcmp(pOperUser->user, pDb->createUser) == 0) { + return 0; + } + + if (taosHashGet(pOperUser->writeDbs, pDb->name, strlen(pDb->name) + 1) != NULL) { + return 0; + } + + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; +} + +int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb) { + if (pOperUser->superUser || strcmp(pOperUser->user, pDb->createUser) == 0) { + return 0; + } + + if (taosHashGet(pOperUser->readDbs, pDb->name, strlen(pDb->name) + 1) != NULL) { + return 0; + } + + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 436b0b9961..69a8479d47 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndStb.h" +#include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -343,7 +344,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { return -1; } - SField *pField = taosArrayGet(pCreate->pColumns, 0) ; + SField *pField = taosArrayGet(pCreate->pColumns, 0); if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; @@ -549,12 +550,16 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { SStbObj *pTopicStb = NULL; SStbObj *pStb = NULL; SDbObj *pDb = NULL; + SUserObj *pUser = NULL; SMCreateStbReq createReq = {0}; if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER; mDebug("stb:%s, start to create", createReq.name); - if (mndCheckCreateStbReq(&createReq) != 0) goto CREATE_STB_OVER; + if (mndCheckCreateStbReq(&createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CREATE_STB_OVER; + } pStb = mndAcquireStb(pMnode, createReq.name); if (pStb != NULL) { @@ -582,6 +587,15 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { goto CREATE_STB_OVER; } + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto CREATE_STB_OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto CREATE_STB_OVER; + } + code = mndCreateStb(pMnode, pReq, &createReq, pDb); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -593,8 +607,8 @@ CREATE_STB_OVER: mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pTopicStb); mndReleaseDb(pMnode, pDb); - taosArrayDestroy(createReq.pColumns); - taosArrayDestroy(createReq.pTags); + mndReleaseUser(pMnode, pUser); + tFreeSMCreateStbReq(&createReq); return code; } @@ -965,7 +979,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq * int32_t code = -1; STrans *pTrans = NULL; SField *pField0 = taosArrayGet(pAlter->pFields, 0); - + switch (pAlter->alterType) { case TSDB_ALTER_TABLE_ADD_TAG: code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields); @@ -1020,9 +1034,13 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { int32_t code = -1; SDbObj *pDb = NULL; SStbObj *pStb = NULL; + SUserObj *pUser = NULL; SMAltertbReq alterReq = {0}; - if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, &alterReq) == NULL) goto ALTER_STB_OVER; + if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, &alterReq) == NULL) { + terrno = TSDB_CODE_INVALID_MSG; + goto ALTER_STB_OVER; + } mDebug("stb:%s, start to alter", alterReq.name); if (mndCheckAlterStbReq(&alterReq) != 0) goto ALTER_STB_OVER; @@ -1039,6 +1057,15 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { goto ALTER_STB_OVER; } + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto ALTER_STB_OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto ALTER_STB_OVER; + } + code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -1049,6 +1076,7 @@ ALTER_STB_OVER: mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); taosArrayDestroy(alterReq.pFields); return code; @@ -1135,43 +1163,60 @@ DROP_STB_OVER: } static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SUserObj *pUser = NULL; + SDbObj *pDb = NULL; + SStbObj *pStb = NULL; SMDropStbReq dropReq = {0}; - tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq); + + if (tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto DROP_STB_OVER; + } mDebug("stb:%s, start to drop", dropReq.name); - SStbObj *pStb = mndAcquireStb(pMnode, dropReq.name); + pStb = mndAcquireStb(pMnode, dropReq.name); if (pStb == NULL) { if (dropReq.igNotExists) { mDebug("stb:%s, not exist, ignore not exist is set", dropReq.name); - return 0; + code = 0; + goto DROP_STB_OVER; } else { terrno = TSDB_CODE_MND_STB_NOT_EXIST; - mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + goto DROP_STB_OVER; } } - SDbObj *pDb = mndAcquireDbByStb(pMnode, dropReq.name); + pDb = mndAcquireDbByStb(pMnode, dropReq.name); if (pDb == NULL) { - mndReleaseStb(pMnode, pStb); terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + goto DROP_STB_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto DROP_STB_OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto DROP_STB_OVER; + } + + code = mndDropStb(pMnode, pReq, pDb, pStb); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +DROP_STB_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); } - int32_t code = mndDropStb(pMnode, pReq, pDb, pStb); mndReleaseDb(pMnode, pDb); mndReleaseStb(pMnode, pStb); + mndReleaseUser(pMnode, pUser); - if (code != 0) { - mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; - } - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return code; } static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) { From 2f4148e8d40a4396feb134eddcaf5d8466d8a597 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Feb 2022 17:40:38 +0800 Subject: [PATCH 2/5] Need to delete stb when deleting database --- source/dnode/mnode/impl/inc/mndStb.h | 1 + source/dnode/mnode/impl/src/mndDb.c | 19 +++++++++++++++++++ source/dnode/mnode/impl/src/mndStb.c | 3 +-- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index 4ff696381e..82d93a4b9b 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -26,6 +26,7 @@ int32_t mndInitStb(SMnode *pMnode); void mndCleanupStb(SMnode *pMnode); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); +SSdbRaw *mndStbActionEncode(SStbObj *pStb); int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t *pRspLen); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a6d7e92da6..0cceb1084b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -18,6 +18,7 @@ #include "mndAuth.h" #include "mndDnode.h" #include "mndShow.h" +#include "mndStb.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -722,6 +723,24 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD sdbRelease(pSdb, pVgroup); } + while (1) { + SStbObj *pStb = NULL; + pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); + if (pIter == NULL) break; + + if (pStb->dbUid == pDb->uid) { + SSdbRaw *pStbRaw = mndStbActionEncode(pStb); + if (pStbRaw == NULL || mndTransAppendCommitlog(pTrans, pStbRaw) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pStbRaw); + return -1; + } + sdbSetRawStatus(pStbRaw, SDB_STATUS_DROPPED); + } + + sdbRelease(pSdb, pStb); + } + return 0; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 69a8479d47..31e2d35c01 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -28,7 +28,6 @@ #define TSDB_STB_VER_NUMBER 1 #define TSDB_STB_RESERVE_SIZE 64 -static SSdbRaw *mndStbActionEncode(SStbObj *pStb); static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); @@ -70,7 +69,7 @@ int32_t mndInitStb(SMnode *pMnode) { void mndCleanupStb(SMnode *pMnode) {} -static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { +SSdbRaw *mndStbActionEncode(SStbObj *pStb) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE; From d89f396dc7b07d3c3a6f8deabf63f34953b607ef Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Feb 2022 18:17:18 +0800 Subject: [PATCH 3/5] serialize stb msg --- include/common/tmsg.h | 13 +- source/common/src/tmsg.c | 153 ++++++++++++++-------- source/dnode/mnode/impl/src/mndConsumer.c | 14 +- source/dnode/mnode/impl/src/mndStb.c | 9 +- source/dnode/mnode/impl/test/stb/stb.cpp | 47 +++---- source/libs/parser/src/astToMsg.c | 10 +- 6 files changed, 140 insertions(+), 106 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 80a81ff143..be37fb9e9c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -272,8 +272,8 @@ typedef struct { char comment[TSDB_STB_COMMENT_LEN]; } SMCreateStbReq; -int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq); -void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq); +int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); +int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); void tFreeSMCreateStbReq(SMCreateStbReq* pReq); typedef struct { @@ -281,8 +281,8 @@ typedef struct { int8_t igNotExists; } SMDropStbReq; -int32_t tSerializeSMDropStbReq(void** buf, SMDropStbReq* pReq); -void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq); +int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); +int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; @@ -291,8 +291,9 @@ typedef struct { SArray* pFields; } SMAltertbReq; -int32_t tSerializeSMAlterStbReq(void** buf, SMAltertbReq* pReq); -void* tDeserializeSMAlterStbReq(void* buf, SMAltertbReq* pReq); +int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); +int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); +void tFreeSMAltertbReq(SMAltertbReq* pReq); typedef struct { int32_t pid; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index acbefc6bea..411acbfcfd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -375,132 +375,171 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { return buf; } -int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) { - int32_t tlen = 0; +int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeFixedI8(buf, pReq->igExists); - tlen += taosEncodeFixedI32(buf, pReq->numOfColumns); - tlen += taosEncodeFixedI32(buf, pReq->numOfTags); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; for (int32_t i = 0; i < pReq->numOfColumns; ++i) { SField *pField = taosArrayGet(pReq->pColumns, i); - tlen += taosEncodeFixedI8(buf, pField->type); - tlen += taosEncodeFixedI32(buf, pField->bytes); - tlen += taosEncodeString(buf, pField->name); + if (tEncodeI8(&encoder, pField->type) < 0) return -1; + if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; + if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } for (int32_t i = 0; i < pReq->numOfTags; ++i) { SField *pField = taosArrayGet(pReq->pTags, i); - tlen += taosEncodeFixedI8(buf, pField->type); - tlen += taosEncodeFixedI32(buf, pField->bytes); - tlen += taosEncodeString(buf, pField->name); + if (tEncodeI8(&encoder, pField->type) < 0) return -1; + if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; + if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } - tlen += taosEncodeString(buf, pReq->comment); + if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) { - buf = taosDecodeStringTo(buf, pReq->name); - buf = taosDecodeFixedI8(buf, &pReq->igExists); - buf = taosDecodeFixedI32(buf, &pReq->numOfColumns); - buf = taosDecodeFixedI32(buf, &pReq->numOfTags); +int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); if (pReq->pColumns == NULL || pReq->pTags == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } for (int32_t i = 0; i < pReq->numOfColumns; ++i) { SField field = {0}; - buf = taosDecodeFixedI8(buf, &field.type); - buf = taosDecodeFixedI32(buf, &field.bytes); - buf = taosDecodeStringTo(buf, field.name); + if (tDecodeI8(&decoder, &field.type) < 0) return -1; + if (tDecodeI32(&decoder, &field.bytes) < 0) return -1; + if (tDecodeCStrTo(&decoder, field.name) < 0) return -1; if (taosArrayPush(pReq->pColumns, &field) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } } for (int32_t i = 0; i < pReq->numOfTags; ++i) { SField field = {0}; - buf = taosDecodeFixedI8(buf, &field.type); - buf = taosDecodeFixedI32(buf, &field.bytes); - buf = taosDecodeStringTo(buf, field.name); + if (tDecodeI8(&decoder, &field.type) < 0) return -1; + if (tDecodeI32(&decoder, &field.bytes) < 0) return -1; + if (tDecodeCStrTo(&decoder, field.name) < 0) return -1; if (taosArrayPush(pReq->pTags, &field) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } } - buf = taosDecodeStringTo(buf, pReq->comment); - return buf; + if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; } void tFreeSMCreateStbReq(SMCreateStbReq *pReq) { taosArrayDestroy(pReq->pColumns); taosArrayDestroy(pReq->pTags); + pReq->pColumns = NULL; + pReq->pTags = NULL; } -int32_t tSerializeSMDropStbReq(void **buf, SMDropStbReq *pReq) { - int32_t tlen = 0; +int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeFixedI8(buf, pReq->igNotExists); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) { - buf = taosDecodeStringTo(buf, pReq->name); - buf = taosDecodeFixedI8(buf, &pReq->igNotExists); +int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); - return buf; + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; } -int32_t tSerializeSMAlterStbReq(void **buf, SMAltertbReq *pReq) { - int32_t tlen = 0; - - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeFixedI8(buf, pReq->alterType); - tlen += taosEncodeFixedI32(buf, pReq->numOfFields); +int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1; for (int32_t i = 0; i < pReq->numOfFields; ++i) { SField *pField = taosArrayGet(pReq->pFields, i); - tlen += taosEncodeFixedU8(buf, pField->type); - tlen += taosEncodeFixedI32(buf, pField->bytes); - tlen += taosEncodeString(buf, pField->name); + if (tEncodeI8(&encoder, pField->type) < 0) return -1; + if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; + if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) { - buf = taosDecodeStringTo(buf, pReq->name); - buf = taosDecodeFixedI8(buf, &pReq->alterType); - buf = taosDecodeFixedI32(buf, &pReq->numOfFields); +int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1; pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField)); if (pReq->pFields == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } - for (int32_t i = 0; i < pReq->numOfFields; ++i) { SField field = {0}; - buf = taosDecodeFixedU8(buf, &field.type); - buf = taosDecodeFixedI32(buf, &field.bytes); - buf = taosDecodeStringTo(buf, field.name); + if (tDecodeI8(&decoder, &field.type) < 0) return -1; + if (tDecodeI32(&decoder, &field.bytes) < 0) return -1; + if (tDecodeCStrTo(&decoder, field.name) < 0) return -1; if (taosArrayPush(pReq->pFields, &field) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return -1; } } - return buf; + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + +void tFreeSMAltertbReq(SMAltertbReq *pReq) { + taosArrayDestroy(pReq->pFields); + pReq->pFields = NULL; } int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 21ebea258d..0f4b538cde 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -14,8 +14,8 @@ */ #define _DEFAULT_SOURCE - #include "mndConsumer.h" +#include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -53,13 +53,14 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} -SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { - SMqConsumerObj* pConsumer = malloc(sizeof(SMqConsumerObj)); +SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup) { + SMqConsumerObj *pConsumer = calloc(1, sizeof(SMqConsumerObj)); if (pConsumer == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pConsumer->recentRemovedTopics = taosArrayInit(0, sizeof(char*)); + + pConsumer->recentRemovedTopics = taosArrayInit(1, sizeof(char *)); pConsumer->epoch = 1; pConsumer->consumerId = consumerId; atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT); @@ -70,7 +71,8 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void* buf = NULL; + + void *buf = NULL; int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE; @@ -105,7 +107,7 @@ CM_ENCODE_OVER: SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void* buf = NULL; + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 31e2d35c01..1fc1b25776 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -552,7 +552,10 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { SUserObj *pUser = NULL; SMCreateStbReq createReq = {0}; - if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER; + if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CREATE_STB_OVER; + } mDebug("stb:%s, start to create", createReq.name); if (mndCheckCreateStbReq(&createReq) != 0) { @@ -1036,7 +1039,7 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { SUserObj *pUser = NULL; SMAltertbReq alterReq = {0}; - if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, &alterReq) == NULL) { + if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto ALTER_STB_OVER; } @@ -1169,7 +1172,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { SStbObj *pStb = NULL; SMDropStbReq dropReq = {0}; - if (tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq) != 0) { + if (tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto DROP_STB_OVER; } diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index 573c1f4846..c551097e72 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -129,11 +129,10 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { taosArrayPush(createReq.pTags, &field); } - int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq); + int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq); void* pHead = rpcMallocCont(tlen); - - void* pBuf = pHead; - tSerializeSMCreateStbReq(&pBuf, &createReq); + tSerializeSMCreateStbReq(pHead, tlen, &createReq); + tFreeSMCreateStbReq(&createReq); *pContLen = tlen; return pHead; } @@ -151,10 +150,9 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam strcpy(field.name, tagname); taosArrayPush(req.pFields, &field); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -173,10 +171,9 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna strcpy(field.name, tagname); taosArrayPush(req.pFields, &field); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -202,10 +199,9 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* strcpy(field2.name, newtagname); taosArrayPush(req.pFields, &field2); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -225,10 +221,9 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char strcpy(field.name, tagname); taosArrayPush(req.pFields, &field); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -247,10 +242,9 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col strcpy(field.name, colname); taosArrayPush(req.pFields, &field); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -269,10 +263,9 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co strcpy(field.name, colname); taosArrayPush(req.pFields, &field); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -292,10 +285,9 @@ void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const c strcpy(field.name, colname); taosArrayPush(req.pFields, &field); - int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); + int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMAlterStbReq(&pBuf, &req); + tSerializeSMAlterStbReq(pHead, contLen, &req); *pContLen = contLen; return pHead; @@ -430,10 +422,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { SMDropStbReq dropReq = {0}; strcpy(dropReq.name, stbname); - int32_t contLen = tSerializeSMDropStbReq(NULL, &dropReq); + int32_t contLen = tSerializeSMDropStbReq(NULL, 0, &dropReq); void* pHead = rpcMallocCont(contLen); - void* pBuf = pHead; - tSerializeSMDropStbReq(&pBuf, &dropReq); + tSerializeSMDropStbReq(pHead, contLen, &dropReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pHead, contLen); ASSERT_NE(pRsp, nullptr); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 2bbf9116c2..0b4867de03 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -404,15 +404,14 @@ char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* outputLen, SP return NULL; } - int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq); + int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq); void* pReq = malloc(tlen); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - void* pBuf = pReq; - tSerializeSMCreateStbReq(&pBuf, &createReq); + tSerializeSMCreateStbReq(pReq, tlen, &createReq); *outputLen = tlen; return pReq; } @@ -433,15 +432,14 @@ char* buildDropStableReq(SSqlInfo* pInfo, int32_t* outputLen, SParseContext* pPa assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; - int32_t tlen = tSerializeSMDropStbReq(NULL, &dropReq); + int32_t tlen = tSerializeSMDropStbReq(NULL, 0, &dropReq); void* pReq = malloc(tlen); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - void* pBuf = pReq; - tSerializeSMDropStbReq(&pBuf, &dropReq); + tSerializeSMDropStbReq(pReq, tlen, &dropReq); *outputLen = tlen; return pReq; } From f3c30e33fb4ae9f7f061e7525afd55c1d84938c7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Feb 2022 18:29:00 +0800 Subject: [PATCH 4/5] minor changes --- source/dnode/mnode/impl/src/mndSubscribe.c | 74 +++++++++++----------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2a3e0008a2..6afa1f4c79 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define _DEFAULT_SOURCE +#define _DEFAULT_SOURCE #include "mndSubscribe.h" #include "mndConsumer.h" #include "mndDb.h" @@ -54,12 +54,12 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); -static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, - const SMqConsumerEp *pConsumerEp); +static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, + const SMqConsumerEp *pConsumerEp); static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); -static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); +static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -232,22 +232,22 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { if (epoch != rsp.epoch) { mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); SArray *pTopics = pConsumer->currentTopics; - int sz = taosArrayGetSize(pTopics); + int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); - for (int i = 0; i < sz; i++) { + for (int32_t i = 0; i < sz; i++) { char *topicName = taosArrayGetP(pTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); ASSERT(pSub); - int csz = taosArrayGetSize(pSub->consumers); + int32_t csz = taosArrayGetSize(pSub->consumers); // TODO: change to bsearch - for (int j = 0; j < csz; j++) { + for (int32_t j = 0; j < csz; j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (consumerId == pSubConsumer->consumerId) { - int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); SMqSubTopicEp topicEp; strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); - for (int k = 0; k < vgsz; k++) { + for (int32_t k = 0; k < vgsz; k++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; @@ -276,7 +276,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { - int i = 0; + int32_t i = 0; while (key[i] != ':') { i++; } @@ -317,8 +317,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); if (old == MQ_CONSUMER_STATUS__ACTIVE) { // get all topics of that topic - int sz = taosArrayGetSize(pConsumer->currentTopics); - for (int i = 0; i < sz; i++) { + int32_t sz = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < sz; i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); @@ -334,8 +334,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } else { rebSubs = pConsumer->recentRemovedTopics; } - int sz = taosArrayGetSize(rebSubs); - for (int i = 0; i < sz; i++) { + int32_t sz = taosArrayGetSize(rebSubs); + for (int32_t i = 0; i < sz; i++) { char *topic = taosArrayGetP(rebSubs, i); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); @@ -375,12 +375,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { mInfo("mq rebalance subscription: %s", pSub->key); // remove lost consumer - for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); mInfo("mq remove lost consumer %ld", lostConsumerId); - for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) { + for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (pSubConsumer->consumerId == lostConsumerId) { taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo); @@ -400,10 +400,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { int32_t imbalanceSolved = 0; // iterate all consumers, set unassignedVgStash - for (int i = 0; i < consumerNum; i++) { + for (int32_t i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int vgThisConsumerAfterRb; + int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgThisConsumerAfterRb; if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else @@ -442,10 +442,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { // assign to vgroup if (taosArrayGetSize(pSub->unassignedVg) != 0) { - for (int i = 0; i < consumerNum; i++) { + for (int32_t i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int vgThisConsumerAfterRb; + int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgThisConsumerAfterRb; if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else @@ -602,7 +602,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { #if 0 //update consumer status for the subscribption - for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) { SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); int64_t consumerId = pCEp->consumerId; if (pCEp->status != -1) { @@ -619,7 +619,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { // TODO: swap with last one, reduce size and reset i taosArrayRemove(pSub->assigned, i); // remove from available consumer - for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { + for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { taosArrayRemove(pSub->availConsumer, j); break; @@ -699,7 +699,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { } #endif -static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { +static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); @@ -742,8 +742,8 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub return 0; } -static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, - const SMqConsumerEp *pConsumerEp) { +static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, + const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); @@ -890,7 +890,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) { if (key == NULL) { return NULL; } - int tlen = strlen(cgroup); + int32_t tlen = strlen(cgroup); memcpy(key, cgroup, tlen); key[tlen] = ':'; strcpy(key + tlen + 1, topicName); @@ -931,12 +931,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { char *cgroup = subscribe.consumerGroup; SArray *newSub = subscribe.topicNames; - int newTopicNum = subscribe.topicNum; + int32_t newTopicNum = subscribe.topicNum; taosArraySortString(newSub, taosArrayCompareString); SArray *oldSub = NULL; - int oldTopicNum = 0; + int32_t oldTopicNum = 0; bool createConsumer = false; // create consumer if not exist SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); @@ -960,7 +960,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } - int i = 0, j = 0; + int32_t i = 0, j = 0; while (i < newTopicNum || j < oldTopicNum) { char *newTopicName = NULL; char *oldTopicName = NULL; @@ -975,7 +975,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { newTopicName = taosArrayGetP(newSub, i); oldTopicName = taosArrayGetP(oldSub, j); - int comp = compareLenPrefixedStr(newTopicName, oldTopicName); + int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName); if (comp == 0) { // do nothing oldTopicName = newTopicName = NULL; @@ -997,12 +997,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { // cancel subscribe of old topic SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); ASSERT(pSub); - int csz = taosArrayGetSize(pSub->consumers); - for (int ci = 0; ci < csz; ci++) { + int32_t csz = taosArrayGetSize(pSub->consumers); + for (int32_t ci = 0; ci < csz; ci++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); if (pSubConsumer->consumerId == consumerId) { - int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); - for (int vgi = 0; vgi < vgsz; vgi++) { + int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + for (int32_t vgi = 0; vgi < vgsz; vgi++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); taosArrayPush(pSub->unassignedVg, pConsumerEp); From 6703798903d0c4ae73453cdf6def36ef628315b2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Feb 2022 18:38:46 +0800 Subject: [PATCH 5/5] minor changes --- source/libs/catalog/src/catalog.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f1c2395de4..b5c7f44f98 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1440,7 +1440,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm SCtgUpdateTblMsg *msg = NULL; STableMetaOutput moutput = {0}; - STableMetaOutput *output = malloc(sizeof(STableMetaOutput)); + STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput)); if (NULL == output) { ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);