diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1a5ef2e007..128d26bfb1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -314,13 +314,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); + qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6391a02ace..2d950723bc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -29,9 +29,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes, int64_t* resSize) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; + *resSize = 0; while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); @@ -122,9 +123,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; + + (*resSize) += blockDataGetSize(output); taosArrayPush(pRes, &block); - qDebug("s-task:%s (child %d) executed and get block, total blocks:%d", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes)); + qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes), + (*resSize)/1048576.0); } return 0; @@ -330,10 +334,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { continue; } + int64_t resSize = 0; + int64_t st = taosGetTimestampMs(); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); - streamTaskExecImpl(pTask, pInput, pRes); + streamTaskExecImpl(pTask, pInput, pRes, &resSize); int64_t ckId = 0; int64_t dataVer = 0; @@ -356,10 +362,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { taosWUnLockLatch(&pTask->pMeta->lock); qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); } - } else { - qDebug("s-task:%s exec end", pTask->id.idStr); } + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB", pTask->id.idStr, el, resSize/1048576.0); + if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) {