From 7c8b2867cee0d21259419f564243fd507bdebc2b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 14 Nov 2022 19:23:42 +0800 Subject: [PATCH] enh(stream): add stream task info into information schema --- include/common/systable.h | 1 + include/common/tmsg.h | 1 + source/common/src/systable.c | 12 ++- source/dnode/mnode/impl/src/mndShow.c | 2 + source/dnode/mnode/impl/src/mndStream.c | 108 +++++++++++++++++++++++- source/libs/wal/src/walMeta.c | 6 +- source/libs/wal/src/walWrite.c | 1 + 7 files changed, 127 insertions(+), 4 deletions(-) diff --git a/include/common/systable.h b/include/common/systable.h index 8b29525db3..57f85f16bc 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -46,6 +46,7 @@ extern "C" { #define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions" #define TSDB_INS_TABLE_TOPICS "ins_topics" #define TSDB_INS_TABLE_STREAMS "ins_streams" +#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 99c5c72e2f..3281bca96a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -119,6 +119,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_QUERIES, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_APPS, + TSDB_MGMT_TABLE_STREAM_TASKS, TSDB_MGMT_TABLE_MAX, } EShowType; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index d3d006ab35..c3a1f9f67e 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = { }; static const SSysDbTableSchema streamSchema[] = { - {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; +static const SSysDbTableSchema streamTaskSchema[] = { + {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; + static const SSysDbTableSchema userTblsSchema[] = { {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false}, {TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false}, {TSDB_INS_TABLE_STREAMS, streamSchema, tListLen(streamSchema), false}, + {TSDB_INS_TABLE_STREAM_TASKS, streamTaskSchema, tListLen(streamTaskSchema), false}, {TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true}, }; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index b0af98b933..20c2ebb0a4 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_STREAMS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) { type = TSDB_MGMT_TABLE_APPS; + } else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) { + type = TSDB_MGMT_TABLE_STREAM_TASKS; } else { // ASSERT(0); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 594c13f957..c649180285 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); +static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); return sdbSetTable(pMnode->pSdb, table); } @@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB SName n; int32_t cols = 0; - char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); @@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SStreamObj *pStream = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + if (pShow->pIter == NULL) break; + + // lock + taosRLockLatch(&pStream->lock); + // count task num + int32_t sz = taosArrayGetSize(pStream->tasks); + int32_t count = 0; + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + count += taosArrayGetSize(pLevel); + } + + if (numOfRows + count > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + count); + } + // add row for each task + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + int32_t levelCnt = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < levelCnt; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // stream name + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); + + // task id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTask->taskId, false); + + // node type + char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; + varDataSetLen(nodeType, 5); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pTask->nodeId > 0) { + memcpy(varDataVal(nodeType), "vnode", 5); + } else { + memcpy(varDataVal(nodeType), "snode", 5); + } + colDataAppend(pColInfo, numOfRows, nodeType, false); + + // node id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + int32_t nodeId = TMAX(pTask->nodeId, 0); + colDataAppend(pColInfo, numOfRows, (const char *)&nodeId, false); + + // level + char level[20 + VARSTR_HEADER_SIZE] = {0}; + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + memcpy(varDataVal(level), "source", 6); + varDataSetLen(level, 6); + } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + memcpy(varDataVal(level), "agg", 3); + varDataSetLen(level, 3); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + memcpy(varDataVal(level), "sink", 4); + varDataSetLen(level, 4); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&level, false); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + char status2[20] = {0}; + strcpy(status, "normal"); + STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&status, false); + + numOfRows++; + } + } + + // unlock + taosRUnLockLatch(&pStream->lock); + + sdbRelease(pSdb, pStream); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 1c6e1a2e17..6cac4b6093 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -124,8 +124,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { goto _err; } - char* candidate = NULL; - char* haystack = buf; + char* candidate = NULL; + char* haystack = buf; int64_t pos = 0; SWalCkHead* logContent = NULL; @@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) { } ASSERT(pFileInfo->fileSize == 0); // remove the empty wal log, and its idx + wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); + wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); // remove its meta entry taosArrayRemove(pWal->fileInfoSet, fileIdx); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 7ced5fae39..216dd5fcb1 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) { } walBuildLogName(pWal, newFileFirstVer, fnameStr); pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); code = -1;