fix(stream): add more info

This commit is contained in:
Haojun Liao 2024-05-22 11:04:54 +08:00
parent f6af165d9a
commit be39044b66
7 changed files with 136 additions and 23 deletions

View File

@ -357,8 +357,12 @@ typedef struct STaskExecStatisInfo {
double step2El; double step2El;
int32_t updateCount; int32_t updateCount;
int64_t latestUpdateTs; int64_t latestUpdateTs;
int32_t processDataBlocks; int32_t inputDataBlocks;
int64_t processDataSize; int64_t inputDataSize;
double procsThroughput;
int64_t outputDataBlocks;
int64_t outputDataSize;
double outputThroughput;
int32_t dispatch; int32_t dispatch;
int64_t dispatchDataSize; int64_t dispatchDataSize;
int32_t checkpoint; int32_t checkpoint;
@ -566,6 +570,8 @@ typedef struct STaskCkptInfo {
int64_t latestId; // saved checkpoint id int64_t latestId; // saved checkpoint id
int64_t latestVer; // saved checkpoint ver int64_t latestVer; // saved checkpoint ver
int64_t latestTime; // latest checkpoint time int64_t latestTime; // latest checkpoint time
int64_t latestSize; // latest checkpoint size
int8_t remoteBackup; // latest checkpoint backup done
int64_t activeId; // current active checkpoint id int64_t activeId; // current active checkpoint id
int32_t activeTransId; // checkpoint trans id int32_t activeTransId; // checkpoint trans id
int8_t failed; // denote if the checkpoint is failed or not int8_t failed; // denote if the checkpoint is failed or not
@ -583,6 +589,10 @@ typedef struct STaskStatusEntry {
int64_t inputQUnchangeCounter; int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB double inputQUsed; // in MiB
double inputRate; double inputRate;
double procsThroughput; // duration between one element put into input queue and being processed.
double procsTotal; // duration between one element put into input queue and being processed.
double outputThroughput; // the size of dispatched result blocks in bytes
double outputTotal; // the size of dispatched result blocks in bytes
double sinkQuota; // existed quota size for sink task double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dst data size double sinkDataSize; // sink to dst data size
int64_t startTime; int64_t startTime;

View File

@ -187,8 +187,12 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "input_idle", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "process_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "dispatch_data", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "process_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "dispatch_throughput", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "dispatch_total", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
@ -196,10 +200,10 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "checkpoint_size", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_size", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "extra_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };

View File

@ -1620,22 +1620,75 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
// input queue // input queue
char vbuf[30] = {0}; char vbuf[40] = {0};
char buf[25] = {0}; char buf[40] = {0};
const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; const char *queueInfoStr = "%4.2f MiB (%6.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf); STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// input idle // input total
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); const char* formatTotalMb = "%7.2f MiB";
colDataSetNULL(pColInfo, numOfRows); const char* formatTotalGb = "%7.2f GiB";
if (pe->procsTotal < 1024) {
sprintf(buf, formatTotalMb, pe->procsTotal);
} else {
sprintf(buf, formatTotalGb, pe->procsTotal / 1024);
}
memset(vbuf, 0, tListLen(vbuf));
STR_TO_VARSTR(vbuf, buf);
// dispatch data
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// process throughput
const char* formatKb = "%7.2f KiB/s";
const char* formatMb = "%7.2f MiB/s";
if (pe->procsThroughput < 1024) {
sprintf(buf, formatKb, pe->procsThroughput);
} else {
sprintf(buf, formatMb, pe->procsThroughput / 1024);
}
memset(vbuf, 0, tListLen(vbuf));
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// output total
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
} else {
sprintf(buf, formatTotalMb, pe->outputTotal);
memset(vbuf, 0, tListLen(vbuf));
STR_TO_VARSTR(vbuf, buf);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
}
// output throughput
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
colDataSetNULL(pColInfo, numOfRows);
} else {
if (pe->outputThroughput < 1024) {
sprintf(buf, formatKb, pe->outputThroughput);
} else {
sprintf(buf, formatMb, pe->outputThroughput / 1024);
}
memset(vbuf, 0, tListLen(vbuf));
STR_TO_VARSTR(vbuf, buf);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
}
// output queue // output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
@ -1651,7 +1704,9 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info // offset info
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
} else {
memset(buf, 0, tListLen(buf));
} }
STR_TO_VARSTR(vbuf, buf); STR_TO_VARSTR(vbuf, buf);

View File

@ -586,6 +586,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
} }
pTask->execInfo.inputDataBlocks += numOfBlocks;
pTask->execInfo.inputDataSize += blockSize;
// dispatch checkpoint msg to all downstream tasks // dispatch checkpoint msg to all downstream tasks
int32_t type = pInput->type; int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
@ -601,11 +604,21 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT);
int64_t st = taosGetTimestampMs();
// here only handle the data block sink operation // here only handle the data block sink operation
if (type == STREAM_INPUT__DATA_BLOCK) { if (type == STREAM_INPUT__DATA_BLOCK) {
pTask->execInfo.sink.dataSize += blockSize; pTask->execInfo.sink.dataSize += blockSize;
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
double el = (taosGetTimestampMs() - st) / 1000.0;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
} else {
pTask->execInfo.procsThroughput = (blockSize / el);
}
continue; continue;
} }
} }
@ -639,13 +652,23 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
int64_t ver = pTask->chkInfo.processedVer; int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id); doSetStreamInputBlock(pTask, pInput, &ver, id);
int64_t resSize = 0; int64_t totalSize = 0;
int32_t totalBlocks = 0; int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(resSize), totalBlocks); SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
SCheckpointInfo* pInfo = &pTask->chkInfo; SCheckpointInfo* pInfo = &pTask->chkInfo;

View File

@ -1017,8 +1017,13 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.hTaskId = (*pTask)->hTaskInfo.id.taskId, .hTaskId = (*pTask)->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB((*pTask)->execInfo.inputDataSize),
.outputTotal = SIZE_IN_MiB((*pTask)->execInfo.outputDataSize),
.procsThroughput = SIZE_IN_KiB((*pTask)->execInfo.procsThroughput),
.outputThroughput = SIZE_IN_KiB((*pTask)->execInfo.outputThroughput),
.startCheckpointId = (*pTask)->execInfo.startCheckpointId, .startCheckpointId = (*pTask)->execInfo.startCheckpointId,
.startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer,
}; };

View File

@ -771,6 +771,10 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->stage = pSrc->stage; pDst->stage = pSrc->stage;
pDst->inputQUsed = pSrc->inputQUsed; pDst->inputQUsed = pSrc->inputQUsed;
pDst->inputRate = pSrc->inputRate; pDst->inputRate = pSrc->inputRate;
pDst->procsTotal = pSrc->procsTotal;
pDst->procsThroughput = pSrc->procsThroughput;
pDst->outputTotal = pSrc->outputTotal;
pDst->outputThroughput = pSrc->outputThroughput;
pDst->processedVer = pSrc->processedVer; pDst->processedVer = pSrc->processedVer;
pDst->verRange = pSrc->verRange; pDst->verRange = pSrc->verRange;
pDst->sinkQuota = pSrc->sinkQuota; pDst->sinkQuota = pSrc->sinkQuota;

View File

@ -335,6 +335,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->procsTotal) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->procsThroughput) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputTotal) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->outputThroughput) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
@ -346,6 +350,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
@ -381,6 +387,10 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.procsTotal) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.procsThroughput) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputTotal) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.outputThroughput) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
@ -393,6 +403,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1;
if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;