From 317a7c83c743f312b2d3bf51f7bc87461adffa7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jul 2023 10:55:45 +0800 Subject: [PATCH 1/5] fix(stream): fix message lost bug during pause stream. --- source/libs/stream/src/streamExec.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 46290c306f..bf6fcaced7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -381,12 +381,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } - if (streamTaskShouldStop(&pTask->status)) { - if (pInput) { - streamFreeQitem(pInput); - } - return 0; - } +// if (streamTaskShouldStop(&pTask->status)) { +// if (pInput) { +// streamFreeQitem(pInput); +// } +// return 0; +// } if (pInput == NULL) { break; From 5d4efe11980f6a954b9752144803c94ff2aa5ad0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jul 2023 11:46:05 +0800 Subject: [PATCH 2/5] fix(stream): fix error in pause. --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bf6fcaced7..9fddcf9155 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -335,6 +335,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { if (streamTaskShouldPause(&pTask->status)) { + qDebug("s-task:%s task should pause, input blocks:%s", pTask->id.idStr, batchSize); if (batchSize > 1) { break; } else { From f84bfc96fb5f1ffa30de4d17a98d04b09e246849 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jul 2023 14:30:11 +0800 Subject: [PATCH 3/5] fix(stream): fix error in pause in stream. --- source/libs/stream/src/streamExec.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9fddcf9155..9a53ffa088 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -325,7 +325,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - int32_t batchSize = 1; + int32_t batchSize = 0; int16_t times = 0; SStreamQueueItem* pInput = NULL; @@ -335,8 +335,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%s", pTask->id.idStr, batchSize); - if (batchSize > 1) { + qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, batchSize); + if (batchSize > 0) { break; } else { return 0; @@ -357,6 +357,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pInput == NULL) { + batchSize += 1; + pInput = qItem; streamQueueProcessSuccess(pTask->inputQueue); if (pTask->taskLevel == TASK_LEVEL__SINK) { @@ -364,18 +366,20 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } else { // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = NULL; - if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { + ASSERT(batchSize >= 1); + + void* newRet = streamMergeQueueItem(pInput, qItem); + if (newRet == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { - batchSize++; + batchSize += 1; + pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, - MAX_STREAM_EXEC_BATCH_NUM); + if (batchSize >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); break; } } @@ -390,6 +394,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // } if (pInput == NULL) { + ASSERT(batchSize == 0); break; } From ceb06635fe37557dac47600e56c31777ae109b10 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jul 2023 15:12:35 +0800 Subject: [PATCH 4/5] fix(stream): fix error in pause stream. --- source/libs/stream/src/streamExec.c | 121 +++++++++++++--------------- 1 file changed, 57 insertions(+), 64 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9a53ffa088..bfbaed7bf6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -317,6 +317,60 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, + const char* id) { + int32_t retryTimes = 0; + int32_t MAX_RETRY_TIMES = 5; + + while (1) { + if (streamTaskShouldPause(&pTask->status)) { + qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + taosMsleep(10); + qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); + continue; + } + + qDebug("===stream===break batchSize:%d", *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + // do not merge blocks for sink node + if (pTask->taskLevel == TASK_LEVEL__SINK) { + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } + + if (pInput == NULL) { + ASSERT((*numOfBlocks) == 0); + *pInput = qItem; + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = streamMergeQueueItem(*pInput, qItem); + if (newRet == NULL) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + streamQueueProcessFail(pTask->inputQueue); + return TSDB_CODE_SUCCESS; + } + pInput = newRet; + } + + *numOfBlocks += 1; + streamQueueProcessSuccess(pTask->inputQueue); + + if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + return TSDB_CODE_SUCCESS; + } + } +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -326,73 +380,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 0; - int16_t times = 0; - SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - while (1) { - if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, batchSize); - if (batchSize > 0) { - break; - } else { - return 0; - } - } - - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { - times++; - taosMsleep(10); - qDebug("===stream===try again batchSize:%d", batchSize); - continue; - } - - qDebug("===stream===break batchSize:%d", batchSize); - break; - } - - if (pInput == NULL) { - batchSize += 1; - - pInput = qItem; - streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->taskLevel == TASK_LEVEL__SINK) { - break; - } - } else { - // todo we need to sort the data block, instead of just appending into the array list. - ASSERT(batchSize >= 1); - - void* newRet = streamMergeQueueItem(pInput, qItem); - if (newRet == NULL) { - streamQueueProcessFail(pTask->inputQueue); - break; - } else { - batchSize += 1; - - pInput = newRet; - streamQueueProcessSuccess(pTask->inputQueue); - - if (batchSize >= MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); - break; - } - } - } - } - -// if (streamTaskShouldStop(&pTask->status)) { -// if (pInput) { -// streamFreeQitem(pInput); -// } -// return 0; -// } - + /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); break; @@ -409,8 +402,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, - atomic_load_8(&pTask->status.taskStatus)); + qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status); taosMsleep(100); } else { break; @@ -463,6 +455,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, resSize / 1048576.0, totalBlocks); + streamFreeQitem(pInput); } From 9e62b9d0d21a45e9d284302187867af7560266af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jul 2023 15:33:08 +0800 Subject: [PATCH 5/5] fix(stream): fix error in extract data from inputQ. --- source/libs/stream/src/streamExec.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bfbaed7bf6..847ec3f159 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -347,7 +347,7 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - if (pInput == NULL) { + if (*pInput == NULL) { ASSERT((*numOfBlocks) == 0); *pInput = qItem; } else { @@ -358,7 +358,8 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } - pInput = newRet; + + *pInput = newRet; } *numOfBlocks += 1;