other: add some logs.

This commit is contained in:
Haojun Liao 2023-05-19 19:26:15 +08:00
parent e2ec8d738c
commit a9b7b8a5fd
2 changed files with 14 additions and 7 deletions

View File

@ -314,13 +314,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1;
}
qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total);
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {

View File

@ -29,9 +29,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__PAUSE);
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes, int64_t* resSize) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
*resSize = 0;
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
@ -122,9 +123,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId;
(*resSize) += blockDataGetSize(output);
taosArrayPush(pRes, &block);
qDebug("s-task:%s (child %d) executed and get block, total blocks:%d", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes));
qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes),
(*resSize)/1048576.0);
}
return 0;
@ -330,10 +334,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
continue;
}
int64_t resSize = 0;
int64_t st = taosGetTimestampMs();
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
streamTaskExecImpl(pTask, pInput, pRes);
streamTaskExecImpl(pTask, pInput, pRes, &resSize);
int64_t ckId = 0;
int64_t dataVer = 0;
@ -356,10 +362,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
}
} else {
qDebug("s-task:%s exec end", pTask->id.idStr);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB", pTask->id.idStr, el, resSize/1048576.0);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {