first version

This commit is contained in:
Hongze Cheng 2023-11-15 13:41:00 +08:00
parent 9167a07e89
commit 8050d73202
8 changed files with 93 additions and 47 deletions

View File

@ -90,6 +90,7 @@ IF (TD_VNODE_PLUGINS)
vnode vnode
PRIVATE PRIVATE
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompactMonitor.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c
) )
ENDIF () ENDIF ()

View File

@ -74,6 +74,7 @@ typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SBlkInfo SBlkInfo; typedef struct SBlkInfo SBlkInfo;
typedef struct STsdbDataIter2 STsdbDataIter2; typedef struct STsdbDataIter2 STsdbDataIter2;
typedef struct STsdbFilterInfo STsdbFilterInfo; typedef struct STsdbFilterInfo STsdbFilterInfo;
typedef struct STFileSystem STFileSystem;
#define TSDBROW_ROW_FMT ((int8_t)0x0) #define TSDBROW_ROW_FMT ((int8_t)0x0)
#define TSDBROW_COL_FMT ((int8_t)0x1) #define TSDBROW_COL_FMT ((int8_t)0x1)
@ -384,8 +385,10 @@ struct STsdb {
TdThreadMutex bMutex; TdThreadMutex bMutex;
SLRUCache *pgCache; SLRUCache *pgCache;
TdThreadMutex pgMutex; TdThreadMutex pgMutex;
struct STFileSystem *pFS; // new STFileSystem *pFS; // new
SRocksCache rCache; SRocksCache rCache;
// compact monitor
struct SCompMonitor *pCompMonitor;
}; };
struct TSDBKEY { struct TSDBKEY {

View File

@ -1257,6 +1257,31 @@ int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int
return 0; return 0;
} }
/* Try stop the bg task. If task is `stoped`, return true, else `false`
* The caller must hold the fs->tsdb->mutex
*/
bool tsdbFSStopBgTask(STFileSystem *fs, int32_t fid, int64_t taskId) {
STFileSet *fset = NULL;
tsdbFSGetFSet(fs, fid, &fset);
ASSERT(fset);
if (fset->bgTaskRunning->taskid == taskId) {
return false;
} else {
for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) {
if (task->taskid == taskId) {
// stop and remove current task
task->next->prev = task->prev;
task->prev->next = task->next;
fset->bgTaskNum--;
tsdbDoDoneBgTask(fs, task);
return true;
}
}
return false;
}
}
int32_t tsdbFSDisableBgTask(STFileSystem *fs) { int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
taosThreadMutexLock(&fs->tsdb->mutex); taosThreadMutexLock(&fs->tsdb->mutex);
for (;;) { for (;;) {

View File

@ -58,6 +58,7 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
// background task // background task
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *), int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid); void (*destroy)(void *), void *arg, int64_t *taskid);
bool tsdbFSStopBgTask(STFileSystem *fs, int32_t fid, int64_t taskId);
int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSDisableBgTask(STFileSystem *fs);
int32_t tsdbFSEnableBgTask(STFileSystem *fs); int32_t tsdbFSEnableBgTask(STFileSystem *fs);
// other // other

View File

@ -28,7 +28,6 @@ typedef struct SSttLvl SSttLvl;
typedef TARRAY2(STFileObj *) TFileObjArray; typedef TARRAY2(STFileObj *) TFileObjArray;
typedef TARRAY2(SSttLvl *) TSttLvlArray; typedef TARRAY2(SSttLvl *) TSttLvlArray;
typedef TARRAY2(STFileOp) TFileOpArray; typedef TARRAY2(STFileOp) TFileOpArray;
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask; typedef struct STFSBgTask STFSBgTask;
typedef enum { typedef enum {

View File

@ -16,6 +16,9 @@
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFS2.h" #include "tsdbFS2.h"
extern int32_t tsdbOpenCompMonitor(STsdb *tsdb);
extern int32_t tsdbCloseCompMonitor(STsdb *tsdb);
int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg; STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg;
pKeepCfg->precision = pCfg->precision; pKeepCfg->precision = pCfg->precision;
@ -81,6 +84,12 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
goto _err; goto _err;
} }
#ifdef TD_ENTERPRISE
if (tsdbOpenCompMonitor(pTsdb) < 0) {
goto _err;
}
#endif
tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d, keepTimeoffset:%d", TD_VID(pVnode), pTsdb->path, tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d, keepTimeoffset:%d", TD_VID(pVnode), pTsdb->path,
pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2,
pTsdb->keepCfg.keepTimeOffset); pTsdb->keepCfg.keepTimeOffset);
@ -108,6 +117,9 @@ int tsdbClose(STsdb **pTsdb) {
tsdbCloseFS(&(*pTsdb)->pFS); tsdbCloseFS(&(*pTsdb)->pFS);
tsdbCloseCache(*pTsdb); tsdbCloseCache(*pTsdb);
#ifdef TD_ENTERPRISE
tsdbCloseCompMonitor(*pTsdb);
#endif
taosThreadMutexDestroy(&(*pTsdb)->mutex); taosThreadMutexDestroy(&(*pTsdb)->mutex);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
} }

View File

@ -14,11 +14,11 @@
*/ */
#include "audit.h" #include "audit.h"
#include "cos.h"
#include "tencode.h" #include "tencode.h"
#include "tmsg.h" #include "tmsg.h"
#include "tstrbuild.h" #include "tstrbuild.h"
#include "vnd.h" #include "vnd.h"
#include "cos.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
@ -2035,6 +2035,11 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp); return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
} }
static int32_t vnodeProcessStopCompactReq(SVnode *pVnode) {
// TODO
return 0;
}
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
syncCheckMember(pVnode->sync); syncCheckMember(pVnode->sync);