diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f549f23c7f..d0e2b45937 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -213,7 +213,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); -bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); +bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2e4204ab34..0605f4b2e5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -775,10 +775,16 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); +typedef enum { + TASK_SCANHISTORY_CONT = 0x1, + TASK_SCANHISTORY_QUIT = 0x2, + TASK_SCANHISTORY_REXEC = 0x3, +} EScanHistoryRet; + // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -int32_t streamScanHistoryData(SStreamTask* pTask); +EScanHistoryRet streamScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2422841c3b..2e6844736c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1175,6 +1175,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { 
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); if (ret == TASK_SCANHISTORY_REXEC) { + // todo wait for 100ms and retry streamStartScanHistoryAsync(pTask, 0); } else { char* p = NULL; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 05a62140f9..fa042b5687 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,7 +18,8 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 #define STREAM_RESULT_DUMP_THRESHOLD 300 -#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) +#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data +#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); @@ -48,10 +49,9 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl } streamDispatchStreamBlock(pTask); - return code; } - return 0; + return code; } static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, @@ -187,12 +187,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -int32_t streamScanHistoryData(SStreamTask* pTask) { +EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; + int64_t st = taosGetTimestampMs(); qSetStreamOpOpen(exec); @@ -200,13 +201,14 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { if (streamTaskShouldPause(pTask)) { double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); - break; + return TASK_SCANHISTORY_QUIT; // quit from step1; do not continue on to step2 } SArray* pRes = taosArrayInit(0, 
sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, tstrerror(terrno)); + continue; } int32_t size = 0; @@ -214,26 +216,26 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { while (1) { if (streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return 0; + return TASK_SCANHISTORY_QUIT; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { - stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); - taosMsleep(10000); - continue; + stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return TASK_SCANHISTORY_REXEC; } SSDataBlock* output = NULL; uint64_t ts = 0; code = qExecTask(exec, &output, &ts); if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { - stError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); + stError("s-task:%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); continue; } // the generated results before fill-history task been paused, should be dispatched to sink node if (output == NULL) { - finished = qStreamRecoverScanFinished(exec); + finished = qStreamScanhistoryFinished(exec); break; } @@ -243,8 +245,9 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { taosArrayPush(pRes, &block); size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + numOfBlocks += 1; - if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", 
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD, SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); @@ -256,14 +259,25 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { - return code; + terrno = code; + stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code)); + return TASK_SCANHISTORY_REXEC; } } else { taosArrayDestroy(pRes); } + + int64_t el = taosGetTimestampMs() - st; + if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { + stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, + pTask->info.fillHistory, pTask->info.taskLevel); + + // todo exec scanhistory in 100ms + return TASK_SCANHISTORY_REXEC; + } } - return 0; + return TASK_SCANHISTORY_CONT; } // wait for the stream task to be idle