refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-03 01:52:03 +08:00
parent 5564eb215c
commit a543f4ca97
4 changed files with 17 additions and 16 deletions

View File

@ -601,9 +601,9 @@ typedef struct STaskStatusEntry {
int64_t verEnd; // end version in WAL, only valid for source task
int64_t offset; // only valid for source task
double inputQUsed; // in MiB
double inputQCap;
double inputRate;
double outputQUsed; // in MiB
double outputQCap;
double outputRate;
} STaskStatusEntry;
typedef struct SStreamHbMsg {

View File

@ -162,10 +162,10 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "in_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "in_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_queue", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userTblsSchema[] = {

View File

@ -1589,15 +1589,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char* queueInfoStr = "%.2fMiB (%.2f%, %.2fMiB)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputQUsed/pe->inputQCap, pe->inputQCap);
const char* queueInfoStr = "%.2fMiB (%.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// output queue
sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputQUsed/pe->outputQCap, pe->outputQCap);
sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2450,9 +2450,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} else {
pEntry->stage = p->stage;
pEntry->inputQUsed = p->inputQUsed;
pEntry->inputQCap = p->inputQCap;
pEntry->inputRate = p->inputRate;
pEntry->outputQUsed = p->outputQUsed;
pEntry->outputQCap = p->outputQCap;
pEntry->outputRate = p->outputRate;
pEntry->offset = p->offset;
}

View File

@ -774,9 +774,9 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQCap) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputQCap) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputRate) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -798,9 +798,9 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQCap) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputQCap) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputRate) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
@ -880,11 +880,12 @@ void metaHbToMnode(void* param, void* tmrId) {
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
.inputQCap = STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE,
.outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)),
.outputQCap = STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE,
};
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE;
entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasValEpset) {