enh(stream): add sink_quota/scan-history-idle-duration column for stream tasks.

This commit is contained in:
Haojun Liao 2023-11-20 11:59:49 +08:00
parent 8204db13aa
commit 6fca8c170c
2 changed files with 16 additions and 0 deletions

View File

@ -156,6 +156,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema streamTaskSchema[] = {

View File

@ -1543,6 +1543,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
sinkQuota[0] = '0';
char dstStr[20] = {0};
STR_TO_VARSTR(dstStr, sinkQuota)
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a");
memset(dstStr, 0, tListLen(dstStr));
STR_TO_VARSTR(dstStr, scanHistoryIdle)
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
numOfRows++;
sdbRelease(pSdb, pStream);
}