diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 86e79835ac..d36231c018 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -313,7 +313,8 @@ typedef struct STaskSchedInfo { } STaskSchedInfo; typedef struct SSinkTaskRecorder { - int64_t numOfPackages; + int64_t numOfSubmit; + int64_t numOfBlocks; int64_t numOfRows; } SSinkTaskRecorder; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6347b9cadd..45991d83c8 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -272,7 +272,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { continue; } else { hasSubmit = true; - pTask->sinkRecorder.numOfPackages += 1; + pTask->sinkRecorder.numOfBlocks += 1; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData); @@ -300,10 +300,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } - if ((pTask->sinkRecorder.numOfPackages % 5000) == 0) { + pTask->sinkRecorder.numOfSubmit += 1; + + if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) into dst table, duration:%.2fSec.", - pTask->id.idStr, vgId, pRec->numOfPackages, pRec->numOfRows, + tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 + " submit into dst table, duration:%.2f Sec.", + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0); } } else {