fix:[TD-31017]process return value in mnode for tmq
This commit is contained in:
parent
acafca8841
commit
862a40afa0
|
@ -2828,19 +2828,24 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
|||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
||||
static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||
buf = taosDecodeStringTo(buf, pReq->clientId);
|
||||
|
||||
int32_t topicNum;
|
||||
int32_t topicNum = 0;
|
||||
buf = taosDecodeFixedI32(buf, &topicNum);
|
||||
|
||||
pReq->topicNames = taosArrayInit(topicNum, sizeof(void*));
|
||||
if (pReq->topicNames == NULL){
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; i < topicNum; i++) {
|
||||
char* name;
|
||||
char* name = NULL;
|
||||
buf = taosDecodeString(buf, &name);
|
||||
taosArrayPush(pReq->topicNames, &name);
|
||||
if (taosArrayPush(pReq->topicNames, &name) == NULL){
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
buf = taosDecodeFixedI8(buf, &pReq->withTbName);
|
||||
|
@ -2849,7 +2854,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
|||
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
|
||||
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
|
||||
return buf;
|
||||
return 0;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -556,7 +556,8 @@ typedef struct {
|
|||
char name[TSDB_LOG_VAR_LEN];
|
||||
} SLogVar;
|
||||
|
||||
#define TMQ_SEPARATOR ':'
|
||||
#define TMQ_SEPARATOR ":"
|
||||
#define TMQ_SEPARATOR_CHAR ':'
|
||||
|
||||
enum {
|
||||
SND_WORKER_TYPE__SHARED = 1,
|
||||
|
|
|
@ -456,11 +456,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
|||
pOffset.consumerId = tmq->consumerId;
|
||||
pOffset.offset.val = *offset;
|
||||
|
||||
int32_t groupLen = strlen(tmq->groupId);
|
||||
memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
|
||||
pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
|
||||
strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName);
|
||||
|
||||
(void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
|
||||
|
@ -1667,11 +1663,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
}
|
||||
|
||||
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
int32_t groupLen = strlen(tmq->groupId);
|
||||
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
||||
pReq->subKey[groupLen] = TMQ_SEPARATOR;
|
||||
strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
|
||||
|
||||
(void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
|
||||
pReq->withTbName = tmq->withTbName;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->timeout = timeout;
|
||||
|
@ -2902,11 +2894,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
|||
SMqVgOffset pOffset = {0};
|
||||
|
||||
pOffset.consumerId = tmq->consumerId;
|
||||
|
||||
int32_t groupLen = strlen(tmq->groupId);
|
||||
memcpy(pOffset.offset.subKey, tmq->groupId, groupLen);
|
||||
pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR;
|
||||
strcpy(pOffset.offset.subKey + groupLen + 1, tname);
|
||||
(void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);
|
||||
|
||||
int32_t len = 0;
|
||||
tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
|
||||
|
|
|
@ -9197,23 +9197,23 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
|||
|
||||
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
|
||||
snprintf(buf, maxLen, "none");
|
||||
(void)snprintf(buf, maxLen, "none");
|
||||
} else if (pVal->type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||
snprintf(buf, maxLen, "earliest");
|
||||
(void)snprintf(buf, maxLen, "earliest");
|
||||
} else if (pVal->type == TMQ_OFFSET__RESET_LATEST) {
|
||||
snprintf(buf, maxLen, "latest");
|
||||
(void)snprintf(buf, maxLen, "latest");
|
||||
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
||||
snprintf(buf, maxLen, "wal:%" PRId64, pVal->version);
|
||||
(void)snprintf(buf, maxLen, "wal:%" PRId64, pVal->version);
|
||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
if (IS_VAR_DATA_TYPE(pVal->primaryKey.type)) {
|
||||
char *tmp = taosMemoryCalloc(1, pVal->primaryKey.nData + 1);
|
||||
if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData);
|
||||
snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts,
|
||||
(void)memcpy(tmp, pVal->primaryKey.pData, pVal->primaryKey.nData);
|
||||
(void)snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%s", pVal->uid, pVal->ts,
|
||||
pVal->primaryKey.type, tmp);
|
||||
taosMemoryFree(tmp);
|
||||
} else {
|
||||
snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%" PRId64, pVal->uid, pVal->ts,
|
||||
(void)snprintf(buf, maxLen, "tsdb:%" PRId64 "|%" PRId64 ",pk type:%d,val:%" PRId64, pVal->uid, pVal->ts,
|
||||
pVal->primaryKey.type, pVal->primaryKey.val);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -30,10 +30,10 @@ enum {
|
|||
|
||||
int32_t mndInitConsumer(SMnode *pMnode);
|
||||
void mndCleanupConsumer(SMnode *pMnode);
|
||||
void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo* info);
|
||||
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo* info);
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -43,6 +43,22 @@ int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer);
|
|||
|
||||
const char *mndConsumerStatusName(int status);
|
||||
|
||||
#define MND_TMQ_NULL_CHECK(c) \
|
||||
do { \
|
||||
if (c == NULL) { \
|
||||
code = TSDB_CODE_OUT_OF_MEMORY; \
|
||||
goto END; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define MND_TMQ_RETURN_CHECK(c) \
|
||||
do { \
|
||||
code = c; \
|
||||
if (code != 0) { \
|
||||
goto END; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -622,7 +622,8 @@ typedef struct {
|
|||
int32_t resetOffsetCfg;
|
||||
} SMqConsumerObj;
|
||||
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe);
|
||||
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
||||
char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer);
|
||||
void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
|
||||
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
||||
|
@ -665,8 +666,8 @@ typedef struct {
|
|||
char* qmsg; // SubPlanToString
|
||||
} SMqSubscribeObj;
|
||||
|
||||
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
|
||||
SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub);
|
||||
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub);
|
||||
int32_t tCloneSubscribeObj(const SMqSubscribeObj* pSub, SMqSubscribeObj **ppSub);
|
||||
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
|
||||
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
|
||||
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
|
||||
|
|
|
@ -26,12 +26,9 @@ int32_t mndInitSubscribe(SMnode *pMnode);
|
|||
void mndCleanupSubscribe(SMnode *pMnode);
|
||||
|
||||
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName);
|
||||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
|
||||
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
|
||||
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub);
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||
|
||||
void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||
|
||||
|
|
|
@ -25,13 +25,12 @@ extern "C" {
|
|||
int32_t mndInitTopic(SMnode *pMnode);
|
||||
void mndCleanupTopic(SMnode *pMnode);
|
||||
|
||||
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName);
|
||||
int32_t mndAcquireTopic(SMnode *pMnode, const char *topicName, SMqTopicObj **pTopic);
|
||||
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
|
||||
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb);
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
||||
void mndTopicGetShowName(const char* fullTopic, char* topic);
|
||||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -40,7 +40,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
|||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||
|
||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -57,7 +56,6 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
||||
|
@ -68,13 +66,10 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupConsumer(SMnode *pMnode) {}
|
||||
|
||||
void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
|
||||
void *msg = rpcMallocCont(sizeof(int64_t));
|
||||
if (msg == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId,
|
||||
(int32_t)sizeof(int64_t));
|
||||
return;
|
||||
}
|
||||
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
|
||||
int32_t code = 0;
|
||||
void *msg = rpcMallocCont(sizeof(int64_t));
|
||||
MND_TMQ_NULL_CHECK(msg);
|
||||
|
||||
*(int64_t*)msg = consumerId;
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -85,108 +80,51 @@ void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SR
|
|||
};
|
||||
|
||||
mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
|
||||
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
if (code != 0){
|
||||
mError("consumer:%"PRId64" send consumer msg:%d error:%d", consumerId, msgType, code);
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
|
||||
return code;
|
||||
|
||||
END:
|
||||
taosMemoryFree(msg);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser,
|
||||
bool enableReplay) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
||||
pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
||||
if (pTopic == NULL) { // terrno has been set by callee function
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto FAILED;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic), &lino, FAILED);
|
||||
|
||||
TAOS_CHECK_GOTO(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION), &lino, FAILED);
|
||||
MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
|
||||
MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));
|
||||
|
||||
if (enableReplay) {
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
|
||||
goto FAILED;
|
||||
goto END;
|
||||
} else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
|
||||
if (pDb == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto FAILED;
|
||||
goto END;
|
||||
}
|
||||
if (pDb->cfg.numOfVgroups != 1) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
|
||||
goto FAILED;
|
||||
goto END;
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
}
|
||||
}
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
||||
return 0;
|
||||
FAILED:
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
|
||||
pConsumer->status, mndConsumerStatusName(pConsumer->status));
|
||||
|
||||
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL);
|
||||
if (pConsumerNew == NULL){
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
|
||||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false), &lino, END);
|
||||
|
||||
TAOS_CHECK_GOTO(mndSetConsumerCommitLogs(pTrans, pConsumerNew), &lino, END);
|
||||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
END:
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
mndTransDrop(pTrans);
|
||||
TAOS_RETURN(code);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||
|
@ -196,55 +134,39 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
|||
SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " failed to be found to clear it", pClearMsg->consumerId);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
|
||||
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
|
||||
mndConsumerStatusName(pConsumer->status));
|
||||
|
||||
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL);
|
||||
if (pConsumerNew == NULL){
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
||||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
// this is the drop action, not the update action
|
||||
TAOS_CHECK_GOTO(mndSetConsumerDropLogs(pTrans, pConsumerNew), &lino, END);
|
||||
|
||||
MND_TMQ_NULL_CHECK(pTrans);
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
|
||||
END:
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
mndTransDrop(pTrans);
|
||||
TAOS_RETURN(code);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
|
||||
int32_t code = 0;
|
||||
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
|
||||
if (rsp->topicPrivileges == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(rsp->topicPrivileges);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) { // terrno has been set by callee function
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
code = mndAcquireTopic(pMnode, topic, &pTopic);
|
||||
if (code != TDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
}
|
||||
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
|
||||
strcpy(data->topic, topic);
|
||||
MND_TMQ_NULL_CHECK(data);
|
||||
(void)strcpy(data->topic, topic);
|
||||
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
|
||||
grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
|
||||
data->noPrivilege = 1;
|
||||
|
@ -253,22 +175,30 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRs
|
|||
}
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
return 0;
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
|
||||
for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
|
||||
TopicOffsetRows *data = taosArrayGet(req->topics, i);
|
||||
if (data == NULL){
|
||||
continue;
|
||||
}
|
||||
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
||||
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
||||
if (pSub == NULL) {
|
||||
SMqSubscribeObj *pSub = NULL;
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
|
||||
(void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, data->topicName);
|
||||
int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
|
||||
if (code != 0) {
|
||||
mError("failed to acquire subscribe by key:%s, code:%d", key, code);
|
||||
continue;
|
||||
}
|
||||
taosWLockLatch(&pSub->lock);
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
|
||||
if (pConsumerEp) {
|
||||
taosArrayDestroy(pConsumerEp->offsetRows);
|
||||
(void)taosArrayDestroy(pConsumerEp->offsetRows);
|
||||
pConsumerEp->offsetRows = data->offsetRows;
|
||||
data->offsetRows = NULL;
|
||||
}
|
||||
|
@ -303,36 +233,24 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
SMqHbReq req = {0};
|
||||
SMqHbRsp rsp = {0};
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
|
||||
TAOS_CHECK_GOTO(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req), NULL, end);
|
||||
|
||||
MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
|
||||
int64_t consumerId = req.consumerId;
|
||||
pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto end;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user), NULL, end);
|
||||
|
||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
|
||||
MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
|
||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||
|
||||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
|
||||
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
|
||||
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info));
|
||||
}
|
||||
|
||||
storeOffsetRows(pMnode, &req, pConsumer);
|
||||
code = buildMqHbRsp(pMsg, &rsp);
|
||||
|
||||
end:
|
||||
END:
|
||||
tDestroySMqHbRsp(&rsp);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDestroySMqHbReq(&req);
|
||||
TAOS_RETURN(code);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
|
||||
|
@ -349,19 +267,22 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
|
|||
// handle all topics subscribed by this consumer
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||
// txn guarantees pSub is created
|
||||
if (pSub == NULL) {
|
||||
SMqSubscribeObj *pSub = NULL;
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
|
||||
(void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
|
||||
int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
|
||||
if (code != 0) {
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
SMqSubTopicEp topicEp = {0};
|
||||
strcpy(topicEp.topic, topic);
|
||||
(void)strcpy(topicEp.topic, topic);
|
||||
|
||||
// 2.1 fetch topic schema
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
code = mndAcquireTopic(pMnode, topic, &pTopic);
|
||||
if (code != TDB_CODE_SUCCESS) {
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
continue;
|
||||
|
@ -371,16 +292,27 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
|
|||
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||
if (topicEp.schema.nCols) {
|
||||
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
|
||||
memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
|
||||
if (topicEp.schema.pSchema == NULL) {
|
||||
taosRUnLockLatch(&pTopic->lock);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
|
||||
}
|
||||
taosRUnLockLatch(&pTopic->lock);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
// 2.2 iterate all vg assigned to the consumer of that topic
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
// this customer assigned vgroups
|
||||
if (pConsumerEp == NULL) {
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
|
||||
if (topicEp.vgs == NULL) {
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
|
@ -391,9 +323,9 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
|
|||
|
||||
for (int32_t j = 0; j < vgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
// char offsetKey[TSDB_PARTITION_KEY_LEN];
|
||||
// mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
|
||||
|
||||
if (pVgEp == NULL) {
|
||||
continue;
|
||||
}
|
||||
if (epoch == -1) {
|
||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||
if (pVgroup) {
|
||||
|
@ -401,17 +333,10 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
|
|||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
}
|
||||
}
|
||||
// 2.2.1 build vg ep
|
||||
SMqSubVgEp vgEp = {
|
||||
.epSet = pVgEp->epSet,
|
||||
.vgId = pVgEp->vgId,
|
||||
.offset = -1,
|
||||
};
|
||||
|
||||
taosArrayPush(topicEp.vgs, &vgEp);
|
||||
SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
|
||||
(void)taosArrayPush(topicEp.vgs, &vgEp);
|
||||
}
|
||||
taosArrayPush(rsp->topics, &topicEp);
|
||||
|
||||
(void)taosArrayPush(rsp->topics, &topicEp);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
@ -437,12 +362,16 @@ static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoc
|
|||
pHead->walever = 0;
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqAskEpRsp(&abuf, rsp);
|
||||
code = tEncodeSMqAskEpRsp(&abuf, rsp);
|
||||
if (code != 0) {
|
||||
rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
// send rsp
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
TAOS_RETURN(code);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||
|
@ -450,33 +379,22 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
SMqAskEpReq req = {0};
|
||||
SMqAskEpRsp rsp = {0};
|
||||
int32_t code = 0;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
|
||||
TAOS_CHECK_RETURN(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
|
||||
|
||||
MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
|
||||
int64_t consumerId = req.consumerId;
|
||||
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
|
||||
return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
|
||||
if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
|
||||
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
|
||||
pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto END;
|
||||
}
|
||||
|
||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||
|
||||
// 1. check consumer status
|
||||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
|
||||
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info));
|
||||
}
|
||||
|
||||
if (status != MQ_CONSUMER_STATUS_READY) {
|
||||
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
|
@ -490,10 +408,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
if (epoch != serverEpoch) {
|
||||
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
|
||||
consumerId, epoch, serverEpoch);
|
||||
code = addEpSetInfo(pMnode, pConsumer, epoch, &rsp);
|
||||
if(code != 0){
|
||||
goto END;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
|
||||
}
|
||||
|
||||
code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
|
||||
|
@ -507,27 +422,29 @@ END:
|
|||
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
MND_TMQ_NULL_CHECK(pCommitRaw);
|
||||
code = mndTransAppendCommitlog(pTrans, pCommitRaw);
|
||||
if (code != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
goto END;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
|
||||
TAOS_RETURN(code);
|
||||
MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
MND_TMQ_NULL_CHECK(pCommitRaw);
|
||||
code = mndTransAppendCommitlog(pTrans, pCommitRaw);
|
||||
if (code != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
goto END;
|
||||
}
|
||||
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
|
||||
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||
TAOS_RETURN(code);
|
||||
MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void freeItem(void *param) {
|
||||
|
@ -538,29 +455,35 @@ static void freeItem(void *param) {
|
|||
}
|
||||
|
||||
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
|
||||
int32_t code = 0;
|
||||
pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
|
||||
MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
|
||||
pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
|
||||
if(pConsumerNew->rebNewTopics == NULL || pConsumerNew->rebRemovedTopics == NULL){
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
|
||||
int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while (i < oldTopicNum || j < newTopicNum) {
|
||||
if (i >= oldTopicNum) {
|
||||
char *newTopicCopy = taosStrdup(taosArrayGetP(pConsumerNew->assignedTopics, j));
|
||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||
void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
|
||||
MND_TMQ_NULL_CHECK(tmp);
|
||||
char *newTopicCopy = taosStrdup(tmp);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy));
|
||||
j++;
|
||||
continue;
|
||||
} else if (j >= newTopicNum) {
|
||||
char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
|
||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||
void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
|
||||
MND_TMQ_NULL_CHECK(tmp);
|
||||
char *oldTopicCopy = taosStrdup(tmp);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy));
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
|
||||
MND_TMQ_NULL_CHECK(oldTopic);
|
||||
char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
|
||||
MND_TMQ_NULL_CHECK(newTopic);
|
||||
int comp = strcmp(oldTopic, newTopic);
|
||||
if (comp == 0) {
|
||||
i++;
|
||||
|
@ -568,22 +491,22 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
|
|||
continue;
|
||||
} else if (comp < 0) {
|
||||
char *oldTopicCopy = taosStrdup(oldTopic);
|
||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy));
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *newTopicCopy = taosStrdup(newTopic);
|
||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy));
|
||||
j++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
|
||||
int32_t code = 0;
|
||||
taosArraySort(pTopicList, taosArrayCompareString);
|
||||
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
|
||||
|
||||
|
@ -594,24 +517,21 @@ static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
|
|||
return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
|
||||
}
|
||||
}
|
||||
TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe){
|
||||
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
|
||||
int64_t consumerId = subscribe->consumerId;
|
||||
char *cgroup = subscribe->cgroup;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
SMqConsumerObj *pExistedConsumer = NULL;
|
||||
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pExistedConsumer == NULL) {
|
||||
int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
|
||||
if (code != 0) {
|
||||
mInfo("receive subscribe request from new consumer:0x%" PRIx64
|
||||
",cgroup:%s, numOfTopics:%d", consumerId,
|
||||
subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
|
||||
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe);
|
||||
if (pConsumerNew == NULL) {
|
||||
goto _over;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
|
||||
} else {
|
||||
int32_t status = atomic_load_32(&pExistedConsumer->status);
|
||||
|
||||
|
@ -621,82 +541,53 @@ static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscri
|
|||
(int32_t)taosArrayGetSize(subscribe->topicNames));
|
||||
|
||||
if (status != MQ_CONSUMER_STATUS_READY) {
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto _over;
|
||||
}
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe);
|
||||
if (pConsumerNew == NULL) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
int32_t code = getTopicAddDelete(pExistedConsumer, pConsumerNew);
|
||||
if (code != 0){
|
||||
terrno = code;
|
||||
goto _over;
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto END;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
|
||||
}
|
||||
mndReleaseConsumer(pMnode, pExistedConsumer);
|
||||
return pConsumerNew;
|
||||
if (ppConsumer){
|
||||
*ppConsumer = pConsumerNew;
|
||||
}
|
||||
return code;
|
||||
|
||||
_over:
|
||||
END:
|
||||
mndReleaseConsumer(pMnode, pExistedConsumer);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
|
||||
SCMSubscribeReq subscribe = {0};
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
SCMSubscribeReq subscribe = {0};
|
||||
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe));
|
||||
if(taosArrayGetSize(subscribe.topicNames) == 0){
|
||||
SMqConsumerObj *pConsumerTmp = mndAcquireConsumer(pMnode, subscribe.consumerId);
|
||||
if(pConsumerTmp == NULL){
|
||||
goto _over;
|
||||
}
|
||||
SMqConsumerObj *pConsumerTmp = NULL;
|
||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
|
||||
mndReleaseConsumer(pMnode, pConsumerTmp);
|
||||
}
|
||||
|
||||
code = checkAndSortTopic(pMnode, subscribe.topicNames);
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
goto _over;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
||||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _over;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(pTrans);
|
||||
|
||||
code = validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pConsumerNew = buildSubConsumer(pMnode, &subscribe);
|
||||
if(pConsumerNew == NULL){
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(mndSetConsumerCommitLogs(pTrans, pConsumerNew), NULL, _over);
|
||||
|
||||
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _over);
|
||||
MND_TMQ_RETURN_CHECK(validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay));
|
||||
MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_over:
|
||||
END:
|
||||
mndTransDrop(pTrans);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||
TAOS_RETURN(code);
|
||||
return code;
|
||||
}
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
|
@ -838,8 +729,7 @@ static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char
|
|||
int32_t size = taosArrayGetSize(pConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
|
||||
if (strcmp(topic, pTopic) == 0) {
|
||||
if (topic && strcmp(topic, pTopic) == 0) {
|
||||
existing = true;
|
||||
break;
|
||||
}
|
||||
|
@ -865,32 +755,45 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
|
||||
taosArrayPush(pOldConsumer->rebNewTopics, &topic);
|
||||
void * tmp = taosArrayGetP(pOldConsumer->assignedTopics, i);
|
||||
if (tmp == NULL){
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
char *topic = taosStrdup(tmp);
|
||||
if (taosArrayPush(pOldConsumer->rebNewTopics, &topic) == NULL) {
|
||||
taosMemoryFree(topic);
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
}
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
||||
mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId);
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
|
||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
|
||||
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||
mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
|
||||
} else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
|
||||
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
||||
void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
|
||||
if (tmp == NULL){
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
char *pNewTopic = taosStrdup(tmp);
|
||||
removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
|
||||
bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
|
||||
if (existing) {
|
||||
mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
|
||||
taosMemoryFree(pNewTopic);
|
||||
} else {
|
||||
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
|
||||
if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
|
||||
taosMemoryFree(pNewTopic);
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
|
||||
}
|
||||
|
||||
int32_t status = pOldConsumer->status;
|
||||
updateConsumerStatus(pOldConsumer);
|
||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
|
||||
mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
|
||||
", current topics:%d, newTopics:%d, removeTopics:%d",
|
||||
|
@ -901,13 +804,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
|
||||
} else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
|
||||
char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
|
||||
if (topic == NULL){
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
|
||||
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
|
||||
|
||||
int32_t status = pOldConsumer->status;
|
||||
updateConsumerStatus(pOldConsumer);
|
||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
|
||||
mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
|
||||
", current topics:%d, newTopics:%d, removeTopics:%d",
|
||||
|
@ -921,13 +827,13 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
return 0;
|
||||
}
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
|
||||
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
*pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
|
||||
if (*pConsumer == NULL) {
|
||||
return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
}
|
||||
return pConsumer;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
|
||||
|
@ -940,6 +846,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||
|
@ -964,34 +871,37 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
}
|
||||
|
||||
if (numOfRows + topicSz > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + topicSz);
|
||||
MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < topicSz; i++) {
|
||||
SColumnInfoData *pColInfo;
|
||||
SColumnInfoData *pColInfo = NULL;
|
||||
int32_t cols = 0;
|
||||
|
||||
// consumer id
|
||||
char consumerIdHex[32] = {0};
|
||||
sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
|
||||
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
|
||||
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));
|
||||
|
||||
// consumer group
|
||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(cgroup, pConsumer->cgroup);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
|
||||
|
||||
// client id
|
||||
char clientId[256 + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(clientId, pConsumer->clientId);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
|
||||
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -999,46 +909,48 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
STR_TO_VARSTR(status, pStatusName);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
|
||||
|
||||
// one subscribed topic
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
if (hasTopic) {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
const char *topicName = mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i));
|
||||
STR_TO_VARSTR(topic, topicName);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
||||
mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
|
||||
*(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
|
||||
} else {
|
||||
colDataSetVal(pColInfo, numOfRows, NULL, true);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
|
||||
}
|
||||
|
||||
// end point
|
||||
/*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/
|
||||
/*colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);*/
|
||||
|
||||
// up time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));
|
||||
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));
|
||||
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
||||
MND_TMQ_RETURN_CHECK(tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal));
|
||||
|
||||
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
||||
(void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
||||
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
||||
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
@ -1051,6 +963,9 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
|
||||
pShow->numOfRows += numOfRows;
|
||||
return numOfRows;
|
||||
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
|
||||
|
|
|
@ -1525,9 +1525,6 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
|||
|
||||
if (mndSetDropDbPrepareLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
#ifdef TD_ENTERPRISE
|
||||
if (mndDropViewByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
|
|
|
@ -255,15 +255,17 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
|
|||
|
||||
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
||||
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe) {
|
||||
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
||||
char *topic, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer) {
|
||||
int32_t code = 0;
|
||||
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
pConsumer->consumerId = consumerId;
|
||||
memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
|
||||
(void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
|
||||
|
||||
pConsumer->epoch = 0;
|
||||
pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
||||
|
@ -276,20 +278,26 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda
|
|||
if (updateType == CONSUMER_ADD_REB){
|
||||
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
|
||||
if(pConsumer->rebNewTopics == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
char* topicTmp = taosStrdup(topic);
|
||||
taosArrayPush(pConsumer->rebNewTopics, &topicTmp);
|
||||
if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
}else if (updateType == CONSUMER_REMOVE_REB) {
|
||||
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
|
||||
if(pConsumer->rebRemovedTopics == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
char* topicTmp = taosStrdup(topic);
|
||||
taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp);
|
||||
if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
}else if (updateType == CONSUMER_INSERT_SUB){
|
||||
tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
|
||||
pConsumer->withTbName = subscribe->withTbName;
|
||||
|
@ -297,10 +305,9 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda
|
|||
pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
|
||||
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
|
||||
|
||||
|
||||
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
|
||||
if (pConsumer->rebNewTopics == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
pConsumer->assignedTopics = subscribe->topicNames;
|
||||
|
@ -310,11 +317,12 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda
|
|||
subscribe->topicNames = NULL;
|
||||
}
|
||||
|
||||
return pConsumer;
|
||||
*ppConsumer = pConsumer;
|
||||
return 0;
|
||||
|
||||
END:
|
||||
tDeleteSMqConsumerObj(pConsumer);
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
|
||||
|
@ -530,27 +538,36 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqSubscribeObj *tNewSubscribeObj(const char *key) {
|
||||
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
|
||||
int32_t code = 0;
|
||||
SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
|
||||
if (pSubObj == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(pSubObj);
|
||||
|
||||
memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
(void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
taosInitRWLatch(&pSubObj->lock);
|
||||
pSubObj->vgNum = 0;
|
||||
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
|
||||
// TODO set hash free fp
|
||||
/*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
|
||||
MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
|
||||
pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
|
||||
return pSubObj;
|
||||
MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
|
||||
if (ppSub){
|
||||
*ppSub = pSubObj;
|
||||
}
|
||||
return code;
|
||||
|
||||
END:
|
||||
taosMemoryFree(pSubObj);
|
||||
return code;
|
||||
}
|
||||
|
||||
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||
int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
|
||||
int32_t code = 0;
|
||||
SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
|
||||
if (pSubNew == NULL) return NULL;
|
||||
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
if (pSubNew == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
(void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
taosInitRWLatch(&pSubNew->lock);
|
||||
|
||||
pSubNew->dbUid = pSub->dbUid;
|
||||
|
@ -560,8 +577,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
|||
|
||||
pSubNew->vgNum = pSub->vgNum;
|
||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
// TODO set hash free fp
|
||||
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqConsumerEp *pConsumerEp = NULL;
|
||||
while (1) {
|
||||
|
@ -576,9 +592,11 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
|||
}
|
||||
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
|
||||
pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
|
||||
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
||||
(void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
|
||||
pSubNew->qmsg = taosStrdup(pSub->qmsg);
|
||||
return pSubNew;
|
||||
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||
|
|
|
@ -759,25 +759,27 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
SVgObj* pVgroup = NULL;
|
||||
SQueryPlan* pPlan = NULL;
|
||||
SSubplan* pSubplan = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||
if (pPlan == NULL) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
} else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
|
||||
SNode* pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
code = nodesStringToNode(pTopic->ast, &pAst);
|
||||
if (code != 0) {
|
||||
mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
||||
if (code != 0) {
|
||||
mError("failed to create topic:%s since %s", pTopic->name, terrstr());
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
@ -785,18 +787,19 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
if (pPlan) {
|
||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||
if (levelNum != 1) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
|
||||
if (pNodeListNode == NULL){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
|
||||
if (opNum != 1) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
||||
|
@ -817,12 +820,18 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
pSub->vgNum++;
|
||||
|
||||
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
if (pVgEp == NULL){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
pVgEp->vgId = pVgroup->vgId;
|
||||
taosArrayPush(pSub->unassignedVgs, &pVgEp);
|
||||
|
||||
if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pVgEp);
|
||||
goto END;
|
||||
}
|
||||
mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
|
@ -830,14 +839,14 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
int32_t msgLen;
|
||||
|
||||
if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
|
||||
qDestroyQueryPlan(pPlan);
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
goto END;
|
||||
}
|
||||
} else {
|
||||
pSub->qmsg = taosStrdup("");
|
||||
}
|
||||
|
||||
END:
|
||||
qDestroyQueryPlan(pPlan);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -44,8 +44,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
|
|||
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
int32_t mndInitTopic(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_TOPIC,
|
||||
|
@ -70,9 +68,16 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupTopic(SMnode *pMnode) {}
|
||||
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
||||
//
|
||||
return strchr(topic, '.') + 1;
|
||||
void mndTopicGetShowName(const char* fullTopic, char* topic) {
|
||||
if (fullTopic == NULL) {
|
||||
return;
|
||||
}
|
||||
char* tmp = strchr(fullTopic, '.');
|
||||
if (tmp == NULL) {
|
||||
tstrncpy(topic, fullTopic, TSDB_TOPIC_FNAME_LEN);
|
||||
}else {
|
||||
tstrncpy(topic, tmp+1, TSDB_TOPIC_FNAME_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
|
@ -289,25 +294,25 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
|
|||
taosMemoryFreeClear(pTopic->ast);
|
||||
taosMemoryFreeClear(pTopic->physicalPlan);
|
||||
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||
taosArrayDestroy(pTopic->ntbColIds);
|
||||
(void)taosArrayDestroy(pTopic->ntbColIds);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
|
||||
mTrace("topic:%s perform update action", pOldTopic->name);
|
||||
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
|
||||
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
|
||||
(void)atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
|
||||
(void)atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName) {
|
||||
int32_t mndAcquireTopic(SMnode *pMnode, const char *topicName, SMqTopicObj **pTopic) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
|
||||
if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
*pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
|
||||
if (*pTopic == NULL) {
|
||||
return TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
}
|
||||
return pTopic;
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
|
||||
|
@ -316,25 +321,23 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
|
|||
}
|
||||
|
||||
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC;
|
||||
|
||||
if (pCreate->sql == NULL) return -1;
|
||||
if (pCreate->sql == NULL) return TSDB_CODE_MND_INVALID_TOPIC;
|
||||
|
||||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (pCreate->ast == NULL || pCreate->ast[0] == 0) return -1;
|
||||
if (pCreate->ast == NULL || pCreate->ast[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
|
||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (pCreate->subStbName[0] == 0) return -1;
|
||||
if (pCreate->subStbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
|
||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
|
||||
if (pCreate->subDbName[0] == 0) return -1;
|
||||
if (pCreate->subDbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
int32_t code = 0;
|
||||
MND_TMQ_RETURN_CHECK(nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList));
|
||||
int64_t suid = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->suid;
|
||||
int8_t tableType = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->tableType;
|
||||
if (tableType == TSDB_CHILD_TABLE) {
|
||||
|
@ -345,17 +348,19 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
|||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
if (pCol->tableType == TSDB_NORMAL_TABLE) {
|
||||
pTopic->ntbUid = pCol->tableId;
|
||||
taosArrayPush(pTopic->ntbColIds, &pCol->colId);
|
||||
MND_TMQ_NULL_CHECK(taosArrayPush(pTopic->ntbColIds, &pCol->colId));
|
||||
}
|
||||
}
|
||||
}
|
||||
nodesDestroyList(pNodeList);
|
||||
return 0;
|
||||
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj *topicObj){
|
||||
STqCheckInfo info;
|
||||
memcpy(info.topic, topicObj->name, TSDB_TOPIC_FNAME_LEN);
|
||||
STqCheckInfo info = {0};
|
||||
(void)memcpy(info.topic, topicObj->name, TSDB_TOPIC_FNAME_LEN);
|
||||
info.ntbUid = topicObj->ntbUid;
|
||||
info.colIdList = topicObj->ntbColIds;
|
||||
// broadcast forbid alter info
|
||||
|
@ -363,7 +368,7 @@ static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
int32_t code = 0;
|
||||
void *buf = NULL;
|
||||
void *buf = NULL;
|
||||
|
||||
while (1) {
|
||||
// iterate vg
|
||||
|
@ -378,16 +383,17 @@ static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj
|
|||
int32_t len;
|
||||
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||
MND_TMQ_NULL_CHECK(buf);
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
code = tEncodeSTqCheckInfo(&encoder, &info);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tEncoderClear(&encoder);
|
||||
goto END;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
@ -398,10 +404,7 @@ static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj
|
|||
action.pCont = buf;
|
||||
action.contLen = sizeof(SMsgHead) + len;
|
||||
action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO;
|
||||
code = mndTransAppendRedoAction(pTrans, &action);
|
||||
if (code != 0) {
|
||||
goto END;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
buf = NULL;
|
||||
}
|
||||
|
@ -417,33 +420,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
const char *userName) {
|
||||
mInfo("start to create topic:%s", pCreate->name);
|
||||
STrans *pTrans = NULL;
|
||||
int32_t code = -1;
|
||||
int32_t code = 0;
|
||||
SNode *pAst = NULL;
|
||||
SQueryPlan *pPlan = NULL;
|
||||
SMqTopicObj topicObj = {0};
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic");
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
code = -1;
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
MND_TMQ_NULL_CHECK(pTrans);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
code = mndTransCheckConflict(pMnode, pTrans);
|
||||
if (code != 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
||||
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
|
||||
|
||||
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj);
|
||||
if (code != 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj));
|
||||
|
||||
topicObj.createTime = taosGetTimestampMs();
|
||||
topicObj.updateTime = topicObj.createTime;
|
||||
|
@ -458,56 +450,25 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
topicObj.ast = taosStrdup(pCreate->ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
|
||||
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
||||
|
||||
code = nodesStringToNode(pCreate->ast, &pAst);
|
||||
if (code != 0) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(nodesStringToNode(pCreate->ast, &pAst));
|
||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
||||
if (code != 0) {
|
||||
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(qCreateQueryPlan(&cxt, &pPlan, NULL));
|
||||
|
||||
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||
if (topicObj.ntbColIds == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
extractTopicTbInfo(pAst, &topicObj);
|
||||
|
||||
MND_TMQ_NULL_CHECK(topicObj.ntbColIds);
|
||||
MND_TMQ_RETURN_CHECK(extractTopicTbInfo(pAst, &topicObj));
|
||||
if (topicObj.ntbUid == 0) {
|
||||
taosArrayDestroy(topicObj.ntbColIds);
|
||||
(void)taosArrayDestroy(topicObj.ntbColIds);
|
||||
topicObj.ntbColIds = NULL;
|
||||
}
|
||||
|
||||
code = qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema);
|
||||
if (code != 0) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
code = nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL);
|
||||
if (code != 0) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema));
|
||||
MND_TMQ_RETURN_CHECK(nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL));
|
||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
||||
if (pStb == NULL) {
|
||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
code = terrno;
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
strcpy(topicObj.stbName, pCreate->subStbName);
|
||||
MND_TMQ_NULL_CHECK(pStb);
|
||||
tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
|
||||
topicObj.stbUid = pStb->uid;
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
if(pCreate->ast != NULL){
|
||||
|
@ -518,34 +479,25 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
}
|
||||
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
code = -1;
|
||||
goto _OUT;
|
||||
MND_TMQ_NULL_CHECK(pCommitRaw);
|
||||
code = mndTransAppendCommitlog(pTrans, pCommitRaw);
|
||||
if(code != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
goto END;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
|
||||
if (topicObj.ntbUid != 0) {
|
||||
code = sendCheckInfoToVnode(pTrans, pMnode, &topicObj);
|
||||
if (code != 0){
|
||||
goto _OUT;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(sendCheckInfoToVnode(pTrans, pMnode, &topicObj));
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
code = -1;
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OUT:
|
||||
END:
|
||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||
taosMemoryFreeClear(topicObj.sql);
|
||||
taosMemoryFreeClear(topicObj.ast);
|
||||
taosArrayDestroy(topicObj.ntbColIds);
|
||||
(void)taosArrayDestroy(topicObj.ntbColIds);
|
||||
if (topicObj.schema.nCols) {
|
||||
taosMemoryFreeClear(topicObj.schema.pSchema);
|
||||
}
|
||||
|
@ -557,75 +509,64 @@ _OUT:
|
|||
|
||||
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t code = -1;
|
||||
int32_t code = TDB_CODE_SUCCESS;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SCMCreateTopicReq createTopicReq = {0};
|
||||
|
||||
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto END;
|
||||
}
|
||||
|
||||
mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
||||
|
||||
if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
|
||||
mError("topic:%s failed to create since %s", createTopicReq.name, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndCheckCreateTopicReq(&createTopicReq));
|
||||
|
||||
pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
||||
if (pTopic != NULL) {
|
||||
code = mndAcquireTopic(pMnode, createTopicReq.name, &pTopic);
|
||||
if (code == TDB_CODE_SUCCESS) {
|
||||
if (createTopicReq.igExists) {
|
||||
mInfo("topic:%s already exist, ignore exist is set", createTopicReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
goto END;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
code = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
||||
goto END;
|
||||
}
|
||||
} else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
|
||||
goto _OVER;
|
||||
} else if (code != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto _OVER;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(pDb);
|
||||
|
||||
if (pDb->cfg.walRetentionPeriod == 0) {
|
||||
terrno = TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO;
|
||||
code = TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO;
|
||||
mError("db:%s, not allowed to create topic when WAL_RETENTION_PERIOD is zero", pDb->name);
|
||||
goto _OVER;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){
|
||||
terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
|
||||
code = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
|
||||
mError("topic num out of range");
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if ((terrno = grantCheck(TSDB_GRANT_SUBSCRIPTION)) < 0) {
|
||||
goto _OVER;
|
||||
goto END;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
|
||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
|
||||
if (code == 0) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
SName dbname = {0};
|
||||
tNameFromString(&dbname, createTopicReq.subDbName, T_NAME_ACCT | T_NAME_DB);
|
||||
{
|
||||
SName dbname = {0};
|
||||
(void)tNameFromString(&dbname, createTopicReq.subDbName, T_NAME_ACCT | T_NAME_DB); // ignore error
|
||||
SName topicName = {0};
|
||||
(void)tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); // ignore error
|
||||
auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname,
|
||||
createTopicReq.sql, strlen(createTopicReq.sql));
|
||||
}
|
||||
|
||||
SName topicName = {0};
|
||||
tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
//reuse this function for topic
|
||||
|
||||
auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname,
|
||||
createTopicReq.sql, strlen(createTopicReq.sql));
|
||||
|
||||
_OVER:
|
||||
END:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to create topic:%s since %s", createTopicReq.name, terrstr());
|
||||
}
|
||||
|
@ -638,22 +579,20 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
|
||||
int32_t code = -1;
|
||||
if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) {
|
||||
goto _OVER;
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = NULL;
|
||||
MND_TMQ_RETURN_CHECK(mndUserRemoveTopic(pMnode, pTrans, pTopic->name));
|
||||
pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
MND_TMQ_NULL_CHECK(pCommitRaw);
|
||||
code = mndTransAppendCommitlog(pTrans, pCommitRaw);
|
||||
if(code != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
goto END;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
|
||||
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
||||
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -661,7 +600,7 @@ static bool checkTopic(SArray *topics, char *topicName){
|
|||
int32_t sz = taosArrayGetSize(topics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(topics, i);
|
||||
if (strcmp(name, topicName) == 0) {
|
||||
if (name && strcmp(name, topicName) == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -682,29 +621,26 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi
|
|||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||
if (found){
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
||||
code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
continue;
|
||||
}
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
goto end;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
goto end;
|
||||
goto END;
|
||||
}
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
end:
|
||||
END:
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
|
@ -726,13 +662,10 @@ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicO
|
|||
}
|
||||
|
||||
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
|
||||
if (buf == NULL){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(buf);
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
(void)memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
@ -742,12 +675,12 @@ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicO
|
|||
code = mndTransAppendRedoAction(pTrans, &action);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
goto end;
|
||||
goto END;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
end:
|
||||
END:
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
|
@ -761,70 +694,40 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
STrans *pTrans = NULL;
|
||||
|
||||
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
pTopic = mndAcquireTopic(pMnode, dropReq.name);
|
||||
if (pTopic == NULL) {
|
||||
code = mndAcquireTopic(pMnode, dropReq.name, &pTopic);
|
||||
if (code != 0) {
|
||||
if (dropReq.igNotExists) {
|
||||
mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
|
||||
tFreeSMDropTopicReq(&dropReq);
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
tFreeSMDropTopicReq(&dropReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic");
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(pTrans);
|
||||
|
||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||
code = mndTransCheckConflict(pMnode, pTrans);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
|
||||
if (code < 0) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
goto end;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
|
||||
MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name));
|
||||
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
|
||||
|
||||
if (pTopic->ntbUid != 0) {
|
||||
code = mndDropCheckInfoByTopic(pMnode, pTrans, pTopic);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
MND_TMQ_RETURN_CHECK(mndDropCheckInfoByTopic(pMnode, pTrans, pTopic));
|
||||
}
|
||||
|
||||
code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||
|
||||
end:
|
||||
END:
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
mndTransDrop(pTrans);
|
||||
if (code != 0) {
|
||||
|
@ -834,8 +737,7 @@ end:
|
|||
}
|
||||
|
||||
SName name = {0};
|
||||
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
(void)tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); // ignore error
|
||||
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
|
||||
|
||||
tFreeSMDropTopicReq(&dropReq);
|
||||
|
@ -849,8 +751,7 @@ int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
return -1;
|
||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
int32_t numOfTopics = 0;
|
||||
|
@ -875,18 +776,24 @@ int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
|||
}
|
||||
|
||||
static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson){
|
||||
char* string = NULL;
|
||||
char* string = NULL;
|
||||
int32_t code = 0;
|
||||
cJSON* columns = cJSON_CreateArray();
|
||||
if (columns == NULL) {
|
||||
return;
|
||||
}
|
||||
MND_TMQ_NULL_CHECK(columns);
|
||||
for (int i = 0; i < nCols; i++) {
|
||||
cJSON* column = cJSON_CreateObject();
|
||||
MND_TMQ_NULL_CHECK(column);
|
||||
SSchema* s = schema + i;
|
||||
cJSON* cname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(column, "name", cname);
|
||||
MND_TMQ_NULL_CHECK(cname);
|
||||
if (!cJSON_AddItemToObject(column, "name", cname)) {
|
||||
return;
|
||||
}
|
||||
cJSON* ctype = cJSON_CreateString(tDataTypes[s->type].name);
|
||||
cJSON_AddItemToObject(column, "type", ctype);
|
||||
MND_TMQ_NULL_CHECK(ctype);
|
||||
if (!cJSON_AddItemToObject(column, "type", ctype)) {
|
||||
return;
|
||||
}
|
||||
int32_t length = 0;
|
||||
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||
s->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
|
@ -897,8 +804,13 @@ static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson){
|
|||
length = s->bytes;
|
||||
}
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(column, "length", cbytes);
|
||||
cJSON_AddItemToArray(columns, column);
|
||||
MND_TMQ_NULL_CHECK(cbytes);
|
||||
if (!cJSON_AddItemToObject(column, "length", cbytes)){
|
||||
return;
|
||||
}
|
||||
if (!cJSON_AddItemToArray(columns, column)){
|
||||
return;
|
||||
}
|
||||
}
|
||||
string = cJSON_PrintUnformatted(columns);
|
||||
cJSON_Delete(columns);
|
||||
|
@ -910,6 +822,9 @@ static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson){
|
|||
mError("mndRetrieveTopic build schema error json:%p, json len:%zu", string, len);
|
||||
}
|
||||
taosMemoryFree(string);
|
||||
|
||||
END:
|
||||
return;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
|
@ -917,13 +832,16 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = 0;
|
||||
char *sql = NULL;
|
||||
char *schemaJson = NULL;
|
||||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
SName n;
|
||||
SColumnInfoData *pColInfo= NULL;
|
||||
SName n = {0};
|
||||
int32_t cols = 0;
|
||||
|
||||
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
|
||||
|
@ -931,28 +849,34 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
STR_TO_VARSTR(topicName, pName);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)topicName, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topicName, false));
|
||||
|
||||
char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&n, varDataVal(dbName));
|
||||
MND_TMQ_RETURN_CHECK(tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB));
|
||||
MND_TMQ_RETURN_CHECK(tNameGetDbName(&n, varDataVal(dbName)));
|
||||
varDataSetLen(dbName, strlen(varDataVal(dbName)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)dbName, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)dbName, false));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false));
|
||||
|
||||
char *sql = taosMemoryMalloc(strlen(pTopic->sql) + VARSTR_HEADER_SIZE);
|
||||
sql = taosMemoryMalloc(strlen(pTopic->sql) + VARSTR_HEADER_SIZE);
|
||||
MND_TMQ_NULL_CHECK(sql);
|
||||
STR_TO_VARSTR(sql, pTopic->sql);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)sql, false));
|
||||
|
||||
taosMemoryFree(sql);
|
||||
taosMemoryFreeClear(sql);
|
||||
|
||||
char *schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
|
||||
schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
|
||||
MND_TMQ_NULL_CHECK(schemaJson);
|
||||
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
|
||||
schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);
|
||||
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
|
||||
|
@ -969,8 +893,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)schemaJson, false);
|
||||
taosMemoryFree(schemaJson);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)schemaJson, false));
|
||||
taosMemoryFreeClear(schemaJson);
|
||||
|
||||
char mete[4 + VARSTR_HEADER_SIZE] = {0};
|
||||
if(pTopic->withMeta){
|
||||
|
@ -980,7 +905,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)mete, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)mete, false));
|
||||
|
||||
char type[8 + VARSTR_HEADER_SIZE] = {0};
|
||||
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
|
||||
|
@ -992,7 +918,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)type, false);
|
||||
MND_TMQ_NULL_CHECK(pColInfo);
|
||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)type, false));
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pTopic);
|
||||
|
@ -1000,24 +927,11 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
|
||||
pShow->numOfRows += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
END:
|
||||
taosMemoryFreeClear(schemaJson);
|
||||
taosMemoryFreeClear(sql);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
|
||||
|
@ -1047,33 +961,3 @@ bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
|
|||
|
||||
return false;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pTopic->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1357,7 +1357,7 @@ SHashObj *mndDupUseDbHash(SHashObj *pOld) {
|
|||
}
|
||||
|
||||
int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
|
||||
memcpy(pNew, pUser, sizeof(SUserObj));
|
||||
(void)memcpy(pNew, pUser, sizeof(SUserObj));
|
||||
pNew->authVersion++;
|
||||
pNew->updateTime = taosGetTimestampMs();
|
||||
|
||||
|
@ -1377,7 +1377,7 @@ int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
|
|||
taosRUnLockLatch(&pUser->lock);
|
||||
|
||||
if (pNew->readDbs == NULL || pNew->writeDbs == NULL || pNew->topics == NULL) {
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -1983,10 +1983,11 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
|||
|
||||
if (ALTER_USER_ADD_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
|
||||
int32_t len = strlen(pAlterReq->objname) + 1;
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pAlterReq->objname);
|
||||
if (pTopic == NULL) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = mndAcquireTopic(pMnode, pAlterReq->objname, &pTopic);
|
||||
if (code != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
@ -1994,10 +1995,11 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
|
|||
|
||||
if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
|
||||
int32_t len = strlen(pAlterReq->objname) + 1;
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pAlterReq->objname);
|
||||
if (pTopic == NULL) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = mndAcquireTopic(pMnode, pAlterReq->objname, &pTopic);
|
||||
if (code != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
taosHashRemove(pNewUser->topics, pAlterReq->objname, len);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
|
|
@ -2725,15 +2725,6 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
|||
SDbObj dbObj = {0};
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
|
||||
// int32_t numOfTopics = 0;
|
||||
// if (mndGetNumOfTopics(pMnode, pDb->name, &numOfTopics) != 0) {
|
||||
// goto _OVER;
|
||||
// }
|
||||
// if (numOfTopics > 0) {
|
||||
// terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
|
||||
// goto _OVER;
|
||||
// }
|
||||
|
||||
int32_t numOfStreams = 0;
|
||||
if (mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams) != 0) {
|
||||
goto _OVER;
|
||||
|
|
Loading…
Reference in New Issue