refact 1
This commit is contained in:
parent
c1fef8c445
commit
cb585812f1
|
@ -160,13 +160,10 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||||
msgCb.qsizeFp = vmGetQueueSize;
|
msgCb.qsizeFp = vmGetQueueSize;
|
||||||
|
|
||||||
vnodeCfg.msgCb = msgCb;
|
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
|
||||||
vnodeCfg.pTfs = pMgmt->pTfs;
|
|
||||||
vnodeCfg.dbId = wrapperCfg.dbUid;
|
|
||||||
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
|
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
tFreeSCreateVnodeReq(&createReq);
|
|
||||||
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
||||||
|
tFreeSCreateVnodeReq(&createReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -119,6 +119,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
SVnodeThread *pThread = param;
|
SVnodeThread *pThread = param;
|
||||||
SVnodesMgmt *pMgmt = pThread->pMgmt;
|
SVnodesMgmt *pMgmt = pThread->pMgmt;
|
||||||
SDnode *pDnode = pMgmt->pDnode;
|
SDnode *pDnode = pMgmt->pDnode;
|
||||||
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||||
setThreadName("open-vnodes");
|
setThreadName("open-vnodes");
|
||||||
|
@ -137,8 +138,8 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||||
msgCb.qsizeFp = vmGetQueueSize;
|
msgCb.qsizeFp = vmGetQueueSize;
|
||||||
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||||
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
|
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
|
|
|
@ -46,7 +46,7 @@ int vnodeInit(int nthreads);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
|
@ -108,7 +108,7 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pData = taosMemoryMalloc(size);
|
pData = taosMemoryMalloc(size + 1);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -119,6 +119,8 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pData[size] = '\0';
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
// decode info
|
// decode info
|
||||||
|
|
|
@ -53,36 +53,31 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
|
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); }
|
void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); }
|
||||||
|
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
SVnode *pVnode = NULL;
|
SVnode *pVnode = NULL;
|
||||||
|
SVnodeInfo info = {0};
|
||||||
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
int ret;
|
||||||
|
|
||||||
// Set default options
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
||||||
SVnodeCfg cfg = vnodeCfgDefault;
|
|
||||||
if (pVnodeCfg != NULL) {
|
|
||||||
cfg.vgId = pVnodeCfg->vgId;
|
|
||||||
cfg.msgCb = pVnodeCfg->msgCb;
|
|
||||||
cfg.pTfs = pVnodeCfg->pTfs;
|
|
||||||
cfg.dbId = pVnodeCfg->dbId;
|
|
||||||
cfg.hashBegin = pVnodeCfg->hashBegin;
|
|
||||||
cfg.hashEnd = pVnodeCfg->hashEnd;
|
|
||||||
cfg.hashMethod = pVnodeCfg->hashMethod;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate options
|
// load vnode info
|
||||||
if (vnodeCheckCfg(&cfg) < 0) {
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
// TODO
|
if (ret < 0) {
|
||||||
|
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the handle
|
info.config.pTfs = pTfs;
|
||||||
pVnode = vnodeNew(path, &cfg);
|
info.config.msgCb = msgCb;
|
||||||
|
|
||||||
|
// crate handle
|
||||||
|
pVnode = vnodeNew(dir, &info.config);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMkDir(path);
|
|
||||||
|
|
||||||
// Open the vnode
|
// Open the vnode
|
||||||
if (vnodeOpenImpl(pVnode) < 0) {
|
if (vnodeOpenImpl(pVnode) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
|
Loading…
Reference in New Issue