refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-03 02:37:52 +08:00
parent a543f4ca97
commit 58f410a9df
3 changed files with 24 additions and 4 deletions

View File

@ -164,8 +164,9 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "in_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "offset", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userTblsSchema[] = {

View File

@ -1589,7 +1589,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char* queueInfoStr = "%.2fMiB (%.2f%)";
const char* queueInfoStr = "%5.2fMiB(%5.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf);
@ -1597,7 +1597,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// output queue
sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// offset version info
const char* offsetStr = "%"PRId64"[%"PRId64",%"PRId64"]";
sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2454,6 +2462,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pEntry->outputQUsed = p->outputQUsed;
pEntry->outputRate = p->outputRate;
pEntry->offset = p->offset;
pEntry->verStart = p->verStart;
pEntry->verEnd = p->verEnd;
}
pEntry->status = p->status;

View File

@ -20,6 +20,7 @@
#include "tref.h"
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
@ -777,6 +778,9 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputRate) < 0) return -1;
if (tEncodeI64(pEncoder, ps->offset) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -801,6 +805,9 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputRate) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
@ -885,6 +892,8 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE;
entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader);
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
taosArrayPush(hbMsg.pTaskStatus, &entry);