diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 910d727933..278c19bdf7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -136,7 +136,7 @@ typedef struct { typedef struct { int8_t type; SSDataBlock* pBlock; -} SStreamTrigger, SStreamCheckpoint; +} SStreamTrigger; typedef struct SStreamQueueNode SStreamQueueNode; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 927c85b262..d86b49c8f4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2189,7 +2189,7 @@ FETCH_NEXT_BLOCK: qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id); SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); - SSDataBlock* pBlock = pData->pDataBlock; + SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); if (pBlock->info.type == STREAM_CHECKPOINT) { streamScanOperatorSaveCheckpoint(pInfo); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index dbf946d310..dbec866705 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -328,6 +328,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } + pTask->status.transferState = false; // reset this value, to avoid transfer state again + ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -415,8 +417,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; - qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, pItem->type); + const SStreamDataBlock* pCheckpoint = (const SStreamDataBlock*)pInput; + qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type); } else { ASSERT(0); @@ -431,16 +433,15 @@ int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - int32_t batchSize = 0; + int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. - extractBlocksFromInputQ(pTask, &pInput, &batchSize); + extractBlocksFromInputQ(pTask, &pInput, &numOfBlocks); if (pInput == NULL) { - ASSERT(batchSize == 0); + ASSERT(numOfBlocks == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); - pTask->status.transferState = false; // reset this value, to avoid transfer state again if (code != TSDB_CODE_SUCCESS) { // todo handle this return 0; } @@ -462,7 +463,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); if (type == STREAM_INPUT__DATA_BLOCK) { - qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); + qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } @@ -471,10 +472,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); const SStreamQueueItem* pItem = pInput; - qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type); + qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); int64_t ver = pTask->chkInfo.checkpointVer; - doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.checkpointVer, id); + doSetStreamInputBlock(pTask, pInput, &ver, id); int64_t resSize = 0; int32_t totalBlocks = 0; @@ -485,11 +486,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { resSize / 1048576.0, totalBlocks); // update the currentVer if processing the submit blocks. - ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer); + ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { - qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, - pTask->chkInfo.checkpointVer); + qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, + pTask->chkInfo.checkpointVer, ver); + pTask->chkInfo.checkpointVer = ver; } streamFreeQitem(pInput); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index b3dbf827a5..8bb0e3b506 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -174,7 +174,7 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one while (1) { - if (streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); return TSDB_CODE_SUCCESS; } @@ -200,7 +200,7 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i // non sink task while (1) { - if (streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); return TSDB_CODE_SUCCESS; }