Merge pull request #18134 from taosdata/feature/stream
enh(stream): add stream task info into information schema
This commit is contained in:
commit
9e18a86ad2
|
@ -46,6 +46,7 @@ extern "C" {
|
|||
#define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions"
|
||||
#define TSDB_INS_TABLE_TOPICS "ins_topics"
|
||||
#define TSDB_INS_TABLE_STREAMS "ins_streams"
|
||||
#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks"
|
||||
|
||||
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
|
||||
#define TSDB_PERFS_TABLE_SMAS "perf_smas"
|
||||
|
|
|
@ -119,6 +119,7 @@ typedef enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_QUERIES,
|
||||
TSDB_MGMT_TABLE_VNODES,
|
||||
TSDB_MGMT_TABLE_APPS,
|
||||
TSDB_MGMT_TABLE_STREAM_TASKS,
|
||||
TSDB_MGMT_TABLE_MAX,
|
||||
} EShowType;
|
||||
|
||||
|
|
|
@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema streamSchema[] = {
|
||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
|
@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = {
|
|||
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema streamTaskSchema[] = {
|
||||
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema userTblsSchema[] = {
|
||||
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||
|
@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = {
|
|||
{TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false},
|
||||
{TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false},
|
||||
{TSDB_INS_TABLE_STREAMS, streamSchema, tListLen(streamSchema), false},
|
||||
{TSDB_INS_TABLE_STREAM_TASKS, streamTaskSchema, tListLen(streamTaskSchema), false},
|
||||
{TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true},
|
||||
};
|
||||
|
||||
|
|
|
@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
|
|||
type = TSDB_MGMT_TABLE_STREAMS;
|
||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_APPS;
|
||||
} else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
|
||||
type = TSDB_MGMT_TABLE_STREAM_TASKS;
|
||||
} else {
|
||||
// ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
|
|||
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitStream(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
SName n;
|
||||
int32_t cols = 0;
|
||||
|
||||
char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
|
||||
|
@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
// lock
|
||||
taosRLockLatch(&pStream->lock);
|
||||
// count task num
|
||||
int32_t sz = taosArrayGetSize(pStream->tasks);
|
||||
int32_t count = 0;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
count += taosArrayGetSize(pLevel);
|
||||
}
|
||||
|
||||
if (numOfRows + count > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + count);
|
||||
}
|
||||
// add row for each task
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t levelCnt = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < levelCnt; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// stream name
|
||||
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
|
||||
|
||||
// task id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pTask->taskId, false);
|
||||
|
||||
// node type
|
||||
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
varDataSetLen(nodeType, 5);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (pTask->nodeId > 0) {
|
||||
memcpy(varDataVal(nodeType), "vnode", 5);
|
||||
} else {
|
||||
memcpy(varDataVal(nodeType), "snode", 5);
|
||||
}
|
||||
colDataAppend(pColInfo, numOfRows, nodeType, false);
|
||||
|
||||
// node id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
int32_t nodeId = TMAX(pTask->nodeId, 0);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&nodeId, false);
|
||||
|
||||
// level
|
||||
char level[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
memcpy(varDataVal(level), "source", 6);
|
||||
varDataSetLen(level, 6);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
memcpy(varDataVal(level), "agg", 3);
|
||||
varDataSetLen(level, 3);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
memcpy(varDataVal(level), "sink", 4);
|
||||
varDataSetLen(level, 4);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
}
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&level, false);
|
||||
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
char status2[20] = {0};
|
||||
strcpy(status, "normal");
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&status, false);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosRUnLockLatch(&pStream->lock);
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
|
|
@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
}
|
||||
ASSERT(pFileInfo->fileSize == 0);
|
||||
// remove the empty wal log, and its idx
|
||||
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
// remove its meta entry
|
||||
taosArrayRemove(pWal->fileInfoSet, fileIdx);
|
||||
|
|
|
@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) {
|
|||
}
|
||||
walBuildLogName(pWal, newFileFirstVer, fnameStr);
|
||||
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr);
|
||||
if (pLogFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
code = -1;
|
||||
|
|
Loading…
Reference in New Issue