fix(stream): fix memory leak and failed to close vnode.

This commit is contained in:
Haojun Liao 2023-06-16 09:55:11 +08:00
parent a3e1882fb2
commit 933801269a
3 changed files with 21 additions and 15 deletions

View File

@ -154,18 +154,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0;
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
static void* freeStreamTasks(SArray* pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i);
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
@ -175,8 +167,20 @@ void tFreeStreamObj(SStreamObj *pStream) {
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
taosArrayDestroy(pStream->pHTasksList);
return taosArrayDestroy(pTaskLevel);
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
pStream->tasks = freeStreamTasks(pStream->tasks);
pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);
// tagSchema.pSchema
if (pStream->tagSchema.nCols > 0) {

View File

@ -1077,7 +1077,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// wait for the stream task get ready for scan history data
while (pStreamTask->status.checkDownstream == 0 || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
pStreamTask->info.taskLevel);
taosMsleep(100);

View File

@ -3963,6 +3963,7 @@ void destroyStreamStateOperatorInfo(void* param) {
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosArrayDestroy(pInfo->historyWins);
tSimpleHashCleanup(pInfo->pSeDeleted);
taosMemoryFreeClear(param);
}