diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b399459230..a9da7d5ef8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -601,9 +601,9 @@ typedef struct STaskStatusEntry { int64_t verEnd; // end version in WAL, only valid for source task int64_t offset; // only valid for source task double inputQUsed; // in MiB - double inputQCap; + double inputRate; double outputQUsed; // in MiB - double outputQCap; + double outputRate; } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e4e4f2ce99..cea1c559cf 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -162,10 +162,10 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "in_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "out_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "in_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "out_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema userTblsSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05b3c3510c..ee85ce24ee 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1589,15 +1589,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // input queue char vbuf[30] = {0}; char buf[25] = {0}; - const char* queueInfoStr = "%.2fMiB (%.2f%, %.2fMiB)"; - sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputQUsed/pe->inputQCap, pe->inputQCap); + const char* queueInfoStr = "%.2fMiB (%.2f%)"; + sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); // output queue - sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputQUsed/pe->outputQCap, pe->outputQCap); + sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2450,9 +2450,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } else { pEntry->stage = p->stage; pEntry->inputQUsed = p->inputQUsed; - pEntry->inputQCap = p->inputQCap; + pEntry->inputRate = p->inputRate; pEntry->outputQUsed = p->outputQUsed; - pEntry->outputQCap = p->outputQCap; + pEntry->outputRate = p->outputRate; pEntry->offset = p->offset; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index be98c6bbcd..122d52d40e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -774,9 +774,9 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->inputQCap) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->outputQCap) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->outputRate) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -798,9 +798,9 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.inputQCap) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.outputQCap) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.outputRate) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -880,11 +880,12 @@ void metaHbToMnode(void* param, void* tmrId) { .nodeId = pMeta->vgId, .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), - .inputQCap = STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), - .outputQCap = STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE, }; + entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE; + entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; + taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasValEpset) {