Merge pull request #1529 from taosdata/refact/dnodemgmt
optimize the code in dnodeMgmt, so it can close vnode one by one when…
This commit is contained in:
commit
23c04ba290
|
@ -32,6 +32,7 @@
|
|||
#include "vnode.h"
|
||||
|
||||
static int32_t dnodeOpenVnodes();
|
||||
static void dnodeCloseVnodes();
|
||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
||||
|
@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if ( vnodeInitModule() != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = dnodeOpenVnodes();
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
|
@ -88,7 +85,7 @@ void dnodeCleanupMgmt() {
|
|||
tsDnodeTmr = NULL;
|
||||
}
|
||||
|
||||
vnodeCleanupModule();
|
||||
dnodeCloseVnodes();
|
||||
}
|
||||
|
||||
void dnodeMgmt(SRpcMsg *pMsg) {
|
||||
|
@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
|
|||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
static int32_t dnodeOpenVnodes() {
|
||||
static int dnodeGetVnodeList(int32_t vnodeList[]) {
|
||||
DIR *dir = opendir(tsVnodeDir);
|
||||
if (dir == NULL) {
|
||||
return TSDB_CODE_NO_WRITE_ACCESS;
|
||||
|
@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() {
|
|||
int32_t vnode = atoi(de->d_name + 5);
|
||||
if (vnode == 0) continue;
|
||||
|
||||
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
||||
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name);
|
||||
int32_t code = vnodeOpen(vnode, vnodeDir);
|
||||
if (code == 0) {
|
||||
numOfVnodes++;
|
||||
}
|
||||
vnodeList[numOfVnodes] = vnode;
|
||||
numOfVnodes++;
|
||||
}
|
||||
}
|
||||
closedir(dir);
|
||||
|
||||
dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return numOfVnodes;
|
||||
}
|
||||
|
||||
static int32_t dnodeOpenVnodes() {
|
||||
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
||||
int failed = 0;
|
||||
|
||||
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
|
||||
int numOfVnodes = dnodeGetVnodeList(vnodeList);
|
||||
|
||||
for (int i=0; i<numOfVnodes; ++i) {
|
||||
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
|
||||
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++;
|
||||
}
|
||||
|
||||
free(vnodeList);
|
||||
|
||||
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void dnodeCloseVnodes() {
|
||||
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
|
||||
int numOfVnodes = dnodeGetVnodeList(vnodeList);
|
||||
|
||||
for (int i=0; i<numOfVnodes; ++i)
|
||||
vnodeClose(vnodeList[i]);
|
||||
|
||||
free(vnodeList);
|
||||
dPrint("total vnodes:%d are all closed", numOfVnodes);
|
||||
}
|
||||
|
||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
|
|
|
@ -25,13 +25,10 @@ typedef struct {
|
|||
void *rsp;
|
||||
} SRspRet;
|
||||
|
||||
int32_t vnodeInitModule();
|
||||
void vnodeCleanupModule();
|
||||
|
||||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeDrop(int32_t vgId);
|
||||
int32_t vnodeOpen(int32_t vnode, char *rootDir);
|
||||
int32_t vnodeClose(void *pVnode);
|
||||
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
void vnodeRelease(void *pVnode);
|
||||
void* vnodeGetVnode(int32_t vgId);
|
||||
|
|
|
@ -828,13 +828,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
|||
if (pConn->inType) {
|
||||
// if there are pending request, notify the app
|
||||
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
|
||||
/*
|
||||
SRpcMsg rpcMsg;
|
||||
rpcMsg.pCont = NULL;
|
||||
rpcMsg.contLen = 0;
|
||||
rpcMsg.handle = pConn;
|
||||
rpcMsg.msgType = pConn->inType;
|
||||
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||
// (*(pRpc->cfp))(&rpcMsg);
|
||||
(*(pRpc->cfp))(&rpcMsg);
|
||||
*/
|
||||
}
|
||||
|
||||
rpcCloseConn(pConn);
|
||||
|
@ -1157,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
|
|||
if (pConn->inType && pRpc->cfp) {
|
||||
// if there are pending request, notify the app
|
||||
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
|
||||
/*
|
||||
SRpcMsg rpcMsg;
|
||||
rpcMsg.pCont = NULL;
|
||||
rpcMsg.contLen = 0;
|
||||
rpcMsg.handle = pConn;
|
||||
rpcMsg.msgType = pConn->inType;
|
||||
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||
// (*(pRpc->cfp))(&rpcMsg);
|
||||
(*(pRpc->cfp))(&rpcMsg);
|
||||
*/
|
||||
}
|
||||
rpcCloseConn(pConn);
|
||||
} else {
|
||||
|
|
|
@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash;
|
|||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
||||
|
||||
int32_t vnodeInitModule() {
|
||||
static int tsOpennedVnodes;
|
||||
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||
|
||||
static void vnodeInit() {
|
||||
|
||||
vnodeInitWriteFp();
|
||||
|
||||
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
|
||||
if (tsDnodeVnodesHash == NULL) {
|
||||
dError("failed to init vnode list");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
typedef void (*CleanupFp)(char *);
|
||||
void vnodeCleanupModule() {
|
||||
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
|
||||
taosCleanUpIntHash(tsDnodeVnodesHash);
|
||||
}
|
||||
|
||||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||
int32_t code;
|
||||
pthread_once(&vnodeModuleInit, vnodeInit);
|
||||
|
||||
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
|
||||
|
||||
|
@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) {
|
|||
|
||||
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||
char temp[TSDB_FILENAME_LEN];
|
||||
pthread_once(&vnodeModuleInit, vnodeInit);
|
||||
|
||||
SVnodeObj vnodeObj = {0};
|
||||
vnodeObj.vgId = vnode;
|
||||
|
@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
pVnode->status = VN_STATUS_READY;
|
||||
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
||||
|
||||
tsOpennedVnodes++;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeClose(void *param) {
|
||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||
int32_t vnodeClose(int32_t vgId) {
|
||||
|
||||
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
|
||||
if (pVnode == NULL) return 0;
|
||||
|
||||
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
|
||||
pVnode->status = VN_STATUS_CLOSING;
|
||||
|
@ -183,6 +182,12 @@ void vnodeRelease(void *pVnodeRaw) {
|
|||
}
|
||||
|
||||
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
|
||||
|
||||
tsOpennedVnodes--;
|
||||
if (tsOpennedVnodes <= 0) {
|
||||
taosCleanUpIntHash(tsDnodeVnodesHash);
|
||||
vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||
}
|
||||
}
|
||||
|
||||
void *vnodeGetVnode(int32_t vgId) {
|
||||
|
|
Loading…
Reference in New Issue