feat: tmq support show
This commit is contained in:
parent
5efa53e11f
commit
a5a10f7068
|
@ -18,7 +18,6 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "osSleep.h"
|
|
||||||
|
|
||||||
static int running = 1;
|
static int running = 1;
|
||||||
static void msg_process(TAOS_RES* msg) {
|
static void msg_process(TAOS_RES* msg) {
|
||||||
|
|
|
@ -122,59 +122,6 @@ static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
|
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
int32_t sz = 0;
|
|
||||||
// tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
|
|
||||||
if (pRsp->numOfTopics == 0) return tlen;
|
|
||||||
tlen += taosEncodeSSchemaWrapper(buf, pRsp->schema);
|
|
||||||
if (pRsp->pBlockData) {
|
|
||||||
sz = taosArrayGetSize(pRsp->pBlockData);
|
|
||||||
}
|
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
|
|
||||||
tlen += tEncodeDataBlock(buf, pBlock);
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
|
|
||||||
int32_t sz;
|
|
||||||
// buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
|
|
||||||
if (pRsp->numOfTopics == 0) return buf;
|
|
||||||
pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
|
||||||
if (pRsp->schema == NULL) return NULL;
|
|
||||||
buf = taosDecodeSSchemaWrapper(buf, pRsp->schema);
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
|
||||||
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SSDataBlock block = {0};
|
|
||||||
tDecodeDataBlock(buf, &block);
|
|
||||||
taosArrayPush(pRsp->pBlockData, &block);
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
|
|
||||||
if (pRsp->schema) {
|
|
||||||
if (pRsp->schema->nCols) {
|
|
||||||
taosMemoryFreeClear(pRsp->schema->pSchema);
|
|
||||||
}
|
|
||||||
taosMemoryFree(pRsp->schema);
|
|
||||||
}
|
|
||||||
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))blockDestroyInner);
|
|
||||||
pRsp->pBlockData = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
//======================================================================================================================
|
//======================================================================================================================
|
||||||
// the following structure shared by parser and executor
|
// the following structure shared by parser and executor
|
||||||
typedef struct SColumn {
|
typedef struct SColumn {
|
||||||
|
@ -205,12 +152,12 @@ typedef struct STableBlockDistInfo {
|
||||||
int32_t firstSeekTimeUs;
|
int32_t firstSeekTimeUs;
|
||||||
uint32_t numOfRowsInMemTable;
|
uint32_t numOfRowsInMemTable;
|
||||||
uint32_t numOfSmallBlocks;
|
uint32_t numOfSmallBlocks;
|
||||||
SArray *dataBlockInfos;
|
SArray* dataBlockInfos;
|
||||||
} STableBlockDistInfo;
|
} STableBlockDistInfo;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
FUNC_PARAM_TYPE_VALUE = 0x1,
|
FUNC_PARAM_TYPE_VALUE = 0x1,
|
||||||
FUNC_PARAM_TYPE_COLUMN= 0x2,
|
FUNC_PARAM_TYPE_COLUMN = 0x2,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SFunctParam {
|
typedef struct SFunctParam {
|
||||||
|
|
|
@ -1622,103 +1622,11 @@ typedef struct {
|
||||||
char data[];
|
char data[];
|
||||||
} SVShowTablesFetchRsp;
|
} SVShowTablesFetchRsp;
|
||||||
|
|
||||||
typedef struct SMqCMGetSubEpReq {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
} SMqCMGetSubEpReq;
|
} SMqAskEpReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI32(buf, pMsg->contLen);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pMsg->vgId);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct SMqHbRsp {
|
|
||||||
int8_t status; // idle or not
|
|
||||||
int8_t vnodeChanged;
|
|
||||||
int8_t epChanged; // should use new epset
|
|
||||||
int8_t reserved;
|
|
||||||
SEpSet epSet;
|
|
||||||
} SMqHbRsp;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI8(buf, pRsp->status);
|
|
||||||
tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
|
|
||||||
tlen += taosEncodeFixedI8(buf, pRsp->epChanged);
|
|
||||||
tlen += taosEncodeSEpSet(buf, &pRsp->epSet);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
|
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->status);
|
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged);
|
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->epChanged);
|
|
||||||
buf = taosDecodeSEpSet(buf, &pRsp->epSet);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct SMqHbOneTopicBatchRsp {
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
SArray* rsps; // SArray<SMqHbRsp>
|
|
||||||
} SMqHbOneTopicBatchRsp;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeString(buf, pBatchRsp->topicName);
|
|
||||||
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
|
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
|
|
||||||
tlen += taosEncodeSMqHbRsp(buf, pRsp);
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
|
|
||||||
int32_t sz;
|
|
||||||
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
|
||||||
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqHbRsp rsp;
|
|
||||||
buf = taosDecodeSMqHbRsp(buf, &rsp);
|
|
||||||
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct SMqHbBatchRsp {
|
|
||||||
int64_t consumerId;
|
|
||||||
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
|
|
||||||
} SMqHbBatchRsp;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
|
|
||||||
int32_t sz;
|
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
|
|
||||||
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
|
|
||||||
int32_t sz;
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
|
||||||
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqHbOneTopicBatchRsp rsp;
|
|
||||||
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
|
|
||||||
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t key;
|
int32_t key;
|
||||||
|
@ -2442,22 +2350,6 @@ typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
} SMqRspHead;
|
} SMqRspHead;
|
||||||
|
|
||||||
#if 0
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead head;
|
|
||||||
|
|
||||||
int64_t consumerId;
|
|
||||||
int64_t blockingTime;
|
|
||||||
int32_t epoch;
|
|
||||||
int8_t withSchema;
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
|
|
||||||
int64_t currentOffset;
|
|
||||||
uint64_t reqId;
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
} SMqPollReq;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
|
@ -2481,18 +2373,6 @@ typedef struct {
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} SMqSubTopicEp;
|
} SMqSubTopicEp;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMqRspHead head;
|
|
||||||
int64_t reqOffset;
|
|
||||||
int64_t rspOffset;
|
|
||||||
int32_t skipLogNum;
|
|
||||||
// TODO: replace with topic name
|
|
||||||
int32_t numOfTopics;
|
|
||||||
// TODO: remove from msg
|
|
||||||
SSchemaWrapper* schema;
|
|
||||||
SArray* pBlockData; // SArray<SSDataBlock>
|
|
||||||
} SMqPollRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
int64_t reqOffset;
|
int64_t reqOffset;
|
||||||
|
@ -2616,7 +2496,7 @@ typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
SArray* topics; // SArray<SMqSubTopicEp>
|
SArray* topics; // SArray<SMqSubTopicEp>
|
||||||
} SMqCMGetSubEpRsp;
|
} SMqAskEpRsp;
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
||||||
// taosMemoryFree(pSubTopicEp->schema.pSchema);
|
// taosMemoryFree(pSubTopicEp->schema.pSchema);
|
||||||
|
@ -2638,10 +2518,6 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
|
|
||||||
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeString(buf, pTopicEp->topic);
|
tlen += taosEncodeString(buf, pTopicEp->topic);
|
||||||
|
@ -2674,7 +2550,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
|
static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
// tlen += taosEncodeString(buf, pRsp->cgroup);
|
// tlen += taosEncodeString(buf, pRsp->cgroup);
|
||||||
int32_t sz = taosArrayGetSize(pRsp->topics);
|
int32_t sz = taosArrayGetSize(pRsp->topics);
|
||||||
|
@ -2686,7 +2562,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
|
static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
|
||||||
// buf = taosDecodeStringTo(buf, pRsp->cgroup);
|
// buf = taosDecodeStringTo(buf, pRsp->cgroup);
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
|
@ -2702,6 +2578,10 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
|
||||||
|
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
|
||||||
|
}
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -145,7 +145,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-mq-ask-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mnode-mq-ask-ep", SMqAskEpReq, SMqAskEpReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||||
|
|
|
@ -130,7 +130,7 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_PERFS_TABLE_QUERIES "queries"
|
#define TSDB_PERFS_TABLE_QUERIES "queries"
|
||||||
#define TSDB_PERFS_TABLE_TOPICS "topics"
|
#define TSDB_PERFS_TABLE_TOPICS "topics"
|
||||||
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
||||||
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
|
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
||||||
|
|
||||||
#define TSDB_INDEX_TYPE_SMA "SMA"
|
#define TSDB_INDEX_TYPE_SMA "SMA"
|
||||||
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
||||||
|
@ -284,8 +284,9 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_IPv4ADDR_LEN 16
|
#define TSDB_IPv4ADDR_LEN 16
|
||||||
#define TSDB_FILENAME_LEN 128
|
#define TSDB_FILENAME_LEN 128
|
||||||
#define TSDB_SHOW_SQL_LEN 512
|
#define TSDB_SHOW_SQL_LEN 512
|
||||||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
|
||||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||||
|
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||||
|
#define TSDB_SHOW_LIST_LEN 1000
|
||||||
|
|
||||||
#define TSDB_TRANS_STAGE_LEN 12
|
#define TSDB_TRANS_STAGE_LEN 12
|
||||||
#define TSDB_TRANS_TYPE_LEN 16
|
#define TSDB_TRANS_TYPE_LEN 16
|
||||||
|
|
|
@ -25,7 +25,14 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
int32_t tmqAskEp(tmq_t* tmq, bool sync);
|
int32_t tmqAskEp(tmq_t* tmq, bool async);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t inited;
|
||||||
|
tmr_h timer;
|
||||||
|
} SMqMgmt;
|
||||||
|
|
||||||
|
static SMqMgmt tmqMgmt = {0};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t tmqRspType;
|
int8_t tmqRspType;
|
||||||
|
@ -35,7 +42,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t tmqRspType;
|
int8_t tmqRspType;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
SMqCMGetSubEpRsp msg;
|
SMqAskEpRsp msg;
|
||||||
} SMqAskEpRspWrapper;
|
} SMqAskEpRspWrapper;
|
||||||
|
|
||||||
struct tmq_list_t {
|
struct tmq_list_t {
|
||||||
|
@ -64,13 +71,6 @@ struct tmq_conf_t {
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commit_cb;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t inited;
|
|
||||||
tmr_h timer;
|
|
||||||
} SMqMgmt;
|
|
||||||
|
|
||||||
static SMqMgmt tmqMgmt = {0};
|
|
||||||
|
|
||||||
struct tmq_t {
|
struct tmq_t {
|
||||||
// conf
|
// conf
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
|
@ -164,7 +164,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tmq_t* tmq;
|
tmq_t* tmq;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t sync;
|
int32_t async;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
} SMqAskEpCbParam;
|
} SMqAskEpCbParam;
|
||||||
|
|
||||||
|
@ -188,6 +188,7 @@ typedef struct {
|
||||||
tmq_conf_t* tmq_conf_new() {
|
tmq_conf_t* tmq_conf_new() {
|
||||||
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
||||||
conf->autoCommit = false;
|
conf->autoCommit = false;
|
||||||
|
conf->autoCommitInterval = 5000;
|
||||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
|
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
@ -324,7 +325,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
||||||
if (pTaskType == NULL) break;
|
if (pTaskType == NULL) break;
|
||||||
|
|
||||||
if (*pTaskType == TMQ_DELAYED_TASK__HB) {
|
if (*pTaskType == TMQ_DELAYED_TASK__HB) {
|
||||||
tmqAskEp(tmq, false);
|
tmqAskEp(tmq, true);
|
||||||
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
||||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||||
tmq_commit(tmq, NULL, true);
|
tmq_commit(tmq, NULL, true);
|
||||||
|
@ -472,8 +473,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
/*pTmq->autoCommit = conf->autoCommit;*/
|
pTmq->autoCommit = conf->autoCommit;
|
||||||
pTmq->autoCommit = 0;
|
|
||||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||||
pTmq->commit_cb = conf->commit_cb;
|
pTmq->commit_cb = conf->commit_cb;
|
||||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||||
|
@ -662,8 +662,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
if (code != 0) goto FAIL;
|
if (code != 0) goto FAIL;
|
||||||
|
|
||||||
// TODO: add max retry cnt
|
// TODO: add max retry cnt
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, true)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||||
tscDebug("not ready, retry\n");
|
tscDebug("not ready, retry");
|
||||||
taosMsleep(500);
|
taosMsleep(500);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -854,7 +854,7 @@ CREATE_MSG_FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
/*printf("call update ep %d\n", epoch);*/
|
/*printf("call update ep %d\n", epoch);*/
|
||||||
bool set = false;
|
bool set = false;
|
||||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||||
|
@ -936,7 +936,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tmq_t* tmq = pParam->tmq;
|
tmq_t* tmq = pParam->tmq;
|
||||||
pParam->code = code;
|
pParam->code = code;
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
|
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -950,15 +950,15 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pParam->sync) {
|
if (!pParam->async) {
|
||||||
SMqCMGetSubEpRsp rsp;
|
SMqAskEpRsp rsp;
|
||||||
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||||
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
||||||
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
||||||
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||||
}
|
}
|
||||||
tDeleteSMqCMGetSubEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
|
@ -969,7 +969,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
||||||
pWrapper->epoch = head->epoch;
|
pWrapper->epoch = head->epoch;
|
||||||
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
|
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
|
||||||
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pWrapper);
|
taosWriteQitem(tmq->mqueue, pWrapper);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
|
@ -978,13 +978,13 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
END:
|
END:
|
||||||
/*atomic_store_8(&tmq->epStatus, 0);*/
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
if (pParam->sync) {
|
if (!pParam->async) {
|
||||||
tsem_post(&pParam->rspSem);
|
tsem_post(&pParam->rspSem);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
#if 0
|
#if 0
|
||||||
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
||||||
|
@ -995,8 +995,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
}
|
}
|
||||||
atomic_store_32(&tmq->epSkipCnt, 0);
|
atomic_store_32(&tmq->epSkipCnt, 0);
|
||||||
#endif
|
#endif
|
||||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
int32_t tlen = sizeof(SMqAskEpReq);
|
||||||
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
SMqAskEpReq* req = taosMemoryMalloc(tlen);
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tscError("failed to malloc get subscribe ep buf");
|
tscError("failed to malloc get subscribe ep buf");
|
||||||
/*atomic_store_8(&tmq->epStatus, 0);*/
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
|
@ -1014,7 +1014,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pParam->tmq = tmq;
|
pParam->tmq = tmq;
|
||||||
pParam->sync = sync;
|
pParam->async = async;
|
||||||
tsem_init(&pParam->rspSem, 0, 0);
|
tsem_init(&pParam->rspSem, 0, 0);
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
||||||
|
@ -1036,7 +1036,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
sendInfo->requestObjRefId = 0;
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = pParam;
|
sendInfo->param = pParam;
|
||||||
sendInfo->fp = tmqAskEpCb;
|
sendInfo->fp = tmqAskEpCb;
|
||||||
sendInfo->msgType = TDMT_MND_GET_SUB_EP;
|
sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
@ -1045,7 +1045,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
||||||
if (sync) {
|
if (!async) {
|
||||||
tsem_wait(&pParam->rspSem);
|
tsem_wait(&pParam->rspSem);
|
||||||
code = pParam->code;
|
code = pParam->code;
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
@ -1209,7 +1209,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
||||||
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
|
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
|
||||||
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
|
||||||
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
||||||
SMqCMGetSubEpRsp* rspMsg = &pEpRspWrapper->msg;
|
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
|
||||||
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
|
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
|
||||||
/*tmqClearUnhandleMsg(tmq);*/
|
/*tmqClearUnhandleMsg(tmq);*/
|
||||||
*pReset = true;
|
*pReset = true;
|
||||||
|
@ -1271,15 +1271,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
SMqRspObj* rspObj;
|
SMqRspObj* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
// TODO: put into delayed queue
|
|
||||||
#if 0
|
|
||||||
int8_t status = atomic_load_8(&tmq->status);
|
|
||||||
while (0 != tmqAskEp(tmq, status != TMQ_CONSUMER_STATUS__READY)) {
|
|
||||||
tscDebug("not ready, retry\n");
|
|
||||||
taosSsleep(1);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
|
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
|
||||||
if (rspObj) {
|
if (rspObj) {
|
||||||
return (TAOS_RES*)rspObj;
|
return (TAOS_RES*)rspObj;
|
||||||
|
|
|
@ -211,7 +211,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -441,9 +441,7 @@ typedef struct {
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
// TODO: use subDbUid
|
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int64_t subDbUid;
|
|
||||||
int32_t version;
|
int32_t version;
|
||||||
int8_t subType; // db or table
|
int8_t subType; // db or table
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
|
|
@ -59,7 +59,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessAskEpReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
@ -86,7 +86,7 @@ static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
// TODO delete consumer
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -198,8 +198,8 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
|
|
||||||
static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pNode;
|
SMnode *pMnode = pMsg->pNode;
|
||||||
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
|
SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->rpcMsg.pCont;
|
||||||
SMqCMGetSubEpRsp rsp = {0};
|
SMqAskEpRsp rsp = {0};
|
||||||
int64_t consumerId = be64toh(pReq->consumerId);
|
int64_t consumerId = be64toh(pReq->consumerId);
|
||||||
int32_t epoch = ntohl(pReq->epoch);
|
int32_t epoch = ntohl(pReq->epoch);
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
}
|
}
|
||||||
// encode rsp
|
// encode rsp
|
||||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
|
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
|
||||||
void *buf = rpcMallocCont(tlen);
|
void *buf = rpcMallocCont(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -311,10 +311,10 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
||||||
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
|
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
|
||||||
|
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
|
tEncodeSMqAskEpRsp(&abuf, &rsp);
|
||||||
|
|
||||||
// release consumer and free memory
|
// release consumer and free memory
|
||||||
tDeleteSMqCMGetSubEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
// send rsp
|
// send rsp
|
||||||
|
@ -322,7 +322,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
||||||
pMsg->rspLen = tlen;
|
pMsg->rspLen = tlen;
|
||||||
return 0;
|
return 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
tDeleteSMqCMGetSubEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,7 +215,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
|
||||||
if (pSubNew == NULL) return NULL;
|
if (pSubNew == NULL) return NULL;
|
||||||
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
taosInitRWLatch(&pSubNew->lock);
|
taosInitRWLatch(&pSubNew->lock);
|
||||||
pSubNew->vgNum = -1;
|
pSubNew->vgNum = 0;
|
||||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
// TODO set free fp
|
// TODO set free fp
|
||||||
SMqConsumerEpInSub epInSub = {
|
SMqConsumerEpInSub epInSub = {
|
||||||
|
|
|
@ -41,29 +41,31 @@ static const SPerfsTableSchema queriesSchema[] = {
|
||||||
|
|
||||||
static const SPerfsTableSchema topicSchema[] = {
|
static const SPerfsTableSchema topicSchema[] = {
|
||||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
/*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
/*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/
|
// TODO config
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SPerfsTableSchema consumerSchema[] = {
|
static const SPerfsTableSchema consumerSchema[] = {
|
||||||
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
|
||||||
{.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
// ep
|
{.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
// up time
|
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
// topics
|
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SPerfsTableSchema subscribeSchema[] = {
|
static const SPerfsTableSchema subscriptionSchema[] = {
|
||||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "committed_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "current_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SPerfsTableMeta perfsMeta[] = {
|
static const SPerfsTableMeta perfsMeta[] = {
|
||||||
|
@ -71,7 +73,7 @@ static const SPerfsTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
|
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
|
||||||
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
|
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
|
||||||
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
||||||
{TSDB_PERFS_TABLE_SUBSCRIBES, subscribeSchema, tListLen(subscribeSchema)},
|
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
||||||
};
|
};
|
||||||
|
|
||||||
// connection/application/
|
// connection/application/
|
||||||
|
|
|
@ -478,6 +478,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SQueryPlan* pPlan = NULL;
|
SQueryPlan* pPlan = NULL;
|
||||||
SSubplan* plan = NULL;
|
SSubplan* plan = NULL;
|
||||||
|
|
||||||
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
|
@ -485,10 +486,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSub->vgNum == -1);
|
|
||||||
|
|
||||||
pSub->vgNum = 0;
|
|
||||||
|
|
||||||
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
|
||||||
if (levelNum != 1) {
|
if (levelNum != 1) {
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
@ -529,7 +526,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
pVgEp->vgId = pVgroup->vgId;
|
pVgEp->vgId = pVgroup->vgId;
|
||||||
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
||||||
|
|
||||||
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
||||||
|
|
||||||
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
|
|
|
@ -76,7 +76,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->subDbUid, TOPIC_ENCODE_OVER);
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
|
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
|
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
|
||||||
|
@ -139,7 +138,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->subDbUid, TOPIC_DECODE_OVER);
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
|
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
|
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);
|
||||||
|
@ -520,29 +518,33 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
|
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
|
SColumnInfoData *pColInfo;
|
||||||
|
SName n;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
|
||||||
SName n;
|
|
||||||
tNameFromString(&n, pTopic->name, T_NAME_ACCT|T_NAME_DB);
|
|
||||||
tNameGetDbName(&n, varDataVal(topicName));
|
tNameGetDbName(&n, varDataVal(topicName));
|
||||||
varDataSetLen(topicName, strlen(varDataVal(topicName)));
|
varDataSetLen(topicName, strlen(varDataVal(topicName)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
|
||||||
|
|
||||||
|
char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
tNameGetDbName(&n, varDataVal(dbName));
|
||||||
|
varDataSetLen(dbName, strlen(varDataVal(dbName)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)dbName, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
|
tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
|
||||||
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||||
|
|
||||||
// taosMemoryFree(sql);
|
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
}
|
||||||
|
|
|
@ -277,8 +277,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exist
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
|
||||||
|
|
||||||
// mnode-topic
|
// mnode-mq
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
|
||||||
|
|
||||||
// mnode-sma
|
// mnode-sma
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||||
|
|
Loading…
Reference in New Issue