refactor(stream): add more info in info-meta
This commit is contained in:
parent
3baa4169e6
commit
f6af165d9a
|
@ -187,6 +187,8 @@ static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "input_idle", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
{.name = "dispatch_data", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
|
@ -195,6 +197,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
|
{.name = "checkpoint_size", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
|
|
@ -1629,6 +1629,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||||
|
|
||||||
|
// input idle
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
|
||||||
|
// dispatch data
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
|
||||||
// output queue
|
// output queue
|
||||||
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
|
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
|
||||||
// STR_TO_VARSTR(vbuf, buf);
|
// STR_TO_VARSTR(vbuf, buf);
|
||||||
|
@ -1679,6 +1687,10 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
|
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false);
|
||||||
|
|
||||||
|
// checkpoint size
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
|
||||||
// checkpoint backup status
|
// checkpoint backup status
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, 0, true);
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
|
|
@ -744,7 +744,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
|
tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The auto-create option will always set to be open for those submit messages, which arrive during the period
|
// The auto-create option will always set to be open for those submit messages, which arrives during the period
|
||||||
// the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
|
// the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning
|
||||||
// data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
|
// data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for
|
||||||
// those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
|
// those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have
|
||||||
|
@ -752,7 +752,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
||||||
|
|
||||||
// table not in cache, let's try the extract it from tsdb meta
|
// table not in cache, let's try to extract it from tsdb meta
|
||||||
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
|
if (metaGetTableEntryByName(&mr, dstTableName) < 0) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue