TD-11265 refact dndVnodes

This commit is contained in:
Shengliang Guan 2021-11-26 14:43:56 +08:00
parent 0d57a4beb7
commit 7906e78257
8 changed files with 43 additions and 50 deletions

View File

@ -78,7 +78,7 @@ typedef struct {
* @brief data file's directory.
*
*/
char dataDir[PATH_MAX];
char dataDir[TSDB_FILENAME_LEN];
/**
* @brief local endpoint.

View File

@ -184,27 +184,9 @@ typedef struct {
SRpcMsg rpcMsg[];
} SVnodeMsg;
typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
typedef struct {
PutMsgToVnodeQFp putMsgToApplyQueueFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
} SVnodePara;
int32_t vnodeInit(SVnodePara);
void vnodeCleanup();
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
void vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);

View File

@ -141,7 +141,7 @@ void dmnInitOption(SDnodeOpt *pOption) {
pOption->shellActivityTimer = tsShellActivityTimer;
pOption->statusInterval = tsStatusInterval;
pOption->serverPort = tsServerPort;
tstrncpy(pOption->dataDir, tsDataDir, PATH_MAX);
tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN);
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN);

View File

@ -455,12 +455,11 @@ static void *dnodeThreadRoutine(void *param) {
while (true) {
taosMsleep(ms);
if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
continue;
}
pthread_testcancel();
dndSendStatusMsg(pDnode);
if (dndGetStat(pDnode) == DND_STAT_RUNNING) {
dndSendStatusMsg(pDnode);
}
}
}
@ -501,7 +500,7 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1;
}
dInfo("dnd-dnode is initialized");
dInfo("dnode-dnode is initialized");
return 0;
}
@ -531,7 +530,7 @@ void dndCleanupDnode(SDnode *pDnode) {
}
dndWUnLockDnode(pDnode);
dInfo("dnd-dnode is cleaned up");
dInfo("dnode-dnode is cleaned up");
}
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {

View File

@ -616,7 +616,7 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -652,7 +652,7 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -703,7 +703,7 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
pPool->min = 0;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

View File

@ -22,6 +22,7 @@ typedef struct {
int32_t refCount;
int8_t dropped;
int8_t accessState;
char *path;
SVnode *pImpl;
taos_queue pWriteQ;
taos_queue pSyncQ;
@ -74,7 +75,7 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl);
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl);
static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode);
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes);
@ -125,7 +126,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
}
}
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl) {
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
@ -139,6 +140,12 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl;
pVnode->path = tstrdup(path);
if (pVnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) {
return -1;
}
@ -354,22 +361,25 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) {
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId);
SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
// SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
SVnode *pImpl = vnodeOpen(path, NULL);
if (pImpl == NULL) {
return -1;
}
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, pImpl);
int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl);
if (code != 0) {
vnodeDrop(pImpl);
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
vnodeDrop(pImpl);
vnodeClose(pImpl);
vnodeDestroy(path);
terrno = code;
return code;
}
@ -385,7 +395,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
}
dndDropVnodeWrapper(pDnode, pVnode);
vnodeDrop(pVnode->pImpl);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;
}
@ -413,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dndCreateVnodeWrapper(pDnode, pVnode->vgId, pImpl);
dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++;
}
@ -433,7 +444,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->hash == NULL) {
dError("failed to init vnode hash");
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -874,13 +885,13 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -918,6 +929,7 @@ static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
@ -938,7 +950,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
pPool->min = (int32_t)threadsForQuery;
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pPool = &pMgmt->fetchPool;
@ -946,7 +959,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores);
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
@ -998,7 +1012,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
pPool->name = "vnode-write";
pPool->max = tsNumOfCores;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -1036,7 +1050,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

View File

@ -116,6 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
return -1;
}
memcpy(&pDnode->opt, pOptions, sizeof(SDnodeOpt));
return 0;
}

View File

@ -17,9 +17,6 @@
#include "vnodeInt.h"
#include "tqueue.h"
int32_t vnodeInit(SVnodePara para) { return 0; }
void vnodeCleanup() {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
void vnodeDrop(SVnode *pVnode) {}
@ -31,7 +28,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} else {
pMsg->allocNum = msgNum;
@ -41,7 +38,7 @@ SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
if (pMsg->curNum >= pMsg->allocNum) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;