add subscribe for sdb
This commit is contained in:
parent
66f610f8df
commit
00ccbb4a3a
|
@ -324,6 +324,30 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqSetCVgReq {
|
||||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqSetCVgReq;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||
tlen += taosEncodeString(buf, pReq->topicName);
|
||||
tlen += taosEncodeString(buf, pReq->cGroup);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
||||
buf = taosDecodeStringTo(buf, pReq->cGroup);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
char* dbName;
|
||||
|
|
|
@ -113,8 +113,8 @@ typedef enum {
|
|||
SDB_USER = 7,
|
||||
SDB_AUTH = 8,
|
||||
SDB_ACCT = 9,
|
||||
SDB_CONSUMER = 10,
|
||||
SDB_CGROUP = 11,
|
||||
SDB_SUBSCRIBE = 10,
|
||||
SDB_CONSUMER = 11,
|
||||
SDB_TOPIC = 12,
|
||||
SDB_VGROUP = 13,
|
||||
SDB_STB = 14,
|
||||
|
|
|
@ -178,6 +178,7 @@ do { \
|
|||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_CONSUMER_GROUP_LEN 192
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_COL_NAME_LEN 65
|
||||
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
||||
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
|
||||
|
|
|
@ -28,6 +28,9 @@ void mndCleanupConsumer(SMnode *pMnode);
|
|||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -326,17 +326,156 @@ typedef struct SMqTopicConsumer {
|
|||
#endif
|
||||
|
||||
typedef struct SMqConsumerEp {
|
||||
int32_t vgId;
|
||||
int32_t vgId; // -1 for unassigned
|
||||
SEpSet epset;
|
||||
int64_t consumerId;
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
int64_t lastConsumerHbTs;
|
||||
int64_t lastVgHbTs;
|
||||
} SMqConsumerEp;
|
||||
|
||||
typedef struct SMqCgroupTopicPair {
|
||||
char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN];
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
return buf;
|
||||
}
|
||||
|
||||
//unit for rebalance
|
||||
typedef struct SMqSubscribeObj {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int32_t epoch;
|
||||
//TODO: replace with priority queue
|
||||
SArray* availConsumer; // SArray<int64_t> (consumerId)
|
||||
SArray* assigned; // SArray<SMqConsumerEp>
|
||||
SArray* unassignedConsumer;
|
||||
SArray* unassignedVg;
|
||||
} SMqCgroupTopicPair;
|
||||
SArray* unassignedConsumer; // SArray<SMqConsumerEp>
|
||||
SArray* unassignedVg; // SArray<SMqConsumerEp>
|
||||
} SMqSubscribeObj;
|
||||
|
||||
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
|
||||
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj));
|
||||
pSub->key[0] = 0;
|
||||
pSub->epoch = 0;
|
||||
if (pSub == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
|
||||
if (pSub->availConsumer == NULL) {
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->assigned == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
pSub->unassignedConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->assigned == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->unassignedConsumer);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->assigned == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->unassignedConsumer);
|
||||
taosArrayDestroy(pSub->unassignedVg);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pSub->key);
|
||||
tlen += taosEncodeFixedI32(buf, pSub->epoch);
|
||||
int32_t sz;
|
||||
|
||||
sz = taosArrayGetSize(pSub->availConsumer);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i);
|
||||
tlen += taosEncodeFixedI64(buf, *pConsumerId);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->assigned);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->unassignedConsumer);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedConsumer, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->unassignedVg);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
}
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) {
|
||||
buf = taosDecodeStringTo(buf, pSub->key);
|
||||
buf = taosDecodeFixedI32(buf, &pSub->epoch);
|
||||
|
||||
int32_t sz;
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->assigned = taosArrayInit(sz, sizeof(int64_t));
|
||||
if (pSub->assigned == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int64_t consumerId;
|
||||
buf = taosDecodeFixedI64(buf, &consumerId);
|
||||
taosArrayPush(pSub->assigned, &consumerId);
|
||||
}
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->unassignedConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
if (pSub->unassignedConsumer == NULL) {
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp cEp;
|
||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||
taosArrayPush(pSub->unassignedConsumer, &cEp);
|
||||
}
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
if (pSub->unassignedVg == NULL) {
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
taosArrayDestroy(pSub->unassignedConsumer);
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp cEp;
|
||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||
taosArrayPush(pSub->unassignedVg, &cEp);
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqCGroup {
|
||||
char name[TSDB_CONSUMER_GROUP_LEN];
|
||||
|
@ -358,8 +497,8 @@ typedef struct SMqTopicObj {
|
|||
char *sql;
|
||||
char *logicalPlan;
|
||||
char *physicalPlan;
|
||||
SHashObj *cgroups; // SHashObj<SMqCGroup>
|
||||
SHashObj *consumers; // SHashObj<SMqConsumerObj>
|
||||
//SHashObj *cgroups; // SHashObj<SMqCGroup>
|
||||
//SHashObj *consumers; // SHashObj<SMqConsumerObj>
|
||||
} SMqTopicObj;
|
||||
|
||||
// TODO: add cache and change name to id
|
||||
|
@ -367,18 +506,93 @@ typedef struct SMqConsumerTopic {
|
|||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int32_t epoch;
|
||||
//TODO: replace with something with ep
|
||||
SList *vgroups; // SList<int32_t>
|
||||
//SList *vgroups; // SList<int32_t>
|
||||
//vg assigned to the consumer on the topic
|
||||
SArray *pVgInfo; // SArray<int32_t>
|
||||
} SMqConsumerTopic;
|
||||
|
||||
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
||||
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
|
||||
if (pCTopic == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
strcpy(pCTopic->name, pTopic->name);
|
||||
pCTopic->epoch = 0;
|
||||
pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t));
|
||||
|
||||
int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg);
|
||||
if (unassignedVgSz > 0) {
|
||||
SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg);
|
||||
pCEp->consumerId = consumerId;
|
||||
taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId);
|
||||
taosArrayPush(pSub->assigned, pCEp);
|
||||
}
|
||||
return pCTopic;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pConsumerTopic->name);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
|
||||
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);
|
||||
tlen += taosEncodeFixedI32(buf, *pVgInfo);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) {
|
||||
buf = taosDecodeStringTo(buf, pConsumerTopic->name);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t vgInfo;
|
||||
buf = taosDecodeFixedI32(buf, &vgInfo);
|
||||
taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqConsumerObj {
|
||||
int64_t consumerId;
|
||||
SRWLatch lock;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
SArray *topics; // SArray<SMqConsumerTopic>
|
||||
SHashObj *topicHash; //SHashObj<SMqConsumerTopic>
|
||||
//SHashObj *topicHash; //SHashObj<SMqTopicObj>
|
||||
} SMqConsumerObj;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||
tlen += taosEncodeString(buf, pConsumer->cgroup);
|
||||
int32_t sz = taosArrayGetSize(pConsumer->topics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i);
|
||||
tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerTopic cTopic;
|
||||
buf = tDecodeSMqConsumerTopic(buf, &cTopic);
|
||||
taosArrayPush(pConsumer->topics, &cTopic);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqSubConsumerObj {
|
||||
int64_t consumerUid; // if -1, unassigned
|
||||
SList *vgId; // SList<int32_t>
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_MND_SUBSCRIBE_H_
|
||||
#define _TD_MND_SUBSCRIBE_H_
|
||||
|
||||
#include "mndInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mndInitSubscribe(SMnode *pMnode);
|
||||
void mndCleanupSubscribe(SMnode *pMnode);
|
||||
|
||||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName);
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||
|
||||
SSdbRaw *mndSubscribeActionEncode(SMqSubscribeObj *pSub);
|
||||
SSdbRow *mndSubscribeActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_MND_SUBSCRIBE_H_*/
|
|
@ -30,24 +30,14 @@
|
|||
#define MND_CONSUMER_VER_NUMBER 1
|
||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
||||
static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
||||
|
||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_CONSUMER,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
|
@ -57,26 +47,29 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
|
||||
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
void mndCleanupConsumer(SMnode *pMnode) {}
|
||||
|
||||
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen);
|
||||
if (pRaw == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) goto CM_ENCODE_OVER;
|
||||
|
||||
void* abuf = buf;
|
||||
tEncodeSMqConsumerObj(&abuf, pConsumer);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
|
||||
|
||||
#if 0
|
||||
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
|
||||
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
|
||||
int32_t len = strlen(pConsumer->cgroup);
|
||||
|
@ -101,10 +94,13 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
|||
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
CM_ENCODE_OVER:
|
||||
if (terrno != 0) {
|
||||
mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||
|
@ -116,7 +112,7 @@ CM_ENCODE_OVER:
|
|||
return pRaw;
|
||||
}
|
||||
|
||||
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int8_t sver = 0;
|
||||
|
@ -127,18 +123,27 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
|||
goto CONSUME_DECODE_OVER;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SMqConsumerObj);
|
||||
SSdbRow *pRow = sdbAllocRow(size);
|
||||
SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
|
||||
if (pRow == NULL) goto CONSUME_DECODE_OVER;
|
||||
|
||||
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
|
||||
if (pConsumer == NULL) goto CONSUME_DECODE_OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_GET_INT64(pRaw, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER);
|
||||
int32_t len, topicNum;
|
||||
int32_t len;
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER);
|
||||
SDB_GET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER);
|
||||
void* buf = malloc(len);
|
||||
if (buf == NULL) goto CONSUME_DECODE_OVER;
|
||||
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER);
|
||||
|
||||
tDecodeSMqConsumerObj(buf, pConsumer);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
#if 0
|
||||
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
|
||||
for (int i = 0; i < topicNum; i++) {
|
||||
int32_t topicLen;
|
||||
|
@ -154,6 +159,7 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
|||
int32_t vgSize;
|
||||
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
|
||||
}
|
||||
#endif
|
||||
|
||||
CONSUME_DECODE_OVER:
|
||||
if (terrno != 0) {
|
||||
|
@ -162,8 +168,6 @@ CONSUME_DECODE_OVER:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
/*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/
|
||||
|
||||
return pRow;
|
||||
}
|
||||
|
||||
|
@ -201,214 +205,13 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
|
|||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
char *msgStr = pMsg->rpcMsg.pCont;
|
||||
SCMSubscribeReq subscribe;
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
int64_t consumerId = subscribe.consumerId;
|
||||
char *consumerGroup = subscribe.consumerGroup;
|
||||
int32_t cgroupLen = strlen(consumerGroup);
|
||||
|
||||
SArray *newSub = NULL;
|
||||
int newTopicNum = subscribe.topicNum;
|
||||
if (newTopicNum) {
|
||||
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
|
||||
}
|
||||
SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic));
|
||||
if (pConsumerTopics == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
for (int i = 0; i < newTopicNum; i++) {
|
||||
char *newTopicName = taosArrayGetP(newSub, i);
|
||||
SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i];
|
||||
|
||||
strcpy(pConsumerTopic->name, newTopicName);
|
||||
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
|
||||
}
|
||||
|
||||
taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum);
|
||||
free(pConsumerTopics);
|
||||
taosArraySortString(newSub, taosArrayCompareString);
|
||||
|
||||
SArray *oldSub = NULL;
|
||||
int oldTopicNum = 0;
|
||||
// create consumer if not exist
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
// create consumer
|
||||
pConsumer = malloc(sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pConsumer->consumerId = consumerId;
|
||||
strcpy(pConsumer->cgroup, consumerGroup);
|
||||
|
||||
} else {
|
||||
oldSub = pConsumer->topics;
|
||||
oldTopicNum = taosArrayGetSize(oldSub);
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
//TODO: free memory
|
||||
return -1;
|
||||
}
|
||||
|
||||
int i = 0, j = 0;
|
||||
while (i < newTopicNum || j < oldTopicNum) {
|
||||
SMqConsumerTopic *pOldTopic = NULL;
|
||||
SMqConsumerTopic *pNewTopic = NULL;
|
||||
if (i >= newTopicNum) {
|
||||
// encode unset topic msg to all vnodes related to that topic
|
||||
pOldTopic = taosArrayGet(oldSub, j);
|
||||
j++;
|
||||
} else if (j >= oldTopicNum) {
|
||||
pNewTopic = taosArrayGet(newSub, i);
|
||||
i++;
|
||||
} else {
|
||||
pNewTopic = taosArrayGet(newSub, i);
|
||||
pOldTopic = taosArrayGet(oldSub, j);
|
||||
|
||||
char *newName = pNewTopic->name;
|
||||
char *oldName = pOldTopic->name;
|
||||
int comp = compareLenPrefixedStr(newName, oldName);
|
||||
if (comp == 0) {
|
||||
// do nothing
|
||||
pOldTopic = pNewTopic = NULL;
|
||||
i++;
|
||||
j++;
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
pOldTopic = NULL;
|
||||
i++;
|
||||
} else {
|
||||
pNewTopic = NULL;
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
if (pOldTopic != NULL) {
|
||||
//cancel subscribe of that old topic
|
||||
ASSERT(pNewTopic == NULL);
|
||||
char *oldTopicName = pOldTopic->name;
|
||||
SList *vgroups = pOldTopic->vgroups;
|
||||
SListIter iter;
|
||||
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
|
||||
SListNode *pn;
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
|
||||
ASSERT(pTopic != NULL);
|
||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||
while ((pn = tdListNext(&iter)) != NULL) {
|
||||
int32_t vgId = *(int64_t *)pn->data;
|
||||
// acquire and get epset
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
// TODO what time to release?
|
||||
if (pVgObj == NULL) {
|
||||
// TODO handle error
|
||||
continue;
|
||||
}
|
||||
//build reset msg
|
||||
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
|
||||
// TODO:serialize
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = pMqVgSetReq;
|
||||
action.contLen = 0; // TODO
|
||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMqVgSetReq);
|
||||
mndTransDrop(pTrans);
|
||||
// TODO free
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
//delete data in mnode
|
||||
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
} else if (pNewTopic != NULL) {
|
||||
// save subscribe info to mnode
|
||||
ASSERT(pOldTopic == NULL);
|
||||
|
||||
char *newTopicName = pNewTopic->name;
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
|
||||
ASSERT(pTopic != NULL);
|
||||
|
||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||
if (pGroup == NULL) {
|
||||
// add new group
|
||||
pGroup = malloc(sizeof(SMqCGroup));
|
||||
if (pGroup == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pGroup->consumerIds = tdListNew(sizeof(int64_t));
|
||||
if (pGroup->consumerIds == NULL) {
|
||||
free(pGroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pGroup->status = 0;
|
||||
// add into cgroups
|
||||
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
|
||||
}
|
||||
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
|
||||
|
||||
// put the consumer into list
|
||||
// rebalance will be triggered by timer
|
||||
tdListAppend(pGroup->consumerIds, &consumerId);
|
||||
|
||||
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
|
||||
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
|
||||
// TODO: error handling
|
||||
mndTransAppendRedolog(pTrans, pTopicRaw);
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
// destroy old sub
|
||||
taosArrayDestroy(oldSub);
|
||||
// put new sub into consumerobj
|
||||
pConsumer->topics = newSub;
|
||||
|
||||
// persist consumerObj
|
||||
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
|
||||
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||
// TODO: error handling
|
||||
mndTransAppendRedolog(pTrans, pConsumerRaw);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO: free memory
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
|
||||
|
||||
#if 0
|
||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
|
||||
|
||||
#if 0
|
||||
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
|
@ -463,7 +266,6 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
|||
pMsg->contLen = contLen;
|
||||
|
||||
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -546,3 +348,4 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -273,6 +273,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
|
|||
}
|
||||
|
||||
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
|
||||
#if 0
|
||||
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -332,6 +333,8 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
|
|||
pRsp->body = buf;
|
||||
pRsp->bodyLen = tlen;
|
||||
return pRsp;
|
||||
#endif
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
||||
|
|
|
@ -0,0 +1,597 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tcompare.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_SUBSCRIBE_VER_NUMBER 1
|
||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
||||
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
|
||||
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
|
||||
|
||||
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
||||
|
||||
int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mndSubActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndSubActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndSubActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndSubActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
|
||||
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
|
||||
SMqConsumerEp CEp;
|
||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||
int32_t sz;
|
||||
SVgObj *pVgroup = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
|
||||
while (pIter != NULL) {
|
||||
if (pVgroup->dbUid == pTopic->dbUid) {
|
||||
CEp.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
CEp.vgId = pVgroup->vgId;
|
||||
taosArrayPush(unassignedVg, &CEp);
|
||||
}
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
SMqConsumerTopic *pConsumerTopic) {
|
||||
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
SMqSetCVgReq req = {
|
||||
.vgId = vgId,
|
||||
.consumerId = pConsumer->consumerId,
|
||||
};
|
||||
strcpy(req.cGroup, pConsumer->cgroup);
|
||||
strcpy(req.topicName, pConsumerTopic->name);
|
||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
||||
void *reqStr = malloc(tlen);
|
||||
if (reqStr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
void *abuf = reqStr;
|
||||
tEncodeSMqSetCVgReq(abuf, &req);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = reqStr;
|
||||
action.contLen = tlen;
|
||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(reqStr);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mndCleanupSubscribe(SMnode *pMnode) {}
|
||||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
||||
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
|
||||
int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE;
|
||||
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto SUB_ENCODE_OVER;
|
||||
|
||||
void *buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
goto SUB_ENCODE_OVER;
|
||||
}
|
||||
void *abuf = buf;
|
||||
|
||||
tEncodeSubscribeObj(&buf, pSub);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
|
||||
|
||||
SUB_ENCODE_OVER:
|
||||
if (terrno != 0) {
|
||||
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
|
||||
|
||||
if (sver != MND_SUBSCRIBE_VER_NUMBER) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto SUB_DECODE_OVER;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SMqSubscribeObj);
|
||||
SSdbRow *pRow = sdbAllocRow(size);
|
||||
if (pRow == NULL) goto SUB_DECODE_OVER;
|
||||
|
||||
SMqSubscribeObj *pSub = sdbGetRowObj(pRow);
|
||||
if (pSub == NULL) goto SUB_DECODE_OVER;
|
||||
|
||||
int32_t dataPos = 0;
|
||||
int32_t tlen;
|
||||
void *buf = malloc(tlen + 1);
|
||||
if (buf == NULL) goto SUB_DECODE_OVER;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
||||
|
||||
if (tDecodeSubscribeObj(buf, pSub) == NULL) {
|
||||
goto SUB_DECODE_OVER;
|
||||
}
|
||||
|
||||
SUB_DECODE_OVER:
|
||||
if (terrno != 0) {
|
||||
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
|
||||
// TODO free subscribeobj
|
||||
tfree(pRow);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pRow;
|
||||
}
|
||||
|
||||
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
|
||||
mTrace("subscribe:%s, perform insert action", pSub->key);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
|
||||
mTrace("subscribe:%s, perform delete action", pSub->key);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
|
||||
mTrace("subscribe:%s, perform update action", pOldSub->key);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static char *mndMakeSubscribeKey(char *cgroup, char *topicName) {
|
||||
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
|
||||
if (key == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
int tlen = strlen(cgroup);
|
||||
memcpy(key, cgroup, tlen);
|
||||
key[tlen] = ':';
|
||||
strcpy(key + tlen + 1, topicName);
|
||||
return key;
|
||||
}
|
||||
|
||||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
char *key = mndMakeSubscribeKey(cgroup, topicName);
|
||||
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
||||
free(key);
|
||||
if (pSub == NULL) {
|
||||
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
|
||||
}
|
||||
return pSub;
|
||||
}
|
||||
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pSub);
|
||||
}
|
||||
|
||||
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
char *msgStr = pMsg->rpcMsg.pCont;
|
||||
SCMSubscribeReq subscribe;
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
int64_t consumerId = subscribe.consumerId;
|
||||
char *consumerGroup = subscribe.consumerGroup;
|
||||
int32_t cgroupLen = strlen(consumerGroup);
|
||||
|
||||
SArray *newSub = subscribe.topicNames;
|
||||
int newTopicNum = subscribe.topicNum;
|
||||
|
||||
taosArraySortString(newSub, taosArrayCompareString);
|
||||
|
||||
SArray *oldSub = NULL;
|
||||
int oldTopicNum = 0;
|
||||
// create consumer if not exist
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
// create consumer
|
||||
pConsumer = malloc(sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pConsumer->consumerId = consumerId;
|
||||
strcpy(pConsumer->cgroup, consumerGroup);
|
||||
taosInitRWLatch(&pConsumer->lock);
|
||||
} else {
|
||||
oldSub = pConsumer->topics;
|
||||
}
|
||||
pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
|
||||
|
||||
if (oldSub != NULL) {
|
||||
oldTopicNum = taosArrayGetSize(oldSub);
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
// TODO: free memory
|
||||
return -1;
|
||||
}
|
||||
|
||||
int i = 0, j = 0;
|
||||
while (i < newTopicNum || j < oldTopicNum) {
|
||||
char* newTopicName = NULL;
|
||||
char* oldTopicName = NULL;
|
||||
if (i >= newTopicNum) {
|
||||
// encode unset topic msg to all vnodes related to that topic
|
||||
oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name;
|
||||
j++;
|
||||
} else if (j >= oldTopicNum) {
|
||||
newTopicName = taosArrayGet(newSub, i);
|
||||
i++;
|
||||
} else {
|
||||
newTopicName = taosArrayGet(newSub, i);
|
||||
oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name;
|
||||
|
||||
int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
|
||||
if (comp == 0) {
|
||||
// do nothing
|
||||
oldTopicName = newTopicName = NULL;
|
||||
i++;
|
||||
j++;
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
oldTopicName = NULL;
|
||||
i++;
|
||||
} else {
|
||||
newTopicName = NULL;
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
if (oldTopicName != NULL) {
|
||||
#if 0
|
||||
// cancel subscribe of that old topic
|
||||
ASSERT(pNewTopic == NULL);
|
||||
char *oldTopicName = pOldTopic->name;
|
||||
SList *vgroups = pOldTopic->vgroups;
|
||||
SListIter iter;
|
||||
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
|
||||
SListNode *pn;
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
|
||||
ASSERT(pTopic != NULL);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName);
|
||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||
while ((pn = tdListNext(&iter)) != NULL) {
|
||||
int32_t vgId = *(int64_t *)pn->data;
|
||||
// acquire and get epset
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
// TODO what time to release?
|
||||
if (pVgObj == NULL) {
|
||||
// TODO handle error
|
||||
continue;
|
||||
}
|
||||
// build reset msg
|
||||
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
|
||||
// TODO:serialize
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = pMqVgSetReq;
|
||||
action.contLen = 0; // TODO
|
||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMqVgSetReq);
|
||||
mndTransDrop(pTrans);
|
||||
// TODO free
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// delete data in mnode
|
||||
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
#endif
|
||||
} else if (newTopicName != NULL) {
|
||||
// save subscribe info to mnode
|
||||
ASSERT(oldTopicName == NULL);
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
|
||||
if (pTopic == NULL) {
|
||||
/*terrno = */
|
||||
continue;
|
||||
}
|
||||
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
|
||||
if (pSub == NULL) {
|
||||
pSub = tNewSubscribeObj();
|
||||
if (pSub == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
// set unassigned vg
|
||||
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
|
||||
}
|
||||
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
||||
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
||||
|
||||
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
|
||||
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
|
||||
// send setmsg to vnode
|
||||
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic) < 0) {
|
||||
// TODO
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pConsumerTopic->pVgInfo);
|
||||
free(pConsumerTopic);
|
||||
#if 0
|
||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||
if (pGroup == NULL) {
|
||||
// add new group
|
||||
pGroup = malloc(sizeof(SMqCGroup));
|
||||
if (pGroup == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pGroup->consumerIds = tdListNew(sizeof(int64_t));
|
||||
if (pGroup->consumerIds == NULL) {
|
||||
free(pGroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pGroup->status = 0;
|
||||
// add into cgroups
|
||||
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
|
||||
}
|
||||
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
|
||||
|
||||
// put the consumer into list
|
||||
// rebalance will be triggered by timer
|
||||
tdListAppend(pGroup->consumerIds, &consumerId);
|
||||
|
||||
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
|
||||
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
|
||||
// TODO: error handling
|
||||
mndTransAppendRedolog(pTrans, pTopicRaw);
|
||||
|
||||
#endif
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
}
|
||||
// part3. persist consumerObj
|
||||
|
||||
// destroy old sub
|
||||
if (oldSub) taosArrayDestroy(oldSub);
|
||||
// put new sub into consumerobj
|
||||
|
||||
// persist consumerObj
|
||||
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
|
||||
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||
// TODO: error handling
|
||||
mndTransAppendRedolog(pTrans, pConsumerRaw);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO: free memory
|
||||
if (newSub) taosArrayDestroy(newSub);
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
|
||||
|
||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname);
|
||||
|
||||
#if 0
|
||||
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
|
||||
if (pConsumer == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
|
||||
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
|
||||
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
|
||||
|
||||
STableMetaRsp *pMeta = rpcMallocCont(contLen);
|
||||
if (pMeta == NULL) {
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
|
||||
pMeta->numOfTags = htonl(pConsumer->numOfTags);
|
||||
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->tableType = TSDB_SUPER_TABLE;
|
||||
pMeta->update = pDb->cfg.update;
|
||||
pMeta->sversion = htonl(pConsumer->version);
|
||||
pMeta->tuid = htonl(pConsumer->uid);
|
||||
|
||||
for (int32_t i = 0; i < totalCols; ++i) {
|
||||
SSchema *pSchema = &pMeta->pSchema[i];
|
||||
SSchema *pSrcSchema = &pConsumer->pSchema[i];
|
||||
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
|
||||
pSchema->type = pSrcSchema->type;
|
||||
pSchema->colId = htonl(pSrcSchema->colId);
|
||||
pSchema->bytes = htonl(pSrcSchema->bytes);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
pMsg->pCont = pMeta;
|
||||
pMsg->contLen = contLen;
|
||||
|
||||
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfConsumers = 0;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
numOfConsumers++;
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
*pNumOfConsumers = numOfConsumers;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->pSchema;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "columns");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "tags");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int32_t i = 1; i < cols; ++i) {
|
||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
}
|
||||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
|
@ -79,8 +79,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
|
||||
pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char));
|
||||
if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER;
|
||||
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
|
@ -92,12 +90,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
TOPIC_ENCODE_OVER:
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
|
||||
/*if (pTopic->logicalPlan) {*/
|
||||
/*free(pTopic->logicalPlan);*/
|
||||
/*}*/
|
||||
/*if (pTopic->physicalPlan) {*/
|
||||
/*free(pTopic->physicalPlan);*/
|
||||
/*}*/
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -146,7 +138,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
|
||||
pTopic->physicalPlan = calloc(len + 1, sizeof(char));
|
||||
if (pTopic->physicalPlan == NULL) {
|
||||
free(pTopic->logicalPlan);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -154,7 +146,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
}
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -38,10 +38,10 @@ const char *sdbTableName(ESdbType type) {
|
|||
return "auth";
|
||||
case SDB_ACCT:
|
||||
return "acct";
|
||||
case SDB_SUBSCRIBE:
|
||||
return "subscribe";
|
||||
case SDB_CONSUMER:
|
||||
return "consumer";
|
||||
case SDB_CGROUP:
|
||||
return "cgroup";
|
||||
case SDB_TOPIC:
|
||||
return "topic";
|
||||
case SDB_VGROUP:
|
||||
|
|
Loading…
Reference in New Issue