Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tkv
This commit is contained in:
commit
a8bcf12898
|
@ -1127,10 +1127,14 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
|
|||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqTmrMsg {
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} SMqTmrMsg;
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
} SMqDoRebalanceMsg;
|
||||
|
||||
typedef struct {
|
||||
int64_t status;
|
||||
} SMVSubscribeRsp;
|
||||
|
@ -1707,13 +1711,13 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
|
|||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqTbData {
|
||||
typedef struct {
|
||||
int64_t uid;
|
||||
int32_t numOfRows;
|
||||
char* colData;
|
||||
} SMqTbData;
|
||||
|
||||
typedef struct SMqTopicBlk {
|
||||
typedef struct {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
int64_t committedOffset;
|
||||
int64_t reqOffset;
|
||||
|
@ -1724,7 +1728,7 @@ typedef struct SMqTopicBlk {
|
|||
SMqTbData* tbData;
|
||||
} SMqTopicData;
|
||||
|
||||
typedef struct SMqConsumeRsp {
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
SSchemaWrapper* schemas;
|
||||
int64_t committedOffset;
|
||||
|
@ -1736,7 +1740,7 @@ typedef struct SMqConsumeRsp {
|
|||
} SMqConsumeRsp;
|
||||
|
||||
// one req for one vg+topic
|
||||
typedef struct SMqConsumeReq {
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
//0: commit only, current offset
|
||||
//1: consume only, poll next offset
|
||||
|
@ -1752,17 +1756,17 @@ typedef struct SMqConsumeReq {
|
|||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
} SMqConsumeReq;
|
||||
|
||||
typedef struct SMqSubVgEp {
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
SEpSet epSet;
|
||||
} SMqSubVgEp;
|
||||
|
||||
typedef struct SMqSubTopicEp {
|
||||
typedef struct {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
} SMqSubTopicEp;
|
||||
|
||||
typedef struct SMqCMGetSubEpRsp {
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
int64_t epoch;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
|
|
|
@ -141,7 +141,8 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMqTmrMsg, SMqTmrMsg)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||
|
||||
// Requests handled by VNODE
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
|
|
|
@ -27,6 +27,7 @@ typedef struct SMnodeMsg SMnodeMsg;
|
|||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
|
||||
typedef struct SMnodeLoad {
|
||||
|
@ -64,6 +65,7 @@ typedef struct {
|
|||
SMnodeCfg cfg;
|
||||
SDnode *pDnode;
|
||||
PutReqToMWriteQFp putReqToMWriteQFp;
|
||||
PutReqToMReadQFp putReqToMReadQFp;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
SendRedirectRspFp sendRedirectRspFp;
|
||||
|
|
|
@ -209,6 +209,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
|
||||
SName name = {0};
|
||||
char* dbName = getDbOfConnection(tmq->pTscObj);
|
||||
if (dbName == NULL) {
|
||||
return TMQ_RESP_ERR__FAIL;
|
||||
}
|
||||
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
|
||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||
|
||||
|
|
|
@ -565,7 +565,6 @@ TEST(testCase, insert_test) {
|
|||
#endif
|
||||
|
||||
|
||||
#if 0
|
||||
TEST(testCase, projection_query_tables) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
@ -585,7 +584,7 @@ TEST(testCase, projection_query_tables) {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
for(int32_t i = 0; i < 100000; ++i) {
|
||||
for(int32_t i = 0; i < 10000000; ++i) {
|
||||
char sql[512] = {0};
|
||||
sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
|
||||
TAOS_RES* p = taos_query(pConn, sql);
|
||||
|
@ -616,6 +615,7 @@ TEST(testCase, projection_query_tables) {
|
|||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
#if 0
|
||||
|
||||
TEST(testCase, projection_query_stables) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
|
|
@ -46,7 +46,6 @@ TEST(testCase, create_topic_ctb_Test) {
|
|||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
//taos_free_result(pRes);
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
ASSERT_TRUE(pFields == nullptr);
|
||||
|
|
|
@ -256,6 +256,12 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
|
|||
|
||||
static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dndPutMsgToMReadQ(SDnode *pDnode, SRpcMsg* pRpcMsg) {
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
|
@ -264,6 +270,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
pOption->sendReqToMnodeFp = dndSendReqToMnode;
|
||||
pOption->sendRedirectRspFp = dndSendRedirectRsp;
|
||||
pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ;
|
||||
pOption->putReqToMReadQFp = dndPutMsgToMReadQ;
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
pOption->cfg.sver = pDnode->env.sver;
|
||||
|
|
|
@ -28,6 +28,8 @@ void mndCleanupConsumer(SMnode *pMnode);
|
|||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
|
||||
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup);
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
|
|
|
@ -343,42 +343,22 @@ typedef struct {
|
|||
char payload[];
|
||||
} SShowObj;
|
||||
|
||||
#if 0
|
||||
typedef struct SConsumerObj {
|
||||
uint64_t uid;
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
//uint64_t dbUid;
|
||||
int32_t version;
|
||||
SRWLatch lock;
|
||||
SArray* topics;
|
||||
} SConsumerObj;
|
||||
|
||||
typedef struct SMqTopicConsumer {
|
||||
int64_t consumerId;
|
||||
SList* topicList;
|
||||
} SMqTopicConsumer;
|
||||
#endif
|
||||
|
||||
typedef struct SMqConsumerEp {
|
||||
int32_t vgId; // -1 for unassigned
|
||||
int32_t status;
|
||||
SEpSet epSet;
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
int64_t lastConsumerHbTs;
|
||||
int64_t lastVgHbTs;
|
||||
char* qmsg;
|
||||
typedef struct {
|
||||
int32_t vgId; // -1 for unassigned
|
||||
int32_t status;
|
||||
SEpSet epSet;
|
||||
int64_t oldConsumerId;
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
char* qmsg;
|
||||
} SMqConsumerEp;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pConsumerEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
|
||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastConsumerHbTs);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastVgHbTs);
|
||||
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||
tlen += taosEncodeString(buf, pConsumerEp->qmsg);
|
||||
return tlen;
|
||||
}
|
||||
|
@ -387,10 +367,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
|
|||
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
|
||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastConsumerHbTs);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastVgHbTs);
|
||||
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||
buf = taosDecodeString(buf, &pConsumerEp->qmsg);
|
||||
return buf;
|
||||
}
|
||||
|
@ -401,97 +379,89 @@ static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
|
|||
}
|
||||
}
|
||||
|
||||
// unit for rebalance
|
||||
typedef struct SMqSubscribeObj {
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
SArray* vgInfo; // SArray<SMqConsumerEp>
|
||||
} SMqSubConsumer;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsumer* pConsumer) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||
int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSubConsumer(void** buf, SMqSubConsumer* pConsumer) {
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumer->vgInfo = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp consumerEp;
|
||||
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
|
||||
taosArrayPush(pConsumer->vgInfo, &consumerEp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) {
|
||||
if (pSubConsumer->vgInfo) {
|
||||
taosArrayDestroyEx(pSubConsumer->vgInfo, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
pSubConsumer->vgInfo = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int32_t epoch;
|
||||
// TODO: replace with priority queue
|
||||
int32_t nextConsumerIdx;
|
||||
SArray* availConsumer; // SArray<int64_t> (consumerId)
|
||||
SArray* assigned; // SArray<SMqConsumerEp>
|
||||
SArray* idleConsumer; // SArray<SMqConsumerEp>
|
||||
SArray* lostConsumer; // SArray<SMqConsumerEp>
|
||||
SArray* unassignedVg; // SArray<SMqConsumerEp>
|
||||
int32_t status;
|
||||
int32_t vgNum;
|
||||
SArray* consumers; // SArray<SMqSubConsumer>
|
||||
SArray* unassignedVg; // SArray<SMqConsumerEp>
|
||||
} SMqSubscribeObj;
|
||||
|
||||
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
|
||||
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj));
|
||||
SMqSubscribeObj* pSub = calloc(1, sizeof(SMqSubscribeObj));
|
||||
if (pSub == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pSub->key[0] = 0;
|
||||
pSub->epoch = 0;
|
||||
|
||||
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
|
||||
if (pSub->availConsumer == NULL) {
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->assigned == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->lostConsumer == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->idleConsumer == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
taosArrayDestroy(pSub->lostConsumer);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer));
|
||||
if (pSub->consumers == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||
if (pSub->unassignedVg == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
taosArrayDestroy(pSub->lostConsumer);
|
||||
taosArrayDestroy(pSub->idleConsumer);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pSub->key[0] = 0;
|
||||
pSub->vgNum = 0;
|
||||
pSub->status = 0;
|
||||
|
||||
return pSub;
|
||||
|
||||
_err:
|
||||
tfree(pSub->unassignedVg);
|
||||
tfree(pSub->consumers);
|
||||
tfree(pSub);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pSub->key);
|
||||
tlen += taosEncodeFixedI32(buf, pSub->epoch);
|
||||
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
|
||||
tlen += taosEncodeFixedI32(buf, pSub->status);
|
||||
int32_t sz;
|
||||
|
||||
sz = taosArrayGetSize(pSub->availConsumer);
|
||||
sz = taosArrayGetSize(pSub->consumers);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i);
|
||||
tlen += taosEncodeFixedI64(buf, *pConsumerId);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->assigned);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->lostConsumer);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->lostConsumer, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->idleConsumer);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp* pCEp = taosArrayGet(pSub->idleConsumer, i);
|
||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||
SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->consumers, i);
|
||||
tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pSub->unassignedVg);
|
||||
|
@ -506,68 +476,25 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
|
|||
|
||||
static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) {
|
||||
buf = taosDecodeStringTo(buf, pSub->key);
|
||||
buf = taosDecodeFixedI32(buf, &pSub->epoch);
|
||||
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
|
||||
buf = taosDecodeFixedI32(buf, &pSub->status);
|
||||
|
||||
int32_t sz;
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->availConsumer = taosArrayInit(sz, sizeof(int64_t));
|
||||
if (pSub->availConsumer == NULL) {
|
||||
pSub->consumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
|
||||
if (pSub->consumers == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int64_t consumerId;
|
||||
buf = taosDecodeFixedI64(buf, &consumerId);
|
||||
taosArrayPush(pSub->availConsumer, &consumerId);
|
||||
SMqSubConsumer subConsumer = {0};
|
||||
buf = tDecodeSMqSubConsumer(buf, &subConsumer);
|
||||
taosArrayPush(pSub->consumers, &subConsumer);
|
||||
}
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->assigned = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
if (pSub->assigned == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp cEp = {0};
|
||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||
taosArrayPush(pSub->assigned, &cEp);
|
||||
}
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->lostConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
if (pSub->lostConsumer == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp cEp = {0};
|
||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||
taosArrayPush(pSub->lostConsumer, &cEp);
|
||||
}
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
if (pSub->idleConsumer == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
taosArrayDestroy(pSub->lostConsumer);
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerEp cEp = {0};
|
||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||
taosArrayPush(pSub->idleConsumer, &cEp);
|
||||
}
|
||||
|
||||
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
|
||||
if (pSub->unassignedVg == NULL) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
taosArrayDestroy(pSub->lostConsumer);
|
||||
taosArrayDestroy(pSub->idleConsumer);
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -575,176 +502,24 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
|||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
||||
taosArrayPush(pSub->unassignedVg, &cEp);
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
|
||||
if (pSub->availConsumer) {
|
||||
taosArrayDestroy(pSub->availConsumer);
|
||||
pSub->availConsumer = NULL;
|
||||
}
|
||||
if (pSub->assigned) {
|
||||
//taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
pSub->assigned = NULL;
|
||||
if (pSub->consumers) {
|
||||
taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
|
||||
//taosArrayDestroy(pSub->consumers);
|
||||
pSub->consumers = NULL;
|
||||
}
|
||||
|
||||
if (pSub->unassignedVg) {
|
||||
//taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->unassignedVg);
|
||||
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
//taosArrayDestroy(pSub->unassignedVg);
|
||||
pSub->unassignedVg = NULL;
|
||||
}
|
||||
if (pSub->idleConsumer) {
|
||||
//taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->idleConsumer);
|
||||
pSub->idleConsumer = NULL;
|
||||
}
|
||||
if (pSub->lostConsumer) {
|
||||
//taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->lostConsumer);
|
||||
pSub->lostConsumer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct SMqCGroup {
|
||||
char name[TSDB_CONSUMER_GROUP_LEN];
|
||||
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
|
||||
SList* consumerIds; // SList<int64_t>
|
||||
SList* idleVGroups; // SList<int32_t>
|
||||
} SMqCGroup;
|
||||
|
||||
typedef struct SMqTopicObj {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
uint64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
SRWLatch lock;
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
// SHashObj *cgroups; // SHashObj<SMqCGroup>
|
||||
// SHashObj *consumers; // SHashObj<SMqConsumerObj>
|
||||
} SMqTopicObj;
|
||||
|
||||
// TODO: add cache and change name to id
|
||||
typedef struct SMqConsumerTopic {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int32_t epoch;
|
||||
// vg assigned to the consumer on the topic
|
||||
SArray* pVgInfo; // SArray<int32_t>
|
||||
} SMqConsumerTopic;
|
||||
|
||||
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic,
|
||||
SMqSubscribeObj* pSub, int64_t* oldConsumerId) {
|
||||
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
|
||||
if (pCTopic == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
strcpy(pCTopic->name, pTopic->name);
|
||||
pCTopic->epoch = 0;
|
||||
pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t));
|
||||
|
||||
int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg);
|
||||
if (unassignedVgSz > 0) {
|
||||
SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg);
|
||||
*oldConsumerId = pCEp->consumerId;
|
||||
pCEp->consumerId = consumerId;
|
||||
taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId);
|
||||
taosArrayPush(pSub->assigned, pCEp);
|
||||
}
|
||||
return pCTopic;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pConsumerTopic->name);
|
||||
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
|
||||
int32_t sz = 0;
|
||||
if (pConsumerTopic->pVgInfo != NULL) {
|
||||
sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
||||
}
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);
|
||||
tlen += taosEncodeFixedI32(buf, *pVgInfo);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) {
|
||||
buf = taosDecodeStringTo(buf, pConsumerTopic->name);
|
||||
buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t vgInfo;
|
||||
buf = taosDecodeFixedI32(buf, &vgInfo);
|
||||
taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqConsumerObj {
|
||||
int64_t consumerId;
|
||||
int64_t connId;
|
||||
SRWLatch lock;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
SArray* topics; // SArray<SMqConsumerTopic>
|
||||
int64_t epoch;
|
||||
// stat
|
||||
int64_t pollCnt;
|
||||
} SMqConsumerObj;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
|
||||
tlen += taosEncodeString(buf, pConsumer->cgroup);
|
||||
int32_t sz = taosArrayGetSize(pConsumer->topics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i);
|
||||
tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->connId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
|
||||
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqConsumerTopic cTopic;
|
||||
buf = tDecodeSMqConsumerTopic(buf, &cTopic);
|
||||
taosArrayPush(pConsumer->topics, &cTopic);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMqSubConsumerObj {
|
||||
int64_t consumerUid; // if -1, unassigned
|
||||
SList* vgId; // SList<int32_t>
|
||||
} SMqSubConsumerObj;
|
||||
|
||||
typedef struct SMqSubCGroupObj {
|
||||
char name[TSDB_CONSUMER_GROUP_LEN];
|
||||
SList* consumers; // SList<SMqConsumerObj>
|
||||
} SMqSubCGroupObj;
|
||||
|
||||
typedef struct SMqSubTopicObj {
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createTime;
|
||||
|
@ -757,41 +532,57 @@ typedef struct SMqSubTopicObj {
|
|||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
SList* cgroups; // SList<SMqSubCGroupObj>
|
||||
} SMqSubTopicObj;
|
||||
} SMqTopicObj;
|
||||
|
||||
typedef struct SMqConsumerSubObj {
|
||||
int64_t topicUid;
|
||||
SList* vgIds; // SList<int64_t>
|
||||
} SMqConsumerSubObj;
|
||||
|
||||
typedef struct SMqConsumerHbObj {
|
||||
int64_t consumerId;
|
||||
SList* consumerSubs; // SList<SMqConsumerSubObj>
|
||||
} SMqConsumerHbObj;
|
||||
|
||||
typedef struct SMqVGroupSubObj {
|
||||
int64_t topicUid;
|
||||
SList* consumerIds; // SList<int64_t>
|
||||
} SMqVGroupSubObj;
|
||||
|
||||
typedef struct SMqVGroupHbObj {
|
||||
int64_t vgId;
|
||||
SList* vgSubs; // SList<SMqVGroupSubObj>
|
||||
} SMqVGroupHbObj;
|
||||
|
||||
#if 0
|
||||
typedef struct SCGroupObj {
|
||||
char name[TSDB_TOPIC_NAME_LEN];
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
uint64_t uid;
|
||||
//uint64_t dbUid;
|
||||
int32_t version;
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
int64_t connId;
|
||||
SRWLatch lock;
|
||||
SList* consumerIds;
|
||||
} SCGroupObj;
|
||||
#endif
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
SArray* topics; // SArray<char*>
|
||||
int64_t epoch;
|
||||
// stat
|
||||
int64_t pollCnt;
|
||||
// status
|
||||
int32_t status;
|
||||
// heartbeat from the consumer reset hbStatus to 0
|
||||
// each checkConsumerAlive msg add hbStatus by 1
|
||||
// if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost
|
||||
int32_t hbStatus;
|
||||
} SMqConsumerObj;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
|
||||
tlen += taosEncodeString(buf, pConsumer->cgroup);
|
||||
int32_t sz = taosArrayGetSize(pConsumer->topics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char* topic = taosArrayGetP(pConsumer->topics, i);
|
||||
tlen += taosEncodeString(buf, topic);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->connId);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
|
||||
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char* topic;
|
||||
buf = taosDecodeString(buf, &topic);
|
||||
taosArrayPush(pConsumer->topics, &topic);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SMnodeMsg {
|
||||
char user[TSDB_USER_LEN];
|
||||
|
|
|
@ -96,6 +96,7 @@ typedef struct SMnode {
|
|||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
SendRedirectRspFp sendRedirectRspFp;
|
||||
PutReqToMWriteQFp putReqToMWriteQFp;
|
||||
PutReqToMReadQFp putReqToMReadQFp;
|
||||
} SMnode;
|
||||
|
||||
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
|
||||
|
|
|
@ -28,9 +28,6 @@ void mndCleanupSubscribe(SMnode *pMnode);
|
|||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName);
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||
|
||||
SSdbRaw *mndSubscribeActionEncode(SMqSubscribeObj *pSub);
|
||||
SSdbRow *mndSubscribeActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -53,6 +53,19 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupConsumer(SMnode *pMnode) {}
|
||||
|
||||
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
|
||||
SMqConsumerObj* pConsumer = malloc(sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
pConsumer->epoch = 1;
|
||||
pConsumer->consumerId = consumerId;
|
||||
strcpy(pConsumer->cgroup, cgroup);
|
||||
taosInitRWLatch(&pConsumer->lock);
|
||||
return pConsumer;
|
||||
}
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void* buf = NULL;
|
||||
|
@ -164,148 +177,3 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
|
||||
if (pConsumer == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
terrno = TSDB_CODE_MND_INVALID_CONSUMER;
|
||||
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
|
||||
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
|
||||
|
||||
STableMetaRsp *pMeta = rpcMallocCont(contLen);
|
||||
if (pMeta == NULL) {
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
|
||||
pMeta->numOfTags = htonl(pConsumer->numOfTags);
|
||||
pMeta->numOfColumns = htonl(pConsumer->numOfColumns);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->tableType = TSDB_SUPER_TABLE;
|
||||
pMeta->update = pDb->cfg.update;
|
||||
pMeta->sversion = htonl(pConsumer->version);
|
||||
pMeta->tuid = htonl(pConsumer->uid);
|
||||
|
||||
for (int32_t i = 0; i < totalCols; ++i) {
|
||||
SSchema *pSchema = &pMeta->pSchema[i];
|
||||
SSchema *pSrcSchema = &pConsumer->pSchema[i];
|
||||
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
|
||||
pSchema->type = pSrcSchema->type;
|
||||
pSchema->colId = htonl(pSrcSchema->colId);
|
||||
pSchema->bytes = htonl(pSrcSchema->bytes);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
pMsg->pCont = pMeta;
|
||||
pMsg->contLen = contLen;
|
||||
|
||||
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfConsumers = 0;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
numOfConsumers++;
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
*pNumOfConsumers = numOfConsumers;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->pSchema;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "columns");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "tags");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htonl(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int32_t i = 1; i < cols; ++i) {
|
||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
}
|
||||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
#endif
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -76,7 +76,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
|
|||
if (mndIsMaster(pMnode)) {
|
||||
SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg));
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)};
|
||||
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
|
||||
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
|
||||
}
|
||||
|
||||
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer);
|
||||
|
@ -249,6 +249,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
|
|||
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
|
||||
pMnode->pDnode = pOption->pDnode;
|
||||
pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
|
||||
pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp;
|
||||
pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
|
||||
pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
|
||||
pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
|
||||
|
|
|
@ -124,7 +124,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
|
|||
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
||||
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
||||
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds);
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds);
|
||||
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
||||
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
||||
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
||||
|
|
|
@ -1605,4 +1605,4 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
|
|||
// }
|
||||
|
||||
// return 0;
|
||||
// }
|
||||
// }
|
||||
|
|
|
@ -22,7 +22,7 @@ static void tsdbResetReadFile(SReadH *pReadh);
|
|||
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
|
||||
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
|
||||
int maxPoints, char *buffer, int bufferSize);
|
||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
|
||||
int numOfColIds);
|
||||
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
|
||||
|
||||
|
@ -271,7 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) {
|
||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds) {
|
||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||
int8_t update = pReadh->pRepo->config.update;
|
||||
|
||||
|
@ -548,7 +548,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
|
||||
int numOfColIds) {
|
||||
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
||||
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
|
|
|
@ -188,4 +188,4 @@ static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
|
|||
vmaReset(pVMA);
|
||||
TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue