fix(stream): add clear msgData buf impl.

This commit is contained in:
Haojun Liao 2024-03-04 19:51:01 +08:00
parent ad1780dbdc
commit a38161299b
1 changed files with 15 additions and 7 deletions

View File

@ -315,6 +315,16 @@ int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
}
void clearBufferedDispatchMsg(SStreamTask* pTask) {
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
if (pMsgInfo->pData != NULL) {
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
}
pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0;
}
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
@ -678,8 +688,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
// todo deal with only partially success dispatch case
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
clearBufferedDispatchMsg(pTask);
return code;
}
@ -938,15 +947,12 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
pTask->chkInfo.dispatchCheckpointTrigger = true;
}
pTask->msgInfo.pData = NULL;
pTask->msgInfo.dispatchMsgType = 0;
clearBufferedDispatchMsg(pTask);
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
@ -1086,7 +1092,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} else { // this message has been sent successfully, let's try next one.
pTask->msgInfo.retryCount = 0;
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
@ -1095,6 +1101,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
clearBufferedDispatchMsg(pTask);
// now ready for next data output
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
} else {