Merge pull request #10264 from taosdata/feature/privilege

delete stb when deleting database
This commit is contained in:
Shengliang Guan 2022-02-15 18:45:35 +08:00 committed by GitHub
commit 355ac1bd4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 295 additions and 168 deletions

View File

@ -272,8 +272,8 @@ typedef struct {
char comment[TSDB_STB_COMMENT_LEN]; char comment[TSDB_STB_COMMENT_LEN];
} SMCreateStbReq; } SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq); int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq); int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
void tFreeSMCreateStbReq(SMCreateStbReq* pReq); void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct { typedef struct {
@ -281,8 +281,8 @@ typedef struct {
int8_t igNotExists; int8_t igNotExists;
} SMDropStbReq; } SMDropStbReq;
int32_t tSerializeSMDropStbReq(void** buf, SMDropStbReq* pReq); int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq); int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
@ -291,8 +291,9 @@ typedef struct {
SArray* pFields; SArray* pFields;
} SMAltertbReq; } SMAltertbReq;
int32_t tSerializeSMAlterStbReq(void** buf, SMAltertbReq* pReq); int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void* tDeserializeSMAlterStbReq(void* buf, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq);
typedef struct { typedef struct {
int32_t pid; int32_t pid;

View File

@ -375,132 +375,171 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
return buf; return buf;
} }
int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) { int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) {
int32_t tlen = 0; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
tlen += taosEncodeString(buf, pReq->name); if (tStartEncode(&encoder) < 0) return -1;
tlen += taosEncodeFixedI8(buf, pReq->igExists); if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pReq->numOfColumns); if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pReq->numOfTags); if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfColumns; ++i) { for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField *pField = taosArrayGet(pReq->pColumns, i); SField *pField = taosArrayGet(pReq->pColumns, i);
tlen += taosEncodeFixedI8(buf, pField->type); if (tEncodeI8(&encoder, pField->type) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pField->bytes); if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
tlen += taosEncodeString(buf, pField->name); if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
for (int32_t i = 0; i < pReq->numOfTags; ++i) { for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i); SField *pField = taosArrayGet(pReq->pTags, i);
tlen += taosEncodeFixedI8(buf, pField->type); if (tEncodeI8(&encoder, pField->type) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pField->bytes); if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
tlen += taosEncodeString(buf, pField->name); if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
tlen += taosEncodeString(buf, pReq->comment); if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) { int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name); SCoder decoder = {0};
buf = taosDecodeFixedI8(buf, &pReq->igExists); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
buf = taosDecodeFixedI32(buf, &pReq->numOfColumns);
buf = taosDecodeFixedI32(buf, &pReq->numOfTags); if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
if (pReq->pColumns == NULL || pReq->pTags == NULL) { if (pReq->pColumns == NULL || pReq->pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
for (int32_t i = 0; i < pReq->numOfColumns; ++i) { for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField field = {0}; SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type); if (tDecodeI8(&decoder, &field.type) < 0) return -1;
buf = taosDecodeFixedI32(buf, &field.bytes); if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
buf = taosDecodeStringTo(buf, field.name); if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (taosArrayPush(pReq->pColumns, &field) == NULL) { if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
} }
for (int32_t i = 0; i < pReq->numOfTags; ++i) { for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0}; SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type); if (tDecodeI8(&decoder, &field.type) < 0) return -1;
buf = taosDecodeFixedI32(buf, &field.bytes); if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
buf = taosDecodeStringTo(buf, field.name); if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) { if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
} }
buf = taosDecodeStringTo(buf, pReq->comment); if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
return buf; tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
} }
void tFreeSMCreateStbReq(SMCreateStbReq *pReq) { void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosArrayDestroy(pReq->pColumns); taosArrayDestroy(pReq->pColumns);
taosArrayDestroy(pReq->pTags); taosArrayDestroy(pReq->pTags);
pReq->pColumns = NULL;
pReq->pTags = NULL;
} }
int32_t tSerializeSMDropStbReq(void **buf, SMDropStbReq *pReq) { int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
int32_t tlen = 0; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
tlen += taosEncodeString(buf, pReq->name); if (tStartEncode(&encoder) < 0) return -1;
tlen += taosEncodeFixedI8(buf, pReq->igNotExists); if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) { int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name); SCoder decoder = {0};
buf = taosDecodeFixedI8(buf, &pReq->igNotExists); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
return buf; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
} }
int32_t tSerializeSMAlterStbReq(void **buf, SMAltertbReq *pReq) { int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
int32_t tlen = 0; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->alterType);
tlen += taosEncodeFixedI32(buf, pReq->numOfFields);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfFields; ++i) { for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField *pField = taosArrayGet(pReq->pFields, i); SField *pField = taosArrayGet(pReq->pFields, i);
tlen += taosEncodeFixedU8(buf, pField->type); if (tEncodeI8(&encoder, pField->type) < 0) return -1;
tlen += taosEncodeFixedI32(buf, pField->bytes); if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
tlen += taosEncodeString(buf, pField->name); if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen; return tlen;
} }
void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) { int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAltertbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name); SCoder decoder = {0};
buf = taosDecodeFixedI8(buf, &pReq->alterType); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
buf = taosDecodeFixedI32(buf, &pReq->numOfFields);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1;
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField)); pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
if (pReq->pFields == NULL) { if (pReq->pFields == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
for (int32_t i = 0; i < pReq->numOfFields; ++i) { for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField field = {0}; SField field = {0};
buf = taosDecodeFixedU8(buf, &field.type); if (tDecodeI8(&decoder, &field.type) < 0) return -1;
buf = taosDecodeFixedI32(buf, &field.bytes); if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
buf = taosDecodeStringTo(buf, field.name); if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (taosArrayPush(pReq->pFields, &field) == NULL) { if (taosArrayPush(pReq->pFields, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return -1;
} }
} }
return buf; tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
void tFreeSMAltertbReq(SMAltertbReq *pReq) {
taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL;
} }
int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) {

View File

@ -36,6 +36,9 @@ int32_t mndCheckCreateDbAuth(SUserObj *pOperUser);
int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb); int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb); int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -26,6 +26,7 @@ int32_t mndInitStb(SMnode *pMnode);
void mndCleanupStb(SMnode *pMnode); void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen); int32_t *pRspLen);

View File

@ -141,3 +141,29 @@ int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb) {
} }
int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb) { return 0; } int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb) { return 0; }
int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb) {
if (pOperUser->superUser || strcmp(pOperUser->user, pDb->createUser) == 0) {
return 0;
}
if (taosHashGet(pOperUser->writeDbs, pDb->name, strlen(pDb->name) + 1) != NULL) {
return 0;
}
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb) {
if (pOperUser->superUser || strcmp(pOperUser->user, pDb->createUser) == 0) {
return 0;
}
if (taosHashGet(pOperUser->readDbs, pDb->name, strlen(pDb->name) + 1) != NULL) {
return 0;
}
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}

View File

@ -14,8 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndAuth.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
@ -53,13 +53,14 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {} void mndCleanupConsumer(SMnode *pMnode) {}
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup) {
SMqConsumerObj* pConsumer = malloc(sizeof(SMqConsumerObj)); SMqConsumerObj *pConsumer = calloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) { if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pConsumer->recentRemovedTopics = taosArrayInit(0, sizeof(char*));
pConsumer->recentRemovedTopics = taosArrayInit(1, sizeof(char *));
pConsumer->epoch = 1; pConsumer->epoch = 1;
pConsumer->consumerId = consumerId; pConsumer->consumerId = consumerId;
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT); atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
@ -70,7 +71,8 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
void *buf = NULL;
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE; int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
@ -105,7 +107,7 @@ CM_ENCODE_OVER:
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL; void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;

View File

@ -18,6 +18,7 @@
#include "mndAuth.h" #include "mndAuth.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
@ -722,6 +723,24 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
if (pStb->dbUid == pDb->uid) {
SSdbRaw *pStbRaw = mndStbActionEncode(pStb);
if (pStbRaw == NULL || mndTransAppendCommitlog(pTrans, pStbRaw) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pStbRaw);
return -1;
}
sdbSetRawStatus(pStbRaw, SDB_STATUS_DROPPED);
}
sdbRelease(pSdb, pStb);
}
return 0; return 0;
} }

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndStb.h" #include "mndStb.h"
#include "mndAuth.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
@ -27,7 +28,6 @@
#define TSDB_STB_VER_NUMBER 1 #define TSDB_STB_VER_NUMBER 1
#define TSDB_STB_RESERVE_SIZE 64 #define TSDB_STB_RESERVE_SIZE 64
static SSdbRaw *mndStbActionEncode(SStbObj *pStb);
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
@ -69,7 +69,7 @@ int32_t mndInitStb(SMnode *pMnode) {
void mndCleanupStb(SMnode *pMnode) {} void mndCleanupStb(SMnode *pMnode) {}
static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE; int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE;
@ -343,7 +343,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
return -1; return -1;
} }
SField *pField = taosArrayGet(pCreate->pColumns, 0) ; SField *pField = taosArrayGet(pCreate->pColumns, 0);
if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) { if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
@ -549,12 +549,19 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
SStbObj *pTopicStb = NULL; SStbObj *pTopicStb = NULL;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SUserObj *pUser = NULL;
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER; if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_STB_OVER;
}
mDebug("stb:%s, start to create", createReq.name); mDebug("stb:%s, start to create", createReq.name);
if (mndCheckCreateStbReq(&createReq) != 0) goto CREATE_STB_OVER; if (mndCheckCreateStbReq(&createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_STB_OVER;
}
pStb = mndAcquireStb(pMnode, createReq.name); pStb = mndAcquireStb(pMnode, createReq.name);
if (pStb != NULL) { if (pStb != NULL) {
@ -582,6 +589,15 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
goto CREATE_STB_OVER; goto CREATE_STB_OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto CREATE_STB_OVER;
}
if (mndCheckWriteAuth(pUser, pDb) != 0) {
goto CREATE_STB_OVER;
}
code = mndCreateStb(pMnode, pReq, &createReq, pDb); code = mndCreateStb(pMnode, pReq, &createReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
@ -593,8 +609,8 @@ CREATE_STB_OVER:
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseStb(pMnode, pTopicStb); mndReleaseStb(pMnode, pTopicStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
taosArrayDestroy(createReq.pColumns); mndReleaseUser(pMnode, pUser);
taosArrayDestroy(createReq.pTags); tFreeSMCreateStbReq(&createReq);
return code; return code;
} }
@ -965,7 +981,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
int32_t code = -1; int32_t code = -1;
STrans *pTrans = NULL; STrans *pTrans = NULL;
SField *pField0 = taosArrayGet(pAlter->pFields, 0); SField *pField0 = taosArrayGet(pAlter->pFields, 0);
switch (pAlter->alterType) { switch (pAlter->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_ADD_TAG:
code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields); code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
@ -1020,9 +1036,13 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
int32_t code = -1; int32_t code = -1;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SUserObj *pUser = NULL;
SMAltertbReq alterReq = {0}; SMAltertbReq alterReq = {0};
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, &alterReq) == NULL) goto ALTER_STB_OVER; if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_STB_OVER;
}
mDebug("stb:%s, start to alter", alterReq.name); mDebug("stb:%s, start to alter", alterReq.name);
if (mndCheckAlterStbReq(&alterReq) != 0) goto ALTER_STB_OVER; if (mndCheckAlterStbReq(&alterReq) != 0) goto ALTER_STB_OVER;
@ -1039,6 +1059,15 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
goto ALTER_STB_OVER; goto ALTER_STB_OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto ALTER_STB_OVER;
}
if (mndCheckWriteAuth(pUser, pDb) != 0) {
goto ALTER_STB_OVER;
}
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb); code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
@ -1049,6 +1078,7 @@ ALTER_STB_OVER:
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser);
taosArrayDestroy(alterReq.pFields); taosArrayDestroy(alterReq.pFields);
return code; return code;
@ -1135,43 +1165,60 @@ DROP_STB_OVER:
} }
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SMDropStbReq dropReq = {0}; SMDropStbReq dropReq = {0};
tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq);
if (tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_STB_OVER;
}
mDebug("stb:%s, start to drop", dropReq.name); mDebug("stb:%s, start to drop", dropReq.name);
SStbObj *pStb = mndAcquireStb(pMnode, dropReq.name); pStb = mndAcquireStb(pMnode, dropReq.name);
if (pStb == NULL) { if (pStb == NULL) {
if (dropReq.igNotExists) { if (dropReq.igNotExists) {
mDebug("stb:%s, not exist, ignore not exist is set", dropReq.name); mDebug("stb:%s, not exist, ignore not exist is set", dropReq.name);
return 0; code = 0;
goto DROP_STB_OVER;
} else { } else {
terrno = TSDB_CODE_MND_STB_NOT_EXIST; terrno = TSDB_CODE_MND_STB_NOT_EXIST;
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); goto DROP_STB_OVER;
return -1;
} }
} }
SDbObj *pDb = mndAcquireDbByStb(pMnode, dropReq.name); pDb = mndAcquireDbByStb(pMnode, dropReq.name);
if (pDb == NULL) { if (pDb == NULL) {
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); goto DROP_STB_OVER;
return -1; }
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto DROP_STB_OVER;
}
if (mndCheckWriteAuth(pUser, pDb) != 0) {
goto DROP_STB_OVER;
}
code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_STB_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
} }
int32_t code = mndDropStb(pMnode, pReq, pDb, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseUser(pMnode, pUser);
if (code != 0) { return code;
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) { static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) {

View File

@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mndSubscribe.h" #include "mndSubscribe.h"
#include "mndConsumer.h" #include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
@ -54,12 +54,12 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp); const SMqConsumerEp *pConsumerEp);
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE, SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
@ -232,22 +232,22 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
if (epoch != rsp.epoch) { if (epoch != rsp.epoch) {
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch);
SArray *pTopics = pConsumer->currentTopics; SArray *pTopics = pConsumer->currentTopics;
int sz = taosArrayGetSize(pTopics); int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topicName = taosArrayGetP(pTopics, i); char *topicName = taosArrayGetP(pTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
ASSERT(pSub); ASSERT(pSub);
int csz = taosArrayGetSize(pSub->consumers); int32_t csz = taosArrayGetSize(pSub->consumers);
// TODO: change to bsearch // TODO: change to bsearch
for (int j = 0; j < csz; j++) { for (int32_t j = 0; j < csz; j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) { if (consumerId == pSubConsumer->consumerId) {
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
SMqSubTopicEp topicEp; SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName); strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
for (int k = 0; k < vgsz; k++) { for (int32_t k = 0; k < vgsz; k++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId};
@ -276,7 +276,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
} }
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
int i = 0; int32_t i = 0;
while (key[i] != ':') { while (key[i] != ':') {
i++; i++;
} }
@ -317,8 +317,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
if (old == MQ_CONSUMER_STATUS__ACTIVE) { if (old == MQ_CONSUMER_STATUS__ACTIVE) {
// get all topics of that topic // get all topics of that topic
int sz = taosArrayGetSize(pConsumer->currentTopics); int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i); char *topic = taosArrayGetP(pConsumer->currentTopics, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
@ -334,8 +334,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
} else { } else {
rebSubs = pConsumer->recentRemovedTopics; rebSubs = pConsumer->recentRemovedTopics;
} }
int sz = taosArrayGetSize(rebSubs); int32_t sz = taosArrayGetSize(rebSubs);
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(rebSubs, i); char *topic = taosArrayGetP(rebSubs, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
@ -375,12 +375,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
mInfo("mq rebalance subscription: %s", pSub->key); mInfo("mq rebalance subscription: %s", pSub->key);
// remove lost consumer // remove lost consumer
for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);
mInfo("mq remove lost consumer %ld", lostConsumerId); mInfo("mq remove lost consumer %ld", lostConsumerId);
for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) { for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (pSubConsumer->consumerId == lostConsumerId) { if (pSubConsumer->consumerId == lostConsumerId) {
taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo); taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo);
@ -400,10 +400,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
int32_t imbalanceSolved = 0; int32_t imbalanceSolved = 0;
// iterate all consumers, set unassignedVgStash // iterate all consumers, set unassignedVgStash
for (int i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb; int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg) if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1; vgThisConsumerAfterRb = vgEachConsumer + 1;
else else
@ -442,10 +442,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
// assign to vgroup // assign to vgroup
if (taosArrayGetSize(pSub->unassignedVg) != 0) { if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb; int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg) if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1; vgThisConsumerAfterRb = vgEachConsumer + 1;
else else
@ -602,7 +602,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
#if 0 #if 0
//update consumer status for the subscribption //update consumer status for the subscribption
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId; int64_t consumerId = pCEp->consumerId;
if (pCEp->status != -1) { if (pCEp->status != -1) {
@ -619,7 +619,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
// TODO: swap with last one, reduce size and reset i // TODO: swap with last one, reduce size and reset i
taosArrayRemove(pSub->assigned, i); taosArrayRemove(pSub->assigned, i);
// remove from available consumer // remove from available consumer
for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
taosArrayRemove(pSub->availConsumer, j); taosArrayRemove(pSub->availConsumer, j);
break; break;
@ -699,7 +699,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
} }
#endif #endif
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
@ -742,8 +742,8 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
return 0; return 0;
} }
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp) { const SMqConsumerEp *pConsumerEp) {
ASSERT(pConsumerEp->oldConsumerId == -1); ASSERT(pConsumerEp->oldConsumerId == -1);
int32_t vgId = pConsumerEp->vgId; int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
@ -890,7 +890,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
if (key == NULL) { if (key == NULL) {
return NULL; return NULL;
} }
int tlen = strlen(cgroup); int32_t tlen = strlen(cgroup);
memcpy(key, cgroup, tlen); memcpy(key, cgroup, tlen);
key[tlen] = ':'; key[tlen] = ':';
strcpy(key + tlen + 1, topicName); strcpy(key + tlen + 1, topicName);
@ -931,12 +931,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char *cgroup = subscribe.consumerGroup; char *cgroup = subscribe.consumerGroup;
SArray *newSub = subscribe.topicNames; SArray *newSub = subscribe.topicNames;
int newTopicNum = subscribe.topicNum; int32_t newTopicNum = subscribe.topicNum;
taosArraySortString(newSub, taosArrayCompareString); taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL; SArray *oldSub = NULL;
int oldTopicNum = 0; int32_t oldTopicNum = 0;
bool createConsumer = false; bool createConsumer = false;
// create consumer if not exist // create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
@ -960,7 +960,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1; return -1;
} }
int i = 0, j = 0; int32_t i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) { while (i < newTopicNum || j < oldTopicNum) {
char *newTopicName = NULL; char *newTopicName = NULL;
char *oldTopicName = NULL; char *oldTopicName = NULL;
@ -975,7 +975,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
newTopicName = taosArrayGetP(newSub, i); newTopicName = taosArrayGetP(newSub, i);
oldTopicName = taosArrayGetP(oldSub, j); oldTopicName = taosArrayGetP(oldSub, j);
int comp = compareLenPrefixedStr(newTopicName, oldTopicName); int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) { if (comp == 0) {
// do nothing // do nothing
oldTopicName = newTopicName = NULL; oldTopicName = newTopicName = NULL;
@ -997,12 +997,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// cancel subscribe of old topic // cancel subscribe of old topic
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
ASSERT(pSub); ASSERT(pSub);
int csz = taosArrayGetSize(pSub->consumers); int32_t csz = taosArrayGetSize(pSub->consumers);
for (int ci = 0; ci < csz; ci++) { for (int32_t ci = 0; ci < csz; ci++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
if (pSubConsumer->consumerId == consumerId) { if (pSubConsumer->consumerId == consumerId) {
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
for (int vgi = 0; vgi < vgsz; vgi++) { for (int32_t vgi = 0; vgi < vgsz; vgi++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
taosArrayPush(pSub->unassignedVg, pConsumerEp); taosArrayPush(pSub->unassignedVg, pConsumerEp);

View File

@ -129,11 +129,10 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
taosArrayPush(createReq.pTags, &field); taosArrayPush(createReq.pTags, &field);
} }
int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq); int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
void* pHead = rpcMallocCont(tlen); void* pHead = rpcMallocCont(tlen);
tSerializeSMCreateStbReq(pHead, tlen, &createReq);
void* pBuf = pHead; tFreeSMCreateStbReq(&createReq);
tSerializeSMCreateStbReq(&pBuf, &createReq);
*pContLen = tlen; *pContLen = tlen;
return pHead; return pHead;
} }
@ -151,10 +150,9 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam
strcpy(field.name, tagname); strcpy(field.name, tagname);
taosArrayPush(req.pFields, &field); taosArrayPush(req.pFields, &field);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -173,10 +171,9 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna
strcpy(field.name, tagname); strcpy(field.name, tagname);
taosArrayPush(req.pFields, &field); taosArrayPush(req.pFields, &field);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -202,10 +199,9 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char*
strcpy(field2.name, newtagname); strcpy(field2.name, newtagname);
taosArrayPush(req.pFields, &field2); taosArrayPush(req.pFields, &field2);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -225,10 +221,9 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char
strcpy(field.name, tagname); strcpy(field.name, tagname);
taosArrayPush(req.pFields, &field); taosArrayPush(req.pFields, &field);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -247,10 +242,9 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col
strcpy(field.name, colname); strcpy(field.name, colname);
taosArrayPush(req.pFields, &field); taosArrayPush(req.pFields, &field);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -269,10 +263,9 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
strcpy(field.name, colname); strcpy(field.name, colname);
taosArrayPush(req.pFields, &field); taosArrayPush(req.pFields, &field);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -292,10 +285,9 @@ void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const c
strcpy(field.name, colname); strcpy(field.name, colname);
taosArrayPush(req.pFields, &field); taosArrayPush(req.pFields, &field);
int32_t contLen = tSerializeSMAlterStbReq(NULL, &req); int32_t contLen = tSerializeSMAlterStbReq(NULL, 0, &req);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMAlterStbReq(pHead, contLen, &req);
tSerializeSMAlterStbReq(&pBuf, &req);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
@ -430,10 +422,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
SMDropStbReq dropReq = {0}; SMDropStbReq dropReq = {0};
strcpy(dropReq.name, stbname); strcpy(dropReq.name, stbname);
int32_t contLen = tSerializeSMDropStbReq(NULL, &dropReq); int32_t contLen = tSerializeSMDropStbReq(NULL, 0, &dropReq);
void* pHead = rpcMallocCont(contLen); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead; tSerializeSMDropStbReq(pHead, contLen, &dropReq);
tSerializeSMDropStbReq(&pBuf, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pHead, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pHead, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);

View File

@ -1440,7 +1440,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
SCtgUpdateTblMsg *msg = NULL; SCtgUpdateTblMsg *msg = NULL;
STableMetaOutput moutput = {0}; STableMetaOutput moutput = {0};
STableMetaOutput *output = malloc(sizeof(STableMetaOutput)); STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
if (NULL == output) { if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);

View File

@ -404,15 +404,14 @@ char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* outputLen, SP
return NULL; return NULL;
} }
int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq); int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
void* pReq = malloc(tlen); void* pReq = malloc(tlen);
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
void* pBuf = pReq; tSerializeSMCreateStbReq(pReq, tlen, &createReq);
tSerializeSMCreateStbReq(&pBuf, &createReq);
*outputLen = tlen; *outputLen = tlen;
return pReq; return pReq;
} }
@ -433,15 +432,14 @@ char* buildDropStableReq(SSqlInfo* pInfo, int32_t* outputLen, SParseContext* pPa
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
int32_t tlen = tSerializeSMDropStbReq(NULL, &dropReq); int32_t tlen = tSerializeSMDropStbReq(NULL, 0, &dropReq);
void* pReq = malloc(tlen); void* pReq = malloc(tlen);
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
void* pBuf = pReq; tSerializeSMDropStbReq(pReq, tlen, &dropReq);
tSerializeSMDropStbReq(&pBuf, &dropReq);
*outputLen = tlen; *outputLen = tlen;
return pReq; return pReq;
} }