adj stream op result

This commit is contained in:
54liuyao 2024-07-19 14:50:31 +08:00
parent 5b293e7340
commit 6a71994570
4 changed files with 17 additions and 17 deletions

View File

@ -55,13 +55,13 @@ void destroyStreamCountAggOperatorInfo(void* param) {
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted); tSimpleHashCleanup(pInfo->pStDeleted);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins); taosArrayDestroy(pInfo->historyWins);
pInfo->pCheckpointRes = blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted); tSimpleHashCleanup(pInfo->pPkDeleted);

View File

@ -60,14 +60,14 @@ void destroyStreamEventOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pChildren); taosArrayDestroy(pInfo->pChildren);
} }
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pAllUpdated); tSimpleHashCleanup(pInfo->pAllUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted); tSimpleHashCleanup(pInfo->pSeDeleted);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins); taosArrayDestroy(pInfo->historyWins);
pInfo->pDelRes = blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted); tSimpleHashCleanup(pInfo->pPkDeleted);

View File

@ -123,8 +123,8 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
static void destroyStreamFillOperatorInfo(void* param) { static void destroyStreamFillOperatorInfo(void* param) {
SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param; SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
pInfo->pFillInfo = destroyStreamFillInfo(pInfo->pFillInfo); destroyStreamFillInfo(pInfo->pFillInfo);
pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup); destroyStreamFillSupporter(pInfo->pFillSup);
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
pInfo->pRes = NULL; pInfo->pRes = NULL;
blockDataDestroy(pInfo->pSrcBlock); blockDataDestroy(pInfo->pSrcBlock);

View File

@ -465,11 +465,11 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosHashCleanup(pInfo->pPullDataMap); taosHashCleanup(pInfo->pPullDataMap);
taosHashCleanup(pInfo->pFinalPullDataMap); taosHashCleanup(pInfo->pFinalPullDataMap);
taosArrayDestroy(pInfo->pPullWins); taosArrayDestroy(pInfo->pPullWins);
pInfo->pPullDataRes = blockDataDestroy(pInfo->pPullDataRes); blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pDelWins); taosArrayDestroy(pInfo->pDelWins);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
pInfo->pMidRetriveRes = blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidRetriveRes);
pInfo->pMidPulloverRes = blockDataDestroy(pInfo->pMidPulloverRes); blockDataDestroy(pInfo->pMidPulloverRes);
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
taosArrayDestroy(pInfo->pMidPullDatas); taosArrayDestroy(pInfo->pMidPullDatas);
@ -486,7 +486,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
tSimpleHashCleanup(pInfo->pDeletedMap); tSimpleHashCleanup(pInfo->pDeletedMap);
pInfo->pCheckpointRes = blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -1943,7 +1943,7 @@ _error:
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
tSimpleHashCleanup(pSup->pResultRows); tSimpleHashCleanup(pSup->pResultRows);
destroyDiskbasedBuf(pSup->pResultBuf); destroyDiskbasedBuf(pSup->pResultBuf);
pSup->pScanBlock = blockDataDestroy(pSup->pScanBlock); blockDataDestroy(pSup->pScanBlock);
pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState); pSup->stateStore.streamFileStateDestroy(pSup->pState->pFileState);
taosMemoryFreeClear(pSup->pState); taosMemoryFreeClear(pSup->pState);
taosMemoryFreeClear(pSup->pDummyCtx); taosMemoryFreeClear(pSup->pDummyCtx);
@ -1968,14 +1968,14 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
} }
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
pInfo->pWinBlock = blockDataDestroy(pInfo->pWinBlock); blockDataDestroy(pInfo->pWinBlock);
tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted); tSimpleHashCleanup(pInfo->pStDeleted);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins); taosArrayDestroy(pInfo->historyWins);
pInfo->pCheckpointRes = blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted); tSimpleHashCleanup(pInfo->pPkDeleted);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
@ -3946,13 +3946,13 @@ void destroyStreamStateOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pChildren); taosArrayDestroy(pInfo->pChildren);
} }
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted); tSimpleHashCleanup(pInfo->pSeDeleted);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins); taosArrayDestroy(pInfo->historyWins);
pInfo->pCheckpointRes = blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
tSimpleHashCleanup(pInfo->pPkDeleted); tSimpleHashCleanup(pInfo->pPkDeleted);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);