diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5f85abb6b6..c882b719bc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,7 +16,7 @@ #include "streamInt.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId); @@ -104,8 +104,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i taosArrayPush(pRes, &block); numOfBlocks += 1; - qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64" dump results", pTask->id.idStr, pTask->info.selfChildId, - pRetrieveBlock->reqId); + qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64 " dump results", pTask->id.idStr, + pTask->info.selfChildId, pRetrieveBlock->reqId); } break; @@ -329,7 +329,8 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, + pStreamTask->id.idStr); // todo handle stream task is dropped here @@ -355,7 +356,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", + ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); } else { @@ -385,13 +386,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - int32_t batchSize = 0; + int32_t batchSize = 0; SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize, id); + /*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { @@ -457,7 +458,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { - const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput; + const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); } else { ASSERT(0); @@ -468,9 +469,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t totalBlocks = 0; streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", - id, el, resSize / 1048576.0, totalBlocks); + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + resSize / 1048576.0, totalBlocks); // update the currentVer if processing the submit blocks. if(currentVer > pTask->chkInfo.currentVer) { @@ -488,7 +489,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { pTask->status.taskStatus = TASK_STATUS__CK_READY; return 0; } - } } @@ -525,19 +525,20 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { // check for all tasks, and do generate the vnode-wide checkpoint data. // todo extract method SStreamMeta* pMeta = pTask->pMeta; - int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); + int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%"PRId64, pMeta->vgId); + qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%" PRId64 "", pMeta->vgId, + pTask->checkpointingId); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) {