fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-07-26 14:37:04 +08:00
parent 12986ff594
commit cdffabcdad
6 changed files with 12 additions and 33 deletions

View File

@ -186,9 +186,6 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
} }
streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
streamMetaReleaseTask(pSnode->pMeta, pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} }

View File

@ -1047,6 +1047,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock); taosWUnLockLatch(&pStreamMeta->lock);
return -1; return -1;
} }
@ -1261,6 +1262,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t remain = streamAlignTransferState(pTask); int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) { if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
@ -1475,9 +1477,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
} }
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }

View File

@ -408,14 +408,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
pTask->status.taskStatus = TASK_STATUS__DROPPING;
// free it and remove it from disk meta-store // free it and remove it from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.taskId); streamMetaUnregisterTask(pMeta, taskId);
streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask);
// save to disk // save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);

View File

@ -226,10 +226,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
} }
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosWLockLatch(&pMeta->lock);
int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn); int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn);
taosWUnLockLatch(&pMeta->lock);
if (code != 0) { if (code != 0) {
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno)); qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno));
} else { } else {
@ -248,6 +245,8 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
return -1; return -1;
@ -257,8 +256,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else { } else {
return 0; return 0;
} }
@ -361,13 +358,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
doRemoveIdFromList(pMeta, num, pTask->id.taskId); doRemoveIdFromList(pMeta, num, pTask->id.taskId);
// remove the ref by timer // remove the ref by timer
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
taosTmrStop(pTask->schedTimer); taosTmrStop(pTask->schedTimer);
} }
streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask);
} else { } else {
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
} }

View File

@ -211,7 +211,7 @@ static void freeItem(void* p) {
} }
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr); qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) { if (pTask->inputQueue) {

View File

@ -1,11 +1,9 @@
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/cfg.sh -n dnode1 -c keepColumnName -v 1 system sh/cfg.sh -n dnode1 -c keepColumnName -v 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 5000 sleep 1000
sql connect sql connect
print ========== interval\session\state window print ========== interval\session\state window
@ -32,7 +30,6 @@ sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*),
sql alter local 'keepColumnName' '1' sql alter local 'keepColumnName' '1'
sql CREATE STABLE `meters_test_data` (`ts` TIMESTAMP, `close` FLOAT, `parttime` TIMESTAMP, `parttime_str` VARCHAR(32)) TAGS (`id` VARCHAR(32)); sql CREATE STABLE `meters_test_data` (`ts` TIMESTAMP, `close` FLOAT, `parttime` TIMESTAMP, `parttime_str` VARCHAR(32)) TAGS (`id` VARCHAR(32));
sql_error create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str); sql_error create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str);
@ -58,17 +55,13 @@ sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count
sql alter local 'keepColumnName' '0' sql alter local 'keepColumnName' '0'
sql create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str); sql create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str);
sql desc realtime_meters; sql desc realtime_meters;
if $rows == 0 then if $rows == 0 then
return -1 return -1
endi endi
sql create stream streamd7 into streamt7 as select _wstart, _wend, count(*), first(ca), last(ca) from t1 interval(10s); sql create stream streamd7 into streamt7 as select _wstart t1, _wend t2, count(*), first(ca), last(ca) from t1 interval(10s);
sql desc streamt7; sql desc streamt7;
if $rows == 0 then if $rows == 0 then
return -1 return -1
endi endi
@ -76,12 +69,11 @@ endi
sql create stream streamd71 into streamt71 as select _wstart, _wend, count(*) as ca, first(ca), last(ca) as c2 from t1 interval(10s); sql create stream streamd71 into streamt71 as select _wstart, _wend, count(*) as ca, first(ca), last(ca) as c2 from t1 interval(10s);
sql desc streamt71; sql desc streamt71;
if $rows == 0 then if $rows == 0 then
return -1 return -1
endi endi
sleep 3000 sleep 1000
sql drop stream if exists streamd1; sql drop stream if exists streamd1;
sql drop stream if exists streamd2; sql drop stream if exists streamd2;
@ -93,23 +85,19 @@ sql drop stream if exists streamd6;
sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s); sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s);
sql desc streamd10; sql desc streamd10;
if $rows == 0 then if $rows == 0 then
return -1 return -1
endi endi
sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count(*), last(ca), last(ca) from t1 interval(10s); sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count(*), last(ca), last(ca) from t1 interval(10s);
sql create stream streamd12 into streamd12 as select _wstart, _wend, count(*), last(ca), last(cb) as c2 from t1 interval(10s); sql create stream streamd12 into streamd12 as select _wstart, _wend, count(*), last(ca), last(cb) as c2 from t1 interval(10s);
sql desc streamd12; sql desc streamd12;
if $rows == 0 then if $rows == 0 then
return -1 return -1
endi endi
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check