Merge pull request #11526 from taosdata/feature/vnode_refact
refactor: vnode
This commit is contained in:
commit
631c9668e7
|
@ -4,13 +4,13 @@ target_sources(
|
||||||
vnode
|
vnode
|
||||||
PRIVATE
|
PRIVATE
|
||||||
# vnode
|
# vnode
|
||||||
|
"src/vnd/vnodeOpen.c"
|
||||||
"src/vnd/vnodeArenaMAImpl.c"
|
"src/vnd/vnodeArenaMAImpl.c"
|
||||||
"src/vnd/vnodeBufferPool.c"
|
"src/vnd/vnodeBufferPool.c"
|
||||||
# "src/vnd/vnodeBufferPool2.c"
|
# "src/vnd/vnodeBufferPool2.c"
|
||||||
"src/vnd/vnodeCfg.c"
|
"src/vnd/vnodeCfg.c"
|
||||||
"src/vnd/vnodeCommit.c"
|
"src/vnd/vnodeCommit.c"
|
||||||
"src/vnd/vnodeInt.c"
|
"src/vnd/vnodeInt.c"
|
||||||
"src/vnd/vnodeMain.c"
|
|
||||||
"src/vnd/vnodeQuery.c"
|
"src/vnd/vnodeQuery.c"
|
||||||
"src/vnd/vnodeStateMgr.c"
|
"src/vnd/vnodeStateMgr.c"
|
||||||
"src/vnd/vnodeWrite.c"
|
"src/vnd/vnodeWrite.c"
|
||||||
|
|
|
@ -42,11 +42,12 @@ typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||||
typedef struct STqCfg STqCfg; // todo: remove
|
typedef struct STqCfg STqCfg; // todo: remove
|
||||||
typedef struct SVnodeCfg SVnodeCfg;
|
typedef struct SVnodeCfg SVnodeCfg;
|
||||||
|
|
||||||
int vnodeInit();
|
int vnodeInit(int nthreads);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
|
void vnodeDestroy(const char *path);
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
void vnodeDestroy(const char *path);
|
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
|
@ -30,6 +30,8 @@ extern "C" {
|
||||||
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
// vnodeCfg ====================
|
||||||
|
|
||||||
// vnodeModule ====================
|
// vnodeModule ====================
|
||||||
int vnodeScheduleTask(int (*execute)(void*), void* arg);
|
int vnodeScheduleTask(int (*execute)(void*), void* arg);
|
||||||
|
|
||||||
|
@ -38,6 +40,10 @@ int vnodeQueryOpen(SVnode* pVnode);
|
||||||
void vnodeQueryClose(SVnode* pVnode);
|
void vnodeQueryClose(SVnode* pVnode);
|
||||||
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
||||||
|
|
||||||
|
// vnodeCommit ====================
|
||||||
|
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
|
||||||
|
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
// SVBufPool
|
// SVBufPool
|
||||||
int vnodeOpenBufPool(SVnode* pVnode);
|
int vnodeOpenBufPool(SVnode* pVnode);
|
||||||
|
@ -75,9 +81,9 @@ void vmaFree(SVMemAllocator* pVMA, void* ptr);
|
||||||
bool vmaIsFull(SVMemAllocator* pVMA);
|
bool vmaIsFull(SVMemAllocator* pVMA);
|
||||||
|
|
||||||
// vnodeCfg.h
|
// vnodeCfg.h
|
||||||
extern const SVnodeCfg defaultVnodeOptions;
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
|
||||||
int vnodeValidateOptions(const SVnodeCfg*);
|
int vnodeCheckCfg(const SVnodeCfg*);
|
||||||
void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc);
|
void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc);
|
||||||
|
|
||||||
// For commit
|
// For commit
|
||||||
|
|
|
@ -43,6 +43,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct SVnodeInfo SVnodeInfo;
|
||||||
typedef struct SMeta SMeta;
|
typedef struct SMeta SMeta;
|
||||||
typedef struct STsdb STsdb;
|
typedef struct STsdb STsdb;
|
||||||
typedef struct STQ STQ;
|
typedef struct STQ STQ;
|
||||||
|
@ -72,6 +73,11 @@ struct SVState {
|
||||||
int64_t applied;
|
int64_t applied;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SVnodeInfo {
|
||||||
|
SVnodeCfg config;
|
||||||
|
SVState state;
|
||||||
|
};
|
||||||
|
|
||||||
struct SVnode {
|
struct SVnode {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
char* path;
|
char* path;
|
||||||
|
|
|
@ -15,14 +15,15 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
const SVnodeCfg defaultVnodeOptions = {
|
const SVnodeCfg vnodeCfgDefault = {
|
||||||
.wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */
|
.wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}};
|
||||||
|
|
||||||
int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) {
|
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 1 //======================================================================
|
||||||
void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) {
|
void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) {
|
||||||
memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg));
|
memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg));
|
||||||
}
|
}
|
||||||
|
@ -46,3 +47,5 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
|
@ -15,11 +15,86 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
|
#define VND_INFO_FNAME "vnode.json"
|
||||||
|
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
|
||||||
|
|
||||||
|
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, uint8_t **ppData, int *len);
|
||||||
|
static int vnodeDecodeInfo(uint8_t *pData, int len, SVnodeInfo *pInfo);
|
||||||
static int vnodeStartCommit(SVnode *pVnode);
|
static int vnodeStartCommit(SVnode *pVnode);
|
||||||
static int vnodeEndCommit(SVnode *pVnode);
|
static int vnodeEndCommit(SVnode *pVnode);
|
||||||
static int vnodeCommit(void *arg);
|
static int vnodeCommit(void *arg);
|
||||||
static void vnodeWaitCommit(SVnode *pVnode);
|
static void vnodeWaitCommit(SVnode *pVnode);
|
||||||
|
|
||||||
|
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
||||||
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
TdFilePtr pFile;
|
||||||
|
uint8_t *data;
|
||||||
|
int len;
|
||||||
|
|
||||||
|
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
|
||||||
|
|
||||||
|
// encode info
|
||||||
|
data = NULL;
|
||||||
|
len = 0;
|
||||||
|
|
||||||
|
if (vnodeEncodeInfo(pInfo, &data, &len) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// save info to a vnode_tmp.json
|
||||||
|
pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
|
if (pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosWriteFile(pFile, data, len) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosFsyncFile(pFile) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
|
// free info binary
|
||||||
|
taosMemoryFree(data);
|
||||||
|
|
||||||
|
vInfo("vgId: %d vnode info is saved, fname: %s", pInfo->config.vgId, fname);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
taosMemoryFree(data);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
|
||||||
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
char tfname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
|
||||||
|
snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
|
||||||
|
|
||||||
|
if (taosRenameFile(tfname, fname) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
vInfo("vgId: %d vnode info is committed", pInfo->config.vgId);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int vnodeLoadInfo(const char *dir) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int vnodeAsyncCommit(SVnode *pVnode) {
|
int vnodeAsyncCommit(SVnode *pVnode) {
|
||||||
vnodeWaitCommit(pVnode);
|
vnodeWaitCommit(pVnode);
|
||||||
|
|
||||||
|
@ -61,3 +136,13 @@ static int vnodeEndCommit(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
|
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
|
||||||
|
|
||||||
|
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, uint8_t **ppData, int *len) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeDecodeInfo(uint8_t *pData, int len, SVnodeInfo *pInfo) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -20,11 +20,40 @@ static void vnodeFree(SVnode *pVnode);
|
||||||
static int vnodeOpenImpl(SVnode *pVnode);
|
static int vnodeOpenImpl(SVnode *pVnode);
|
||||||
static void vnodeCloseImpl(SVnode *pVnode);
|
static void vnodeCloseImpl(SVnode *pVnode);
|
||||||
|
|
||||||
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
|
SVnodeInfo info = {0};
|
||||||
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
// TODO: check if directory exists
|
||||||
|
|
||||||
|
// check config
|
||||||
|
if (vnodeCheckCfg(pCfg) < 0) {
|
||||||
|
vError("vgId: %d failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create vnode env
|
||||||
|
if (tfsMkdir(pTfs, path) < 0) {
|
||||||
|
vError("vgId: %d failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
||||||
|
info.config = *pCfg;
|
||||||
|
|
||||||
|
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
|
||||||
|
vError("vgId: %d failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
|
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
|
||||||
SVnode *pVnode = NULL;
|
SVnode *pVnode = NULL;
|
||||||
|
|
||||||
// Set default options
|
// Set default options
|
||||||
SVnodeCfg cfg = defaultVnodeOptions;
|
SVnodeCfg cfg = vnodeCfgDefault;
|
||||||
if (pVnodeCfg != NULL) {
|
if (pVnodeCfg != NULL) {
|
||||||
cfg.vgId = pVnodeCfg->vgId;
|
cfg.vgId = pVnodeCfg->vgId;
|
||||||
cfg.msgCb = pVnodeCfg->msgCb;
|
cfg.msgCb = pVnodeCfg->msgCb;
|
||||||
|
@ -36,7 +65,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate options
|
// Validate options
|
||||||
if (vnodeValidateOptions(&cfg) < 0) {
|
if (vnodeCheckCfg(&cfg) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
Loading…
Reference in New Issue