Merge pull request #18240 from taosdata/fix/TD-20458

fix:memory leak while drop stream
This commit is contained in:
dapan1121 2022-11-18 09:32:28 +08:00 committed by GitHub
commit 56a10167dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 16 deletions

View File

@ -1610,10 +1610,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
destroyStreamFinalIntervalOperatorInfo(pChildOp->info);
taosMemoryFree(pChildOp->pDownstream);
cleanupExprSupp(&pChildOp->exprSupp);
taosMemoryFreeClear(pChildOp);
destroyOperatorInfo(pChildOp);
}
taosArrayDestroy(pInfo->pChildren);
}
@ -3419,10 +3416,9 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
destroyStreamSessionAggOperatorInfo(pChInfo);
taosMemoryFreeClear(pChild);
destroyOperatorInfo(pChild);
}
taosArrayDestroy(pInfo->pChildren);
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
@ -3470,8 +3466,10 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
}
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType) {
@ -4374,9 +4372,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->operatorType = pPhyNode->type;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
@ -4416,10 +4411,9 @@ void destroyStreamStateOperatorInfo(void* param) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
destroyStreamSessionAggOperatorInfo(pChInfo);
taosMemoryFreeClear(pChild);
destroyOperatorInfo(pChild);
}
taosArrayDestroy(pInfo->pChildren);
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);

View File

@ -710,6 +710,11 @@ sleep 200
sql select * from streamt4;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 0 then
print =====rows=$rows

View File

@ -0,0 +1,55 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sleep 5000
sql connect
print ========== interval\session\state window
sql CREATE DATABASE test1 BUFFER 96 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 STRICT 'off' WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0;
sql use test1;
sql CREATE STABLE st (time TIMESTAMP, ca DOUBLE, cb DOUBLE, cc int) TAGS (ta VARCHAR(10) );
print ========== create table before stream
sql CREATE TABLE t1 using st TAGS ('aaa');
sql CREATE TABLE t2 using st TAGS ('bbb');
sql CREATE TABLE t3 using st TAGS ('ccc');
sql CREATE TABLE t4 using st TAGS ('ddd');
sql create stream streamd1 into streamt1 as select ca, _wstart,_wend, count(*) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear);
sql create stream streamd2 into streamt2 as select tbname, _wstart,_wend, count(*) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by tbname interval(60m) fill(linear);
sql create stream streamd3 into streamt3 as select ca, _wstart,_wend, count(*), max(ca), min(cb), APERCENTILE(cc, 20) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m);
sql create stream streamd4 into streamt4 as select tbname, _wstart,_wend, count(*), max(ca), min(cb), APERCENTILE(cc, 20) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by tbname session(time, 60m);
sql create stream streamd5 into streamt5 as select tbname, _wstart,_wend, count(*), max(ca), min(cb) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by tbname state_window(cc);
sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*), max(ca), min(cb) from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc);
sleep 3000
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
sql drop stream if exists streamd5;
sql drop stream if exists streamd6;
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
$null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 0 then
return -1
endi
if $system_content == $null then
return -1
endi