Merge pull request #22439 from taosdata/fix/liaohj
fix(stream): continue process when met with trans-state msg.
This commit is contained in:
commit
cfe2cf0ebd
|
@ -378,7 +378,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
}
|
}
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
qDebug("s-task:%s checkpoint/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
// use the default memory limit, refactor later.
|
// use the default memory limit, refactor later.
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
|
|
@ -364,7 +364,8 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p
|
||||||
msg.pCont = buf;
|
msg.pCont = buf;
|
||||||
msg.msgType = pTask->msgInfo.msgType;
|
msg.msgType = pTask->msgInfo.msgType;
|
||||||
|
|
||||||
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
|
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg, len:%d", pTask->id.idStr, pReq->taskId, vgId,
|
||||||
|
msg.contLen);
|
||||||
return tmsgSendReq(pEpSet, &msg);
|
return tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
|
@ -730,10 +731,14 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||||
// happened too fast.
|
// happened too fast.
|
||||||
// todo handle the shuffle dispatch failure
|
// todo handle the shuffle dispatch failure
|
||||||
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
|
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||||
tstrerror(code), ++pTask->msgInfo.retryCount);
|
qWarn("s-task:%s failed to dispatch msg to task:0x%x, no retry, since it is destroyed already", id, pRsp->downstreamTaskId);
|
||||||
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
} else {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
|
||||||
|
tstrerror(code), ++pTask->msgInfo.retryCount);
|
||||||
|
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -421,17 +421,12 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI
|
||||||
|
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
|
|
||||||
taosMsleep(10);
|
|
||||||
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
|
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s sink task handle result block one-by-one", id);
|
qDebug("s-task:%s sink task handle block one-by-one, type:%d", id, qItem->type);
|
||||||
|
|
||||||
*numOfBlocks = 1;
|
*numOfBlocks = 1;
|
||||||
*pInput = qItem;
|
*pInput = qItem;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -467,8 +462,7 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
// previous existed blocks needs to be handle, before handle the checkpoint msg block
|
// previous existed blocks needs to be handle, before handle the checkpoint msg block
|
||||||
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id,
|
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks);
|
||||||
*numOfBlocks);
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -581,7 +575,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (pInput->type == STREAM_INPUT__TRANS_STATE) {
|
if (pInput->type == STREAM_INPUT__TRANS_STATE) {
|
||||||
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
|
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
|
||||||
return 0;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
|
|
Loading…
Reference in New Issue