fix: tsdbRetention memory leak
This commit is contained in:
parent
3c18741469
commit
0e46041c1a
|
@ -537,6 +537,9 @@ static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
|
||||||
|
|
||||||
if (task->numWait == 0) {
|
if (task->numWait == 0) {
|
||||||
taosThreadCondDestroy(task->done);
|
taosThreadCondDestroy(task->done);
|
||||||
|
if (task->free) {
|
||||||
|
task->free(task->arg);
|
||||||
|
}
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -546,6 +549,9 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
|
||||||
taosThreadCondBroadcast(task->done);
|
taosThreadCondBroadcast(task->done);
|
||||||
} else {
|
} else {
|
||||||
taosThreadCondDestroy(task->done);
|
taosThreadCondDestroy(task->done);
|
||||||
|
if (task->free) {
|
||||||
|
task->free(task->arg);
|
||||||
|
}
|
||||||
taosMemoryFree(task);
|
taosMemoryFree(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -627,7 +633,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
||||||
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
|
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
|
||||||
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
|
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
|
||||||
|
|
||||||
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, fs->tsdb, NULL);
|
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -774,8 +780,8 @@ static int32_t tsdbFSRunBgTask(void *arg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg,
|
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *),
|
||||||
int64_t *taskid) {
|
void *arg, int64_t *taskid) {
|
||||||
if (fs->stop) {
|
if (fs->stop) {
|
||||||
return 0; // TODO: use a better error code
|
return 0; // TODO: use a better error code
|
||||||
}
|
}
|
||||||
|
@ -798,6 +804,7 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32
|
||||||
|
|
||||||
task->type = type;
|
task->type = type;
|
||||||
task->run = run;
|
task->run = run;
|
||||||
|
task->free = free;
|
||||||
task->arg = arg;
|
task->arg = arg;
|
||||||
task->scheduleTime = taosGetTimestampMs();
|
task->scheduleTime = taosGetTimestampMs();
|
||||||
task->taskid = ++fs->taskid;
|
task->taskid = ++fs->taskid;
|
||||||
|
@ -819,9 +826,10 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid) {
|
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
|
||||||
|
int64_t *taskid) {
|
||||||
taosThreadMutexLock(fs->mutex);
|
taosThreadMutexLock(fs->mutex);
|
||||||
int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, arg, taskid);
|
int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid);
|
||||||
taosThreadMutexUnlock(fs->mutex);
|
taosThreadMutexUnlock(fs->mutex);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,8 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
|
||||||
int32_t tsdbFSEditCommit(STFileSystem *fs);
|
int32_t tsdbFSEditCommit(STFileSystem *fs);
|
||||||
int32_t tsdbFSEditAbort(STFileSystem *fs);
|
int32_t tsdbFSEditAbort(STFileSystem *fs);
|
||||||
// background task
|
// background task
|
||||||
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid);
|
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
|
||||||
|
int64_t *taskid);
|
||||||
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
|
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
|
||||||
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
|
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
|
||||||
int32_t tsdbFSDisableBgTask(STFileSystem *fs);
|
int32_t tsdbFSDisableBgTask(STFileSystem *fs);
|
||||||
|
@ -70,6 +71,7 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
||||||
struct STFSBgTask {
|
struct STFSBgTask {
|
||||||
EFSBgTaskT type;
|
EFSBgTaskT type;
|
||||||
int32_t (*run)(void *arg);
|
int32_t (*run)(void *arg);
|
||||||
|
void (*free)(void *arg);
|
||||||
void *arg;
|
void *arg;
|
||||||
|
|
||||||
TdThreadCond done[1];
|
TdThreadCond done[1];
|
||||||
|
|
|
@ -251,7 +251,6 @@ _exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
|
||||||
}
|
}
|
||||||
taosMemoryFree(arg);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,9 +261,8 @@ int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) {
|
||||||
arg->tsdb = tsdb;
|
arg->tsdb = tsdb;
|
||||||
arg->now = now;
|
arg->now = now;
|
||||||
|
|
||||||
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, arg, taskid);
|
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, taosMemoryFree, arg, taskid);
|
||||||
if (code) taosMemoryFree(arg);
|
if (code) taosMemoryFree(arg);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue