diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a98ad5a4c2..29a75083ef 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -326,18 +326,21 @@ typedef struct SSinkRecorder { int64_t numOfSubmit; int64_t numOfBlocks; int64_t numOfRows; - int64_t bytes; + int64_t dataSize; } SSinkRecorder; typedef struct STaskExecStatisInfo { int64_t created; int64_t init; + int64_t start; int64_t step1Start; int64_t step2Start; - int64_t start; int32_t updateCount; - int32_t dispatch; int64_t latestUpdateTs; + int32_t processDataBlocks; + int64_t processDataSize; + int32_t dispatch; + int64_t dispatchDataSize; int32_t checkpoint; SSinkRecorder sink; } STaskExecStatisInfo; @@ -415,7 +418,6 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; int64_t stage; -// bool leader; int32_t role; STaskStartInfo startInfo; SRWLatch lock; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a5958197bd..8009eccb1b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -272,11 +272,12 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* SSinkRecorder* pRec = &pTask->execInfo.sink; pRec->numOfSubmit += 1; - if ((pRec->numOfSubmit % 5000) == 0) { + if ((pRec->numOfSubmit % 1000) == 0) { double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", - pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), + el); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a87bb00972..e46b094f60 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -556,7 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { int32_t blockSize = streamQueueItemGetSize(pInput); - pTask->execInfo.sink.bytes += blockSize; + pTask->execInfo.sink.dataSize += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index abf10487de..61453cb54e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -154,6 +154,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int32_t taskLevel = pTask->info.taskLevel; *numOfBlocks = 0; + // no available token in bucket for sink task, let's wait + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskHasAvailableToken(pTask->pTokenBucket))) { + stDebug("s-task:%s no available token in bucket for sink data, wait", id); + return TSDB_CODE_SUCCESS; + } + while (1) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);