more refact
This commit is contained in:
parent
0e46041c1a
commit
1b39147496
|
@ -125,6 +125,7 @@ _exit:
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *tsdb;
|
STsdb *tsdb;
|
||||||
|
int32_t sync;
|
||||||
int64_t now;
|
int64_t now;
|
||||||
} SRtnArg;
|
} SRtnArg;
|
||||||
|
|
||||||
|
@ -254,23 +255,30 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) {
|
static void tsdbFreeRtnArg(void *arg) {
|
||||||
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
SRtnArg *rArg = (SRtnArg *)arg;
|
||||||
if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (rArg->sync) {
|
||||||
|
tsem_post(&rArg->tsdb->pVnode->canCommit);
|
||||||
arg->tsdb = tsdb;
|
}
|
||||||
arg->now = now;
|
taosMemoryFree(arg);
|
||||||
|
|
||||||
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, taosMemoryFree, arg, taskid);
|
|
||||||
if (code) taosMemoryFree(arg);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now) {
|
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
|
||||||
|
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
|
||||||
|
if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
arg->tsdb = tsdb;
|
||||||
|
arg->sync = sync;
|
||||||
|
arg->now = now;
|
||||||
|
|
||||||
|
if (sync) {
|
||||||
|
tsem_wait(&tsdb->pVnode->canCommit);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t taskid;
|
int64_t taskid;
|
||||||
|
int32_t code =
|
||||||
int32_t code = tsdbAsyncRetention(tsdb, now, &taskid);
|
tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid);
|
||||||
if (code) return code;
|
if (code) {
|
||||||
|
tsdbFreeRtnArg(arg);
|
||||||
return tsdbFSWaitBgTask(tsdb->pFS, taskid);
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
|
@ -15,27 +15,8 @@
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
extern int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now);
|
extern int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync);
|
||||||
extern int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid);
|
|
||||||
|
|
||||||
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
|
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
|
||||||
int32_t code;
|
return tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
|
||||||
int32_t lino;
|
|
||||||
|
|
||||||
if (pVnode->config.sttTrigger == 1) {
|
|
||||||
tsem_wait(&pVnode->canCommit);
|
|
||||||
code = tsdbSyncRetention(pVnode->pTsdb, now);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// code = smaDoRetention(pVnode->pSma, now);
|
|
||||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
tsem_post(&pVnode->canCommit);
|
|
||||||
} else {
|
|
||||||
int64_t taskid;
|
|
||||||
code = tsdbAsyncRetention(pVnode->pTsdb, now, &taskid);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue