add show topic
This commit is contained in:
parent
9a3365e1f1
commit
3035ecd58e
|
@ -103,6 +103,7 @@ int32_t create_topic() {
|
||||||
|
|
||||||
/*const char* sql = "select * from tu1";*/
|
/*const char* sql = "select * from tu1";*/
|
||||||
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
|
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
|
||||||
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
|
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
|
|
|
@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = {
|
||||||
|
|
||||||
static const SPerfsTableSchema topicSchema[] = {
|
static const SPerfsTableSchema topicSchema[] = {
|
||||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
/*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
/*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SPerfsTableSchema consumerSchema[] = {
|
static const SPerfsTableSchema consumerSchema[] = {
|
||||||
|
|
|
@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||||
|
|
||||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
|
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);
|
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) {
|
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark,
|
||||||
|
STrans *pTrans) {
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
|
|
||||||
if (nodesStringToNode(ast, &pAst) < 0) {
|
if (nodesStringToNode(ast, &pAst) < 0) {
|
||||||
|
|
|
@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
|
||||||
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
|
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
|
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitTopic(SMnode *pMnode) {
|
int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
|
@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
||||||
|
|
||||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
int32_t cols = 0;
|
|
||||||
char *pWrite;
|
|
||||||
char prefix[TSDB_DB_FNAME_LEN] = {0};
|
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
|
while (numOfRows < rowsCapacity) {
|
||||||
if (pDb == NULL) return 0;
|
|
||||||
|
|
||||||
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
|
|
||||||
strcat(prefix, TS_PATH_DELIMITER);
|
|
||||||
int32_t prefixLen = (int32_t)strlen(prefix);
|
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
|
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
if (pTopic->dbUid != pDb->uid) {
|
int32_t cols = 0;
|
||||||
if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
|
|
||||||
mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pTopic);
|
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
continue;
|
tstrncpy(&topicName[VARSTR_HEADER_SIZE], pTopic->name, TSDB_TOPIC_NAME_LEN);
|
||||||
}
|
varDataSetLen(topicName, strlen(&topicName[VARSTR_HEADER_SIZE]));
|
||||||
|
|
||||||
cols = 0;
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
|
||||||
|
|
||||||
char topicName[TSDB_TOPIC_NAME_LEN] = {0};
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
STR_TO_VARSTR(pWrite, topicName);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
*(int64_t *)pWrite = pTopic->createTime;
|
char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE);
|
||||||
cols++;
|
strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql);
|
||||||
|
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
taosMemoryFree(sql);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
pShow->numOfRows += numOfRows;
|
pShow->numOfRows += numOfRows;
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue