refactor: do some internal refactoring.

Haojun Liao 2023-05-31 22:12:14 +08:00
parent ccc86f9916
commit 91de00597d
1 changed file with 22 additions and 14 deletions


@@ -17,7 +17,7 @@
 // maximum allowed processed block batches. One block may include several submit blocks
 #define MAX_STREAM_EXEC_BATCH_NUM 32
-#define MIN_STREAM_EXEC_BATCH_NUM 8
+#define MIN_STREAM_EXEC_BATCH_NUM 4
 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
 
 static int32_t updateCheckPointInfo (SStreamTask* pTask);
@@ -44,6 +44,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
   if (numOfBlocks > 0) {
     SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
     if (pStreamBlocks == NULL) {
+      qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
       taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
       return -1;
     }
@@ -314,8 +315,13 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }
 
+/**
+ * todo: the batch size should be tuned dynamically, according to the total elapsed time of each batch of blocks,
+ * so that an appropriate batch of blocks is handled in 5 to 10 sec.
+ */
 int32_t streamExecForAll(SStreamTask* pTask) {
   int32_t code = 0;
+  const char* id = pTask->id.idStr;
   while (1) {
     int32_t batchSize = 1;
     int16_t times = 0;
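The todo added above describes feedback-driven batch sizing but leaves it unimplemented. A minimal sketch of that idea, assuming the 5-10 sec target from the comment (the function name and the doubling/halving policy are hypothetical, not part of this commit):

// Hypothetical sketch of the dynamic tuning described in the todo above:
// grow the batch limit while batches complete quickly, shrink it when they
// overrun, keeping each batch within the 5-10 sec target window.
static int32_t tuneBatchLimit(int32_t curLimit, double elapsedSec) {
  if (elapsedSec < 5.0 && curLimit < MAX_STREAM_EXEC_BATCH_NUM) {
    int32_t next = curLimit * 2;
    return (next > MAX_STREAM_EXEC_BATCH_NUM) ? MAX_STREAM_EXEC_BATCH_NUM : next;
  }
  if (elapsedSec > 10.0 && curLimit > MIN_STREAM_EXEC_BATCH_NUM) {
    int32_t next = curLimit / 2;
    return (next < MIN_STREAM_EXEC_BATCH_NUM) ? MIN_STREAM_EXEC_BATCH_NUM : next;
  }
  return curLimit;  // already inside the target window: keep the current limit
}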
@@ -323,7 +329,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     SStreamQueueItem* pInput = NULL;
 
     // merge multiple input data if possible in the input queue.
-    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
+    qDebug("s-task:%s start to extract data block from inputQ", id);
 
     while (1) {
       if (streamTaskShouldPause(&pTask->status)) {
@@ -338,7 +344,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
       if (qItem == NULL) {
         if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
           times++;
-          taosMsleep(1);
+          taosMsleep(10);
           qDebug("===stream===try again batchSize:%d", batchSize);
           continue;
         }
@@ -363,8 +369,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
         batchSize++;
         pInput = newRet;
         streamQueueProcessSuccess(pTask->inputQueue);
+
         if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
-          qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
+          qDebug("s-task:%s maximum batch limit:%d reached, processing this batch of blocks", id,
+                 MAX_STREAM_EXEC_BATCH_NUM);
           break;
         }
       }
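For context, the newRet assigned above is the result of merging the accumulated input with the freshly dequeued item; that helper lies outside this diff. A hedged sketch of the step, with the helper name assumed rather than taken from this commit:

// Assumed shape of the merge step that produces newRet above; the real
// helper is not shown in this diff. When two queue items cannot be merged
// (e.g. incompatible item types), the batch collected so far runs as-is.
SStreamQueueItem* newRet = streamMergeQueueItem(pInput, qItem);
if (newRet == NULL) {  // merge not possible: process the current batch first
  streamQueueProcessFail(pTask->inputQueue);
  break;
}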
@@ -375,7 +383,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
 
       if (pInput) {
         streamFreeQitem(pInput);
       }
-
       return 0;
     }
@@ -385,7 +392,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
 
     if (pTask->taskLevel == TASK_LEVEL__SINK) {
       ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
-      qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize);
+      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
       streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
       continue;
     }
@@ -394,16 +401,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
       int8_t status = atomic_load_8(&pTask->status.taskStatus);
       if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
-        qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
+        qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
                atomic_load_8(&pTask->status.taskStatus));
-        taosMsleep(2);
+        taosMsleep(100);
       } else {
         break;
       }
     }
 
     int64_t st = taosGetTimestampMs();
-    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
+    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
 
     {
       // set input
@@ -417,21 +424,21 @@ int32_t streamExecForAll(SStreamTask* pTask) {
       ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
       const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
       qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
-      qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
+      qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
             pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
     } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
       const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
 
       SArray* pBlockList = pBlock->blocks;
       int32_t numOfBlocks = taosArrayGetSize(pBlockList);
-      qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
+      qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
       qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
     } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
       const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
 
       SArray* pBlockList = pMerged->submits;
       int32_t numOfBlocks = taosArrayGetSize(pBlockList);
-      qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
+      qDebug("s-task:%s %p set submit input (merged), batch num:%d", id, pTask, numOfBlocks);
       qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
     } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
       const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
@@ -446,7 +453,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
 
     double el = (taosGetTimestampMs() - st) / 1000.0;
-    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
+    qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
+           id, el, resSize / 1048576.0, totalBlocks);
 
     streamFreeQitem(pInput);
   }
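If the dynamic tuning sketched earlier were adopted, the elapsed time el computed in this last hunk would be the natural feedback signal. A hypothetical hook, not part of this commit (maxBatchLimit would replace the fixed MAX_STREAM_EXEC_BATCH_NUM bound in the accumulation loop):

// Hypothetical integration point: feed the measured batch execution time
// back into the limit used by the next accumulation round.
maxBatchLimit = tuneBatchLimit(maxBatchLimit, el);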