enh(stream): execute scan history by using time slice 1s.

This commit is contained in:
Haojun Liao 2023-11-05 01:26:53 +08:00
parent 1f756f58f6
commit 0e44950a37
4 changed files with 38 additions and 17 deletions

View File

@ -213,7 +213,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo);

View File

@ -775,10 +775,16 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
typedef enum {
TASK_SCANHISTORY_CONT = 0x1,
TASK_SCANHISTORY_QUIT = 0x2,
TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamScanHistoryData(SStreamTask* pTask); EScanHistoryRet streamScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level // agg level

View File

@ -1175,6 +1175,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
if (ret == TASK_SCANHISTORY_REXEC) { if (ret == TASK_SCANHISTORY_REXEC) {
// todo wait for 100ms and retry
streamStartScanHistoryAsync(pTask, 0); streamStartScanHistoryAsync(pTask, 0);
} else { } else {
char* p = NULL; char* p = NULL;

View File

@ -18,7 +18,8 @@
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define STREAM_RESULT_DUMP_THRESHOLD 300 #define STREAM_RESULT_DUMP_THRESHOLD 300
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -48,10 +49,9 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
} }
streamDispatchStreamBlock(pTask); streamDispatchStreamBlock(pTask);
return code;
} }
return 0; return code;
} }
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
@ -187,12 +187,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code; return code;
} }
int32_t streamScanHistoryData(SStreamTask* pTask) { EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
int64_t st = taosGetTimestampMs();
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
@ -200,13 +201,14 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
break; return TASK_SCANHISTORY_QUIT; // quit from step1, not continue to handle the step2
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, tstrerror(terrno));
continue;
} }
int32_t size = 0; int32_t size = 0;
@ -214,26 +216,26 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
while (1) { while (1) {
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0; return TASK_SCANHISTORY_QUIT;
} }
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel);
taosMsleep(10000); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
continue; return TASK_SCANHISTORY_REXEC;
} }
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
code = qExecTask(exec, &output, &ts); code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); stError("s-task:%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue; continue;
} }
// the generated results before fill-history task been paused, should be dispatched to sink node // the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL) { if (output == NULL) {
finished = qStreamRecoverScanFinished(exec); finished = qStreamScanhistoryFinished(exec);
break; break;
} }
@ -243,8 +245,9 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD, pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
@ -256,14 +259,25 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks); code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; terrno = code;
stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code));
return TASK_SCANHISTORY_REXEC;
} }
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
} }
int64_t el = taosGetTimestampMs() - st;
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr,
pTask->info.fillHistory, pTask->info.taskLevel);
// todo exec scanhistory in 100ms
return TASK_SCANHISTORY_REXEC;
}
} }
return 0; return TASK_SCANHISTORY_CONT;
} }
// wait for the stream task to be idle // wait for the stream task to be idle