Merge pull request #23750 from taosdata/fix/3_liaohj
enh(stream): add sink_quota/scan-history-idle-duration column for stream tasks.
This commit is contained in:
commit
5d7d2a0156
|
@ -156,6 +156,8 @@ static const SSysDbTableSchema streamSchema[] = {
|
||||||
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema streamTaskSchema[] = {
|
static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
|
|
|
@ -1481,7 +1481,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
SColumnInfoData *pColInfo;
|
||||||
SName n;
|
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
@ -1534,6 +1533,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
|
||||||
|
|
||||||
|
char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
sinkQuota[0] = '0';
|
||||||
|
char dstStr[20] = {0};
|
||||||
|
STR_TO_VARSTR(dstStr, sinkQuota)
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
|
||||||
|
|
||||||
|
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
strcpy(scanHistoryIdle, "100a");
|
||||||
|
|
||||||
|
memset(dstStr, 0, tListLen(dstStr));
|
||||||
|
STR_TO_VARSTR(dstStr, scanHistoryIdle)
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -299,7 +299,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId &&
|
ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId &&
|
||||||
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
|
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
|
||||||
|
|
||||||
p->chkInfo.checkpointId = p->checkpointingId;
|
p->chkInfo.checkpointId = p->checkpointingId;
|
||||||
|
|
|
@ -217,7 +217,7 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
tdSql.checkEqual(20470,len(tdSql.queryResult))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||||
tdSql.checkEqual(208, len(tdSql.queryResult))
|
tdSql.checkEqual(210, len(tdSql.queryResult))
|
||||||
|
|
||||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||||
tdSql.checkEqual(54, len(tdSql.queryResult))
|
tdSql.checkEqual(54, len(tdSql.queryResult))
|
||||||
|
|
Loading…
Reference in New Issue