From c056e345343f5ea41dc760b6def08608ed4f1fc7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Jul 2023 09:39:58 +0800 Subject: [PATCH] fix compile error --- source/libs/stream/src/streamExec.c | 33 +++++++++++++++-------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b3e5dc2475..ec43cfdfe8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,7 +16,7 @@ #include "streamInt.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId); @@ -104,8 +104,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i taosArrayPush(pRes, &block); numOfBlocks += 1; - qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64" dump results", pTask->id.idStr, pTask->info.selfChildId, - pRetrieveBlock->reqId); + qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64 " dump results", pTask->id.idStr, + pTask->info.selfChildId, pRetrieveBlock->reqId); } break; @@ -329,7 +329,8 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, + pStreamTask->id.idStr); // todo handle stream task is dropped here @@ -355,7 +356,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", + ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); } else { @@ -385,13 +386,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - int32_t batchSize = 0; + int32_t batchSize = 0; SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize, id); + /*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { @@ -451,7 +452,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { - const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput; + const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); } else { ASSERT(0); @@ -462,9 +463,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t totalBlocks = 0; streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", - id, el, resSize / 1048576.0, totalBlocks); + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + resSize / 1048576.0, totalBlocks); int32_t type = pInput->type; streamFreeQitem(pInput); @@ -475,7 +476,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { pTask->status.taskStatus = TASK_STATUS__CK_READY; return 0; } - } } @@ -512,19 +512,20 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); if (pTask->status.taskStatus == TASK_STATUS__CK_READY) { // check for all tasks, and do generate the vnode-wide checkpoint data. // todo extract method SStreamMeta* pMeta = pTask->pMeta; - int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); + int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%"PRId64, pMeta->vgId); + qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%" PRId64 "", pMeta->vgId, + pTask->checkpointingId); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) {