fix(stream): reset the pData after transferring state.
This commit is contained in:
parent
80fd3e0445
commit
ad1780dbdc
|
@ -99,6 +99,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
|
|||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
||||
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
||||
void clearBufferedDispatchMsg(SStreamTask* pTask);
|
||||
|
||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||
|
|
|
@ -412,9 +412,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||
|
||||
if (pTask->msgInfo.pData != NULL) {
|
||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||
pTask->msgInfo.pData = NULL;
|
||||
pTask->msgInfo.dispatchMsgType = 0;
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
|
|
Loading…
Reference in New Issue