From 1a08ffc79f3a8f58c75b60cde50111bc8722ed42 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 19:27:09 +0800 Subject: [PATCH 1/4] fix(stream): check error. --- source/libs/stream/src/streamDispatch.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 557b92baf9..8de034d1ca 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -730,10 +730,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. // todo handle the shuffle dispatch failure - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId, - tstrerror(code), ++pTask->msgInfo.retryCount); - int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); - if (ret != TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore + qWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id); + } else { + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId, + tstrerror(code), ++pTask->msgInfo.retryCount); + int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (ret != TSDB_CODE_SUCCESS) { + } } return TSDB_CODE_SUCCESS; From a334547726a5a85a8092119773dd2f5e24a5eeab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 19:28:36 +0800 Subject: [PATCH 2/4] fix(stream): fix an syntax error. --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8de034d1ca..12ff064963 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -731,7 +731,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // happened too fast. // todo handle the shuffle dispatch failure if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore - qWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id); + qWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id, pRsp->downstreamTaskId); } else { qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); From 0acb00c5165de1b2031870b02a3085829bb9a07a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 15 Aug 2023 11:48:56 +0800 Subject: [PATCH 3/4] fix(stream): add log. --- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamDispatch.c | 3 ++- source/libs/stream/src/streamExec.c | 12 +++--------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index e9b38dfff2..af67490888 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -378,7 +378,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); + qDebug("s-task:%s checkpoint/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 12ff064963..bb32173404 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -364,7 +364,8 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->msgInfo.msgType; - qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg, len:%d", pTask->id.idStr, pReq->taskId, vgId, + msg.contLen); return tmsgSendReq(pEpSet, &msg); FAIL: diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b513d7c13e..6e6f23be01 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -421,17 +421,12 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { - taosMsleep(10); - qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); - continue; - } - qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; } - qDebug("s-task:%s sink task handle result block one-by-one", id); + qDebug("s-task:%s sink task handle block one-by-one, type:%d", id, qItem->type); + *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS; @@ -467,8 +462,7 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI return TSDB_CODE_SUCCESS; } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block - qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id, - *numOfBlocks); + qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks); streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } From 71c3c710e6db673658a5ead9b455166e28a8a5ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 15 Aug 2023 14:23:20 +0800 Subject: [PATCH 4/4] fix(stream): continue process when met with trans-state msg. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6e6f23be01..02bbce6485 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -575,7 +575,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput->type == STREAM_INPUT__TRANS_STATE) { streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput); - return 0; + continue; } if (pTask->info.taskLevel == TASK_LEVEL__SINK) {