fix(stream): free array

This commit is contained in:
54liuyao 2022-09-21 14:24:26 +08:00
parent 5a7ad1344b
commit 5ca1dc1005
2 changed files with 6 additions and 35 deletions

View File

@ -1687,6 +1687,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
while (1) { while (1) {
if (pInfo->tqReader->pMsg == NULL) { if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) { if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
return NULL; return NULL;
} }

View File

@ -1695,6 +1695,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
} }
nodesDestroyNode((SNode*)pInfo->pPhyNode); nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -3073,6 +3074,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
taosArrayRemove(chArray, index); taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) { if (taosArrayGetSize(chArray) == 0) {
// pull data is over // pull data is over
taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey)); taosHashRemove(pMap, &winRes, sizeof(SWinKey));
} }
} }
@ -3109,9 +3111,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
TSKEY maxTs = INT64_MIN; TSKEY maxTs = INT64_MIN;
TSKEY minTs = INT64_MAX; TSKEY minTs = INT64_MAX;
@ -3175,6 +3174,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
} }
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -5755,8 +5757,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
SStreamState* pState = pTaskInfo->streamInfo.pState;
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -5805,36 +5805,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pOperator); pOperator);