This commit is contained in:
Shengliang Guan 2022-03-13 00:06:09 +08:00
parent 7435abcf9b
commit 6c8cf998fd
9 changed files with 169 additions and 156 deletions

View File

@ -42,10 +42,17 @@ typedef struct SRpcMsg {
void * pCont;
int contLen;
int32_t code;
void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client
void *handle; // rpc handle returned to app
void *ahandle; // app handle set by client
} SRpcMsg;
typedef struct {
char user[TSDB_USER_LEN];
SRpcMsg rpcMsg;
SEpSet rpcEpSet;
int32_t rspLen;
void *pRsp;
} SNodeMsg;
typedef struct SRpcInit {
uint16_t localPort; // local port

View File

@ -61,29 +61,152 @@ typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANU } EEnvStat;
typedef struct SMgmtFp SMgmtFp;
typedef struct SMgmtWrapper SMgmtWrapper;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg);
typedef SMgmtWrapper *(*MgmtOpenFp)(SDnode *pDnode, const char *path);
typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pMgmt);
typedef bool (*MgmtRequiredFp)(SDnode *pDnode, const char *path);
typedef SArray *(*MgmtMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg);
typedef struct {
EWorkerType type;
const char *name;
int32_t minNum;
int32_t maxNum;
void *queueFp;
SDnode *pDnode;
STaosQueue *queue;
union {
SQWorkerPool pool;
SWWorkerPool mpool;
};
} SDnodeWorker;
typedef struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
int64_t dver;
int64_t rebootTime;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
SArray *pDnodeEps;
pthread_t *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
} SDnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SMnode *pMnode;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
//
MndMsgFp msgFp[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SMnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
} SQnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SSnodeMgmt;
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SBnodeMgmt;
typedef struct {
SVnodesStat stat;
SHashObj *hash;
SRWLatch latch;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
} SVnodesMgmt;
typedef struct {
void *serverRpc;
void *clientRpc;
DndMsgFp msgFp[TDMT_MAX];
} STransMgmt;
typedef struct SMgmtFp {
MgmtOpenFp openFp;
MgmtCloseFp closeFp;
MgmtRequiredFp requiredFp;
MgmtMsgFp msgFp;
} SMgmtFp;
typedef struct SMgmtWrapper {
const char *name;
char *path;
bool required;
EProcType procType;
SProcObj *pProc;
void *pMgmt;
SMgmtFp fp;
} SMgmtWrapper;
typedef struct SDnode {
EDndStatus status;
EDndEvent event;
EProcType procType;
SDndCfg cfg;
SDnodeDir dir;
SStartupReq startup;
TdFilePtr pLockFile;
SDnodeMgmt dmgmt;
SMndMgmt mmgmt;
SQnodeMgmt qmgmt;
SSnodeMgmt smgmt;
SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt;
STransMgmt tmgmt;
STfs *pTfs;
SStartupReq startup;
EDndEvent event;
SMgmtFp fps[NODE_MAX];
SMgmtWrapper mgmts[NODE_MAX];
} SDnode;
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
const char *dndStatStr(EDndStatus stat);

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_ENV_H_
#define _TD_DND_ENV_H_
#ifndef _TD_DND_MAIN_H_
#define _TD_DND_MAIN_H_
#ifdef __cplusplus
extern "C" {
@ -22,126 +22,8 @@ extern "C" {
#include "dndInt.h"
typedef struct {
EWorkerType type;
const char *name;
int32_t minNum;
int32_t maxNum;
void *queueFp;
SDnode *pDnode;
STaosQueue *queue;
union {
SQWorkerPool pool;
SWWorkerPool mpool;
};
} SDnodeWorker;
typedef struct {
char *dnode;
char *mnode;
char *snode;
char *bnode;
char *vnodes;
} SDnodeDir;
typedef struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
int64_t dver;
int64_t rebootTime;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
SArray *pDnodeEps;
pthread_t *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
} SDnodeMgmt;
typedef enum { SINGLE_PROC, MULTI_PROC_PARENT, MULTI_PROC_CHILD } EProcType;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SMnode *pMnode;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
//
MndMsgFp msgFp[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SMndMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
} SQnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SSnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
} SBnodeMgmt;
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
typedef struct {
SVnodesStat stat;
SHashObj *hash;
SRWLatch latch;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
} SVnodesMgmt;
typedef struct {
void *serverRpc;
void *clientRpc;
DndMsgFp msgFp[TDMT_MAX];
} STransMgmt;
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_ENV_H_*/
#endif /*_TD_DND_MAIN_H_*/

View File

@ -78,5 +78,6 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
return tfsGetMonitorInfo(pDnode->pTfs, pInfo);
//return tfsGetMonitorInfo(pDnode->pTfs, pInfo);
return tfsGetMonitorInfo(NULL, pInfo);
}

View File

@ -48,7 +48,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe
// mmWorker
int32_t mmStartWorker(SDnode *pDnode);
void mmStopWorker(SDnode *pDnode);
void mmInitMsgFp(SMndMgmt *pMgmt);
void mmInitMsgFp(SMnodeMgmt *pMgmt);
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);

View File

@ -17,7 +17,7 @@
#include "mm.h"
int32_t mmReadFile(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
int32_t len = 0;
@ -115,7 +115,7 @@ PRASE_MNODE_OVER:
}
int32_t mmWriteFile(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP);

View File

@ -122,7 +122,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro
}
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) {

View File

@ -29,7 +29,7 @@ int32_t mmInit(SDnode *pDnode) {
dInfo("mnode mgmt start to init");
int32_t code = -1;
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch);
mmInitMsgFp(pMgmt);
@ -76,7 +76,7 @@ _OVER:
void mmCleanup(SDnode *pDnode) {
dInfo("mnode mgmt start to clean up");
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
if (pMgmt->pMnode) {
mmStopWorker(pDnode);
mndClose(pMgmt->pMnode);
@ -86,7 +86,7 @@ void mmCleanup(SDnode *pDnode) {
}
SMnode *mmAcquire(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = NULL;
int32_t refCount = 0;
@ -108,7 +108,7 @@ SMnode *mmAcquire(SDnode *pDnode) {
void mmRelease(SDnode *pDnode, SMnode *pMnode) {
if (pMnode == NULL) return;
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
@ -116,7 +116,7 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) {
}
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->singleProc = true;
int32_t code = mmOpenImp(pDnode, pOption);
@ -150,7 +150,7 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
}
int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) {
@ -169,7 +169,7 @@ int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) {
}
int32_t mmDrop(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) {
@ -238,7 +238,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
pReplica->port = pDnode->cfg.serverPort;
memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
@ -246,7 +246,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) {
mmInitOption(pDnode, pOption);
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
@ -274,7 +274,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe
return -1;
}
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
@ -282,7 +282,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe
}
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) {

View File

@ -28,7 +28,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg);
int32_t mmStartWorker(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr());
return -1;
@ -48,7 +48,7 @@ int32_t mmStartWorker(SDnode *pDnode) {
}
void mmStopWorker(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
@ -63,7 +63,7 @@ void mmStopWorker(SDnode *pDnode) {
dndCleanupWorker(&pMgmt->syncWorker);
}
void mmInitMsgFp(SMndMgmt *pMgmt) {
void mmInitMsgFp(SMnodeMgmt *pMgmt) {
// Requests handled by DNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg;
@ -163,7 +163,7 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
}
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1;
SMndMsg *pMsg = NULL;
@ -261,7 +261,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
}
void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) {
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1;
if (pMgmt->singleProc) {
@ -278,7 +278,7 @@ void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) {
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from child queue", pMsg);
SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont;