diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0840694964..ea0f1824b3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -357,8 +357,12 @@ typedef struct STaskExecStatisInfo { double step2El; int32_t updateCount; int64_t latestUpdateTs; - int32_t processDataBlocks; - int64_t processDataSize; + int32_t inputDataBlocks; + int64_t inputDataSize; + double procsThroughput; + int64_t outputDataBlocks; + int64_t outputDataSize; + double outputThroughput; int32_t dispatch; int64_t dispatchDataSize; int32_t checkpoint; @@ -566,6 +570,8 @@ typedef struct STaskCkptInfo { int64_t latestId; // saved checkpoint id int64_t latestVer; // saved checkpoint ver int64_t latestTime; // latest checkpoint time + int64_t latestSize; // latest checkpoint size + int8_t remoteBackup; // latest checkpoint backup done int64_t activeId; // current active checkpoint id int32_t activeTransId; // checkpoint trans id int8_t failed; // denote if the checkpoint is failed or not @@ -583,8 +589,12 @@ typedef struct STaskStatusEntry { int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + double procsThroughput; // duration between one element put into input queue and being processed. + double procsTotal; // duration between one element put into input queue and being processed. + double outputThroughput; // the size of dispatched result blocks in bytes + double outputTotal; // the size of dispatched result blocks in bytes + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size int64_t startTime; int64_t startCheckpointId; int64_t startCheckpointVer; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index d5dc1581a4..297f43399f 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -187,8 +187,12 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "input_idle", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "dispatch_data", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "process_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "process_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "out_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "out_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +// {.name = "dispatch_throughput", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +// {.name = "dispatch_total", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, @@ -196,10 +200,10 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, - {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_size", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_backup", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "extra_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 03385ea226..105565067c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1620,22 +1620,75 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); // input queue - char vbuf[30] = {0}; - char buf[25] = {0}; - const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; + char vbuf[40] = {0}; + char buf[40] = {0}; + const char *queueInfoStr = "%4.2f MiB (%6.2f%)"; sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); STR_TO_VARSTR(vbuf, buf); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - // input idle - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetNULL(pColInfo, numOfRows); + // input total + const char* formatTotalMb = "%7.2f MiB"; + const char* formatTotalGb = "%7.2f GiB"; + if (pe->procsTotal < 1024) { + sprintf(buf, formatTotalMb, pe->procsTotal); + } else { + sprintf(buf, formatTotalGb, pe->procsTotal / 1024); + } + + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); - // dispatch data pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetNULL(pColInfo, numOfRows); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + + // process throughput + const char* formatKb = "%7.2f KiB/s"; + const char* formatMb = "%7.2f MiB/s"; + if (pe->procsThroughput < 1024) { + sprintf(buf, formatKb, pe->procsThroughput); + } else { + sprintf(buf, formatMb, pe->procsThroughput / 1024); + } + + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + + // output total + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + colDataSetNULL(pColInfo, numOfRows); + } else { + sprintf(buf, formatTotalMb, pe->outputTotal); + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + } + + // output throughput + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + colDataSetNULL(pColInfo, numOfRows); + } else { + if (pe->outputThroughput < 1024) { + sprintf(buf, formatKb, pe->outputThroughput); + } else { + sprintf(buf, formatMb, pe->outputThroughput / 1024); + } + + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + } // output queue // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); @@ -1646,12 +1699,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - const char *sinkStr = "%.2fMiB"; + const char *sinkStr = "%.2f MiB"; sprintf(buf, sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + } else { + memset(buf, 0, tListLen(buf)); } STR_TO_VARSTR(vbuf, buf); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9747ebd2ff..9daca9a99c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -96,7 +96,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i *totalSize = 0; int32_t size = 0; - int32_t numOfBlocks = 0; + int32_t numOfBlocks= 0; SArray* pRes = NULL; while (1) { @@ -586,6 +586,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } + pTask->execInfo.inputDataBlocks += numOfBlocks; + pTask->execInfo.inputDataSize += blockSize; + // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { @@ -601,11 +604,21 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) { ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); + int64_t st = taosGetTimestampMs(); + // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { pTask->execInfo.sink.dataSize += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); + + double el = (taosGetTimestampMs() - st) / 1000.0; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + } else { + pTask->execInfo.procsThroughput = (blockSize / el); + } + continue; } } @@ -639,13 +652,23 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); - int64_t resSize = 0; + int64_t totalSize = 0; int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); + streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks); double el = (taosGetTimestampMs() - st) / 1000.0; stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(resSize), totalBlocks); + SIZE_IN_MiB(totalSize), totalBlocks); + + pTask->execInfo.outputDataBlocks += totalBlocks; + pTask->execInfo.outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + pTask->execInfo.outputThroughput = 0; + } else { + pTask->execInfo.outputThroughput = (totalSize / el); + pTask->execInfo.procsThroughput = (blockSize / el); + } SCheckpointInfo* pInfo = &pTask->chkInfo; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a6453871c8..7f1128b929 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1017,8 +1017,13 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, + .checkpointInfo.latestSize = 0, + .checkpointInfo.remoteBackup = 0, .hTaskId = (*pTask)->hTaskInfo.id.taskId, - + .procsTotal = SIZE_IN_MiB((*pTask)->execInfo.inputDataSize), + .outputTotal = SIZE_IN_MiB((*pTask)->execInfo.outputDataSize), + .procsThroughput = SIZE_IN_KiB((*pTask)->execInfo.procsThroughput), + .outputThroughput = SIZE_IN_KiB((*pTask)->execInfo.outputThroughput), .startCheckpointId = (*pTask)->execInfo.startCheckpointId, .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, }; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2cb388954d..c056e2a4b6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -771,6 +771,10 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->stage = pSrc->stage; pDst->inputQUsed = pSrc->inputQUsed; pDst->inputRate = pSrc->inputRate; + pDst->procsTotal = pSrc->procsTotal; + pDst->procsThroughput = pSrc->procsThroughput; + pDst->outputTotal = pSrc->outputTotal; + pDst->outputThroughput = pSrc->outputThroughput; pDst->processedVer = pSrc->processedVer; pDst->verRange = pSrc->verRange; pDst->sinkQuota = pSrc->sinkQuota; diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index 5e52b927c6..9b69833234 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -335,6 +335,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->procsTotal) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->procsThroughput) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->outputTotal) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->outputThroughput) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; @@ -346,6 +350,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1; if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; @@ -381,6 +387,10 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.procsTotal) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.procsThroughput) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.outputTotal) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.outputThroughput) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; @@ -393,6 +403,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;