From 3912e49309ea99f253f2901886e97d981833e5f6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:46:11 +0800 Subject: [PATCH 1/3] enh(stream): add sink_quota column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5f44d3e7fc..43ba3d2698 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -156,6 +156,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 766e01d955..b3a2a9fa6e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1481,7 +1481,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB if (pShow->pIter == NULL) break; SColumnInfoData *pColInfo; - SName n; int32_t cols = 0; char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; From 7cfa45e95e5c73ec1363f530fa600156a71b67d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:59:49 +0800 Subject: [PATCH 2/3] enh(stream): add sink_quota/scan-history-idle-duration column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 15 +++++++++++++++ source/libs/stream/src/streamCheckpoint.c | 2 +- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 43ba3d2698..a1f8d74571 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -157,6 +157,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b3a2a9fa6e..ee990956ff 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1533,6 +1533,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; + sinkQuota[0] = '0'; + char dstStr[20] = {0}; + STR_TO_VARSTR(dstStr, sinkQuota) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + + char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; + strcpy(scanHistoryIdle, "100a"); + + memset(dstStr, 0, tListLen(dstStr)); + STR_TO_VARSTR(dstStr, scanHistoryIdle) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + numOfRows++; sdbRelease(pSdb, pStream); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5540e3b6fd..6201329b95 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -299,7 +299,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; From 7518103b96a114df3455cfc398e1ee2ddc1aeb35 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 13:06:12 +0800 Subject: [PATCH 3/3] fix(test): adjust test case accordingly. --- tests/system-test/0-others/information_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index ac952d383a..2bfe33d0af 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -217,7 +217,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(208, len(tdSql.queryResult)) + tdSql.checkEqual(210, len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult))