From 91de00597d64d89b41acb254eb09f30fdf67c874 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 May 2023 22:12:14 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 36 ++++++++++++++++++----------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 716b939e5f..c3dd848bc7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -17,7 +17,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 8 +#define MIN_STREAM_EXEC_BATCH_NUM 4 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo (SStreamTask* pTask); @@ -44,6 +44,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* if (numOfBlocks > 0) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); if (pStreamBlocks == NULL) { + qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno)); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return -1; } @@ -314,8 +315,13 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +/** + * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the + * appropriate batch of blocks should be handled in 5 to 10 sec. + */ int32_t streamExecForAll(SStreamTask* pTask) { - int32_t code = 0; + const char* id = pTask->id.idStr; + while (1) { int32_t batchSize = 1; int16_t times = 0; @@ -323,7 +329,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. - qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); + qDebug("s-task:%s start to extract data block from inputQ", id); while (1) { if (streamTaskShouldPause(&pTask->status)) { @@ -338,7 +344,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { times++; - taosMsleep(1); + taosMsleep(10); qDebug("===stream===try again batchSize:%d", batchSize); continue; } @@ -363,8 +369,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchSize++; pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); + if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr); + qDebug("s-task:%s maximum batch limit:%d reached, processing this batch of blocks", id, + MAX_STREAM_EXEC_BATCH_NUM); break; } } @@ -375,7 +383,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput) { streamFreeQitem(pInput); } - return 0; } @@ -385,7 +392,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); - qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize); + qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } @@ -394,16 +401,16 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, atomic_load_8(&pTask->status.taskStatus)); - taosMsleep(2); + taosMsleep(100); } else { break; } } int64_t st = taosGetTimestampMs(); - qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); { // set input @@ -417,21 +424,21 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; SArray* pBlockList = pBlock->blocks; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", id, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; @@ -446,7 +453,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); + qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", + id, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); }