diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 9de682dd3a..d5dc1581a4 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -187,6 +187,8 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "input_idle", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "dispatch_data", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, @@ -195,6 +197,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_size", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fb15e4b857..03385ea226 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1629,6 +1629,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + // input idle + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, numOfRows); + + // dispatch data + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, numOfRows); + // output queue // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); // STR_TO_VARSTR(vbuf, buf); @@ -1679,6 +1687,10 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + // checkpoint size + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, numOfRows); + // checkpoint backup status pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, 0, true); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index c0f58fc3ec..ce6db55cef 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -744,7 +744,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid); } } else { - // The auto-create option will always set to be open for those submit messages, which arrive during the period + // The auto-create option will always set to be open for those submit messages, which arrives during the period // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have @@ -752,7 +752,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat SMetaReader mr = {0}; metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK); - // table not in cache, let's try the extract it from tsdb meta + // table not in cache, let's try to extract it from tsdb meta if (metaGetTableEntryByName(&mr, dstTableName) < 0) { metaReaderClear(&mr);