feat(tmq): show subcription
This commit is contained in:
parent
661fbd2e46
commit
52e1fcbff7
|
@ -46,7 +46,6 @@ extern "C" {
|
|||
|
||||
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
|
||||
#define TSDB_PERFS_TABLE_SMAS "smas"
|
||||
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
|
||||
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
|
||||
#define TSDB_PERFS_TABLE_QUERIES "queries"
|
||||
#define TSDB_PERFS_TABLE_TOPICS "topics"
|
||||
|
|
|
@ -99,7 +99,7 @@ typedef enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_VGROUP,
|
||||
TSDB_MGMT_TABLE_TOPICS,
|
||||
TSDB_MGMT_TABLE_CONSUMERS,
|
||||
TSDB_MGMT_TABLE_SUBSCRIBES,
|
||||
TSDB_MGMT_TABLE_SUBSCRIPTIONS,
|
||||
TSDB_MGMT_TABLE_TRANS,
|
||||
TSDB_MGMT_TABLE_SMAS,
|
||||
TSDB_MGMT_TABLE_CONFIGS,
|
||||
|
|
|
@ -273,8 +273,8 @@ static const SSysDbTableSchema consumerSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema subscriptionSchema[] = {
|
||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
};
|
||||
|
|
|
@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
|
|||
type = TSDB_MGMT_TABLE_VGROUP;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_CONSUMERS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_SUBSCRIBES;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_SUBSCRIPTIONS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_TRANS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) {
|
||||
|
|
|
@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
|
|||
static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
|
||||
|
||||
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
|
@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SMqSubscribeObj *pSub = NULL;
|
||||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
if (numOfRows + pSub->vgNum > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
||||
}
|
||||
|
||||
SMqConsumerEp *pConsumerEp = NULL;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// topic and cgroup
|
||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||
|
||||
// vg id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
||||
|
||||
// consumer id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
||||
|
||||
// offset
|
||||
#if 0
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
#endif
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// topic and cgroup
|
||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||
|
||||
// vg id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
||||
|
||||
// consumer id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
|
||||
// offset
|
||||
#if 0
|
||||
// subscribe time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
||||
|
||||
// rebalance time
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||
#endif
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
sdbRelease(pSdb, pSub);
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
|
|||
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
|
||||
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
|
||||
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
|
||||
|
||||
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
||||
|
||||
|
|
Loading…
Reference in New Issue