From 6fca8c170c20de8aedbe62398b28409ce9e3c975 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:59:49 +0800 Subject: [PATCH] enh(stream): add sink_quota/scan-history-idle-duration column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5c57f13091..5d6486ec1a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -156,6 +156,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6c0b4577ed..6755204622 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1543,6 +1543,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; + sinkQuota[0] = '0'; + char dstStr[20] = {0}; + STR_TO_VARSTR(dstStr, sinkQuota) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + + char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; + strcpy(scanHistoryIdle, "100a"); + + memset(dstStr, 0, tListLen(dstStr)); + STR_TO_VARSTR(dstStr, scanHistoryIdle) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + numOfRows++; sdbRelease(pSdb, pStream); }