diff --git a/source/common/src/systable.c b/source/common/src/systable.c index cea1c559cf..19e8945cdf 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -164,8 +164,9 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "in_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "out_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "offset", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema userTblsSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ee85ce24ee..866c1dfca4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1589,7 +1589,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // input queue char vbuf[30] = {0}; char buf[25] = {0}; - const char* queueInfoStr = "%.2fMiB (%.2f%)"; + const char* queueInfoStr = "%5.2fMiB(%5.2f%)"; sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); STR_TO_VARSTR(vbuf, buf); @@ -1597,7 +1597,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); // output queue - sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); +// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); +// STR_TO_VARSTR(vbuf, buf); + +// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); +// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + + // offset version info + const char* offsetStr = "%"PRId64"[%"PRId64",%"PRId64"]"; + sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd); STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2454,6 +2462,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pEntry->outputQUsed = p->outputQUsed; pEntry->outputRate = p->outputRate; pEntry->offset = p->offset; + pEntry->verStart = p->verStart; + pEntry->verEnd = p->verEnd; } pEntry->status = p->status; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 122d52d40e..ea451c64e9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -20,6 +20,7 @@ #include "tref.h" #include "tstream.h" #include "ttimer.h" +#include "wal.h" static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; @@ -777,6 +778,9 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->outputRate) < 0) return -1; + if (tEncodeI64(pEncoder, ps->offset) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -801,6 +805,9 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.outputRate) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -885,6 +892,8 @@ void metaHbToMnode(void* param, void* tmrId) { entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE; entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; + entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader); + walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); taosArrayPush(hbMsg.pTaskStatus, &entry);