Merge pull request #9699 from taosdata/feature/dnode3
set vgId in vnode objects
This commit is contained in:
commit
0fb5c69ed5
|
@ -805,19 +805,19 @@ typedef struct {
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
int8_t selfIndex;
|
int8_t selfIndex;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
} SCreateVnodeMsg, SAlterVnodeMsg;
|
} SCreateVnodeReq, SAlterVnodeReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
|
||||||
uint64_t dbUid;
|
uint64_t dbUid;
|
||||||
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int8_t accessState;
|
int8_t accessState;
|
||||||
} SAuthVnodeMsg;
|
} SAuthVnodeReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
|
|
|
@ -32,6 +32,8 @@ extern "C" {
|
||||||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||||
typedef struct SVnode SVnode;
|
typedef struct SVnode SVnode;
|
||||||
typedef struct SVnodeCfg {
|
typedef struct SVnodeCfg {
|
||||||
|
int32_t vgId;
|
||||||
|
|
||||||
/** vnode buffer pool options */
|
/** vnode buffer pool options */
|
||||||
struct {
|
struct {
|
||||||
/** write buffer size */
|
/** write buffer size */
|
||||||
|
|
|
@ -277,9 +277,12 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452)
|
#define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452)
|
||||||
#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453)
|
#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453)
|
||||||
#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454)
|
#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454)
|
||||||
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0460)
|
#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0460)
|
||||||
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0461)
|
#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0461)
|
||||||
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0462)
|
#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0462)
|
||||||
|
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0463)
|
||||||
|
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0464)
|
||||||
|
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0465)
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
|
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
|
||||||
|
|
|
@ -29,12 +29,12 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
|
||||||
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
|
||||||
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
|
||||||
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
|
||||||
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
|
||||||
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
|
||||||
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) {
|
static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) {
|
||||||
|
if (pBnode == NULL) return;
|
||||||
|
|
||||||
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
int32_t refCount = 0;
|
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pBnode != NULL) {
|
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||||
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
|
||||||
}
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
dTrace("release bnode, refCount:%d", refCount);
|
||||||
if (pBnode != NULL) {
|
|
||||||
dTrace("release bnode, refCount:%d", refCount);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndReadBnodeFile(SDnode *pDnode) {
|
static int32_t dndReadBnodeFile(SDnode *pDnode) {
|
||||||
|
|
|
@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
|
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
|
||||||
|
if (pMnode == NULL) return;
|
||||||
|
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
int32_t refCount = 0;
|
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pMnode != NULL) {
|
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||||
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
|
||||||
}
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
dTrace("release mnode, refCount:%d", refCount);
|
||||||
if (pMnode != NULL) {
|
|
||||||
dTrace("release mnode, refCount:%d", refCount);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
||||||
|
|
|
@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) {
|
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) {
|
||||||
|
if (pQnode == NULL) return;
|
||||||
|
|
||||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||||
int32_t refCount = 0;
|
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pQnode != NULL) {
|
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||||
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
|
||||||
}
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
dTrace("release qnode, refCount:%d", refCount);
|
||||||
if (pQnode != NULL) {
|
|
||||||
dTrace("release qnode, refCount:%d", refCount);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndReadQnodeFile(SDnode *pDnode) {
|
static int32_t dndReadQnodeFile(SDnode *pDnode) {
|
||||||
|
|
|
@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) {
|
static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) {
|
||||||
|
if (pSnode == NULL) return;
|
||||||
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
int32_t refCount = 0;
|
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pSnode != NULL) {
|
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
||||||
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
|
||||||
}
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
dTrace("release snode, refCount:%d", refCount);
|
||||||
if (pSnode != NULL) {
|
|
||||||
dTrace("release snode, refCount:%d", refCount);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndReadSnodeFile(SDnode *pDnode) {
|
static int32_t dndReadSnodeFile(SDnode *pDnode) {
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct {
|
||||||
STaosQueue *pSyncQ;
|
STaosQueue *pSyncQ;
|
||||||
STaosQueue *pApplyQ;
|
STaosQueue *pApplyQ;
|
||||||
STaosQueue *pQueryQ;
|
STaosQueue *pQueryQ;
|
||||||
STaosQueue* pFetchQ;
|
STaosQueue *pFetchQ;
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -53,22 +53,8 @@ typedef struct {
|
||||||
SWrapperCfg *pCfgs;
|
SWrapperCfg *pCfgs;
|
||||||
} SVnodeThread;
|
} SVnodeThread;
|
||||||
|
|
||||||
static int32_t dndInitVnodeReadWorker(SDnode *pDnode);
|
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||||
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode);
|
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
||||||
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupVnodeReadWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupVnodeWriteWorker(SDnode *pDnode);
|
|
||||||
static void dndCleanupVnodeSyncWorker(SDnode *pDnode);
|
|
||||||
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
|
|
||||||
|
|
||||||
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||||
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
|
||||||
|
@ -117,11 +103,9 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
if (pVnode == NULL) return;
|
if (pVnode == NULL) return;
|
||||||
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
|
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +118,7 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->vgId = pCfg->vgId;
|
pVnode->vgId = pCfg->vgId;
|
||||||
pVnode->refCount = 1;
|
pVnode->refCount = 0;
|
||||||
pVnode->dropped = 0;
|
pVnode->dropped = 0;
|
||||||
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
|
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
|
||||||
pVnode->pImpl = pImpl;
|
pVnode->pImpl = pImpl;
|
||||||
|
@ -148,23 +132,8 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) {
|
if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
|
||||||
return -1;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
|
||||||
|
|
||||||
if (dndAllocVnodeFetchQueue(pDnode, pVnode) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndAllocVnodeWriteQueue(pDnode, pVnode) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndAllocVnodeApplyQueue(pDnode, pVnode) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndAllocVnodeSyncQueue(pDnode, pVnode) != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,12 +161,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||||
|
|
||||||
dndFreeVnodeQueryQueue(pDnode, pVnode);
|
dndFreeVnodeQueue(pDnode, pVnode);
|
||||||
dndFreeVnodeFetchQueue(pDnode, pVnode);
|
|
||||||
dndFreeVnodeWriteQueue(pDnode, pVnode);
|
|
||||||
dndFreeVnodeApplyQueue(pDnode, pVnode);
|
|
||||||
dndFreeVnodeSyncQueue(pDnode, pVnode);
|
|
||||||
|
|
||||||
vnodeClose(pVnode->pImpl);
|
vnodeClose(pVnode->pImpl);
|
||||||
pVnode->pImpl = NULL;
|
pVnode->pImpl = NULL;
|
||||||
|
|
||||||
|
@ -412,7 +376,10 @@ static void *dnodeOpenVnodeFunc(void *param) {
|
||||||
pMgmt->openVnodes, pMgmt->totalVnodes);
|
pMgmt->openVnodes, pMgmt->totalVnodes);
|
||||||
dndReportStartup(pDnode, "open-vnodes", stepDesc);
|
dndReportStartup(pDnode, "open-vnodes", stepDesc);
|
||||||
|
|
||||||
SVnode *pImpl = vnodeOpen(pCfg->path, NULL);
|
SVnodeCfg vnodeCfg = {0};
|
||||||
|
vnodeCfg.vgId = pCfg->vgId;
|
||||||
|
|
||||||
|
SVnode *pImpl = vnodeOpen(pCfg->path, &vnodeCfg);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
|
@ -508,7 +475,6 @@ static void dndCloseVnodes(SDnode *pDnode) {
|
||||||
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
|
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
dndReleaseVnode(pDnode, pVnodes[i]);
|
|
||||||
dndCloseVnode(pDnode, pVnodes[i]);
|
dndCloseVnode(pDnode, pVnodes[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,8 +490,8 @@ static void dndCloseVnodes(SDnode *pDnode) {
|
||||||
dInfo("total vnodes:%d are all closed", numOfVnodes);
|
dInfo("total vnodes:%d are all closed", numOfVnodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) {
|
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
|
||||||
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SCreateVnodeReq *pCreate = pReq->pCont;
|
||||||
pCreate->vgId = htonl(pCreate->vgId);
|
pCreate->vgId = htonl(pCreate->vgId);
|
||||||
pCreate->dnodeId = htonl(pCreate->dnodeId);
|
pCreate->dnodeId = htonl(pCreate->dnodeId);
|
||||||
pCreate->dbUid = htobe64(pCreate->dbUid);
|
pCreate->dbUid = htobe64(pCreate->dbUid);
|
||||||
|
@ -549,7 +515,8 @@ static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) {
|
||||||
return pCreate;
|
return pCreate;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) {
|
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
|
pCfg->vgId = pCreate->vgId;
|
||||||
pCfg->wsize = pCreate->cacheBlockSize;
|
pCfg->wsize = pCreate->cacheBlockSize;
|
||||||
pCfg->ssize = pCreate->cacheBlockSize;
|
pCfg->ssize = pCreate->cacheBlockSize;
|
||||||
pCfg->wsize = pCreate->cacheBlockSize;
|
pCfg->wsize = pCreate->cacheBlockSize;
|
||||||
|
@ -572,7 +539,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) {
|
||||||
pCfg->walCfg.vgId = pCreate->vgId;
|
pCfg->walCfg.vgId = pCreate->vgId;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) {
|
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
|
||||||
memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
|
memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
|
||||||
pCfg->dbUid = pCreate->dbUid;
|
pCfg->dbUid = pCreate->dbUid;
|
||||||
pCfg->dropped = 0;
|
pCfg->dropped = 0;
|
||||||
|
@ -581,20 +548,20 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWra
|
||||||
pCfg->vgVersion = pCreate->vgVersion;
|
pCfg->vgVersion = pCreate->vgVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
|
static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *pReq) {
|
||||||
SDropVnodeMsg *pDrop = rpcMsg->pCont;
|
SDropVnodeReq *pDrop = pReq->pCont;
|
||||||
pDrop->vgId = htonl(pDrop->vgId);
|
pDrop->vgId = htonl(pDrop->vgId);
|
||||||
return pDrop;
|
return pDrop;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
|
static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *pReq) {
|
||||||
SAuthVnodeMsg *pAuth = rpcMsg->pCont;
|
SAuthVnodeReq *pAuth = pReq->pCont;
|
||||||
pAuth->vgId = htonl(pAuth->vgId);
|
pAuth->vgId = htonl(pAuth->vgId);
|
||||||
return pAuth;
|
return pAuth;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg);
|
SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
|
||||||
dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
|
dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
|
||||||
|
|
||||||
SVnodeCfg vnodeCfg = {0};
|
SVnodeCfg vnodeCfg = {0};
|
||||||
|
@ -607,16 +574,19 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
dDebug("vgId:%d, already exist, return success", pCreate->vgId);
|
dDebug("vgId:%d, already exist, return success", pCreate->vgId);
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
|
terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/);
|
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
|
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
|
int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr());
|
||||||
vnodeClose(pImpl);
|
vnodeClose(pImpl);
|
||||||
vnodeDestroy(wrapperCfg.path);
|
vnodeDestroy(wrapperCfg.path);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
@ -634,23 +604,20 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg);
|
SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq);
|
||||||
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
|
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
|
||||||
|
|
||||||
SVnodeCfg vnodeCfg = {0};
|
SVnodeCfg vnodeCfg = {0};
|
||||||
dndGenerateVnodeCfg(pAlter, &vnodeCfg);
|
dndGenerateVnodeCfg(pAlter, &vnodeCfg);
|
||||||
|
|
||||||
SWrapperCfg wrapperCfg = {0};
|
|
||||||
dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg);
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
|
dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wrapperCfg.vgVersion == pVnode->vgVersion) {
|
if (pAlter->vgVersion == pVnode->vgVersion) {
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
|
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -659,11 +626,11 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
|
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
|
||||||
dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
|
dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t oldVersion = pVnode->vgVersion;
|
int32_t oldVersion = pVnode->vgVersion;
|
||||||
pVnode->vgVersion = wrapperCfg.vgVersion;
|
pVnode->vgVersion = pAlter->vgVersion;
|
||||||
int32_t code = dndWriteVnodesToFile(pDnode);
|
int32_t code = dndWriteVnodesToFile(pDnode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pVnode->vgVersion = oldVersion;
|
pVnode->vgVersion = oldVersion;
|
||||||
|
@ -673,8 +640,8 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
|
SDropVnodeReq *pDrop = vnodeParseDropVnodeReq(pReq);
|
||||||
|
|
||||||
int32_t vgId = pDrop->vgId;
|
int32_t vgId = pDrop->vgId;
|
||||||
dDebug("vgId:%d, drop vnode req is received", vgId);
|
dDebug("vgId:%d, drop vnode req is received", vgId);
|
||||||
|
@ -688,10 +655,10 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
pVnode->dropped = 1;
|
pVnode->dropped = 1;
|
||||||
if (dndWriteVnodesToFile(pDnode) != 0) {
|
if (dndWriteVnodesToFile(pDnode) != 0) {
|
||||||
pVnode->dropped = 0;
|
pVnode->dropped = 0;
|
||||||
return terrno;
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
|
||||||
dndCloseVnode(pDnode, pVnode);
|
dndCloseVnode(pDnode, pVnode);
|
||||||
vnodeClose(pVnode->pImpl);
|
vnodeClose(pVnode->pImpl);
|
||||||
vnodeDestroy(pVnode->path);
|
vnodeDestroy(pVnode->path);
|
||||||
|
@ -700,17 +667,16 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
|
SAuthVnodeReq *pAuth = (SAuthVnodeReq *)vnodeParseAuthVnodeReq(pReq);
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t vgId = pAuth->vgId;
|
int32_t vgId = pAuth->vgId;
|
||||||
dDebug("vgId:%d, auth vnode req is received", vgId);
|
dDebug("vgId:%d, auth vnode req is received", vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
|
dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->accessState = pAuth->accessState;
|
pVnode->accessState = pAuth->accessState;
|
||||||
|
@ -718,30 +684,30 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
|
SSyncVnodeReq *pSync = (SSyncVnodeReq *)vnodeParseDropVnodeReq(pReq);
|
||||||
|
|
||||||
int32_t vgId = pAuth->vgId;
|
int32_t vgId = pSync->vgId;
|
||||||
dDebug("vgId:%d, auth vnode req is received", vgId);
|
dDebug("vgId:%d, sync vnode req is received", vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
|
dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (vnodeSync(pVnode->pImpl) != 0) {
|
if (vnodeSync(pVnode->pImpl) != 0) {
|
||||||
dError("vgId:%d, failed to auth vnode since %s", vgId, terrstr());
|
dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
|
SCompactVnodeReq *pCompact = (SCompactVnodeReq *)vnodeParseDropVnodeReq(pReq);
|
||||||
|
|
||||||
int32_t vgId = pCompact->vgId;
|
int32_t vgId = pCompact->vgId;
|
||||||
dDebug("vgId:%d, compact vnode req is received", vgId);
|
dDebug("vgId:%d, compact vnode req is received", vgId);
|
||||||
|
@ -749,13 +715,13 @@ int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
|
dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (vnodeCompact(pVnode->pImpl) != 0) {
|
if (vnodeCompact(pVnode->pImpl) != 0) {
|
||||||
dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
|
dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
return terrno;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
|
@ -814,6 +780,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
|
||||||
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
(void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
|
||||||
}
|
}
|
||||||
|
@ -825,6 +792,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
|
||||||
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
|
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
|
||||||
}
|
}
|
||||||
|
@ -848,21 +816,25 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
|
if (pRpcMsg->msgType & 1u) {
|
||||||
rpcSendResponse(&rsp);
|
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pRpcMsg->pCont);
|
rpcFreeCont(pRpcMsg->pCont);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
|
if (pMsg->msgType & 1u) {
|
||||||
rpcSendResponse(&rsp);
|
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -903,193 +875,96 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
||||||
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
|
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
|
||||||
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) return -1;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg);
|
int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||||
dndReleaseVnode(pDnode, pVnode);
|
dndReleaseVnode(pDnode, pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
|
|
||||||
if (pVnode->pQueryQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
|
||||||
pVnode->pQueryQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
|
|
||||||
if (pVnode->pFetchQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
|
||||||
pVnode->pFetchQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
|
|
||||||
int32_t maxFetchThreads = 4;
|
int32_t maxFetchThreads = 4;
|
||||||
float threadsForQuery = MAX(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores, 1);
|
int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores);
|
||||||
|
int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1);
|
||||||
|
int32_t maxQueryThreads = minQueryThreads;
|
||||||
|
int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1);
|
||||||
|
int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1);
|
||||||
|
|
||||||
SWorkerPool *pPool = &pMgmt->queryPool;
|
SWorkerPool *pPool = &pMgmt->queryPool;
|
||||||
pPool->name = "vnode-query";
|
pPool->name = "vnode-query";
|
||||||
pPool->min = (int32_t)threadsForQuery;
|
pPool->min = minQueryThreads;
|
||||||
pPool->max = pPool->min;
|
pPool->max = maxQueryThreads;
|
||||||
if (tWorkerInit(pPool) != 0) {
|
if (tWorkerInit(pPool) != 0) return -1;
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pPool = &pMgmt->fetchPool;
|
pPool = &pMgmt->fetchPool;
|
||||||
pPool->name = "vnode-fetch";
|
pPool->name = "vnode-fetch";
|
||||||
pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores);
|
pPool->min = minFetchThreads;
|
||||||
pPool->max = pPool->min;
|
pPool->max = maxFetchThreads;
|
||||||
if (tWorkerInit(pPool) != 0) {
|
if (tWorkerInit(pPool) != 0) return -1;
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("vnode read worker is initialized");
|
SMWorkerPool *pMPool = &pMgmt->writePool;
|
||||||
|
pMPool->name = "vnode-write";
|
||||||
|
pMPool->max = maxWriteThreads;
|
||||||
|
if (tMWorkerInit(pMPool) != 0) return -1;
|
||||||
|
|
||||||
|
pMPool = &pMgmt->syncPool;
|
||||||
|
pMPool->name = "vnode-sync";
|
||||||
|
pMPool->max = maxSyncThreads;
|
||||||
|
if (tMWorkerInit(pMPool) != 0) return -1;
|
||||||
|
|
||||||
|
dDebug("vnode workers is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndCleanupVnodeReadWorker(SDnode *pDnode) {
|
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
tWorkerCleanup(&pMgmt->fetchPool);
|
tWorkerCleanup(&pMgmt->fetchPool);
|
||||||
tWorkerCleanup(&pMgmt->queryPool);
|
tWorkerCleanup(&pMgmt->queryPool);
|
||||||
dDebug("vnode close worker is initialized");
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
|
|
||||||
if (pVnode->pWriteQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
|
||||||
pVnode->pWriteQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
|
|
||||||
if (pVnode->pApplyQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
|
||||||
pVnode->pApplyQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
SMWorkerPool *pPool = &pMgmt->writePool;
|
|
||||||
pPool->name = "vnode-write";
|
|
||||||
pPool->max = pDnode->opt.numOfCores;
|
|
||||||
if (tMWorkerInit(pPool) != 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("vnode write worker is initialized");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndCleanupVnodeWriteWorker(SDnode *pDnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tMWorkerCleanup(&pMgmt->writePool);
|
tMWorkerCleanup(&pMgmt->writePool);
|
||||||
dDebug("vnode write worker is closed");
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
|
||||||
if (pVnode->pSyncQ == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
|
||||||
pVnode->pSyncQ = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
|
|
||||||
int32_t maxThreads = pDnode->opt.numOfCores / 2;
|
|
||||||
if (maxThreads < 1) maxThreads = 1;
|
|
||||||
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
SMWorkerPool *pPool = &pMgmt->syncPool;
|
|
||||||
pPool->name = "vnode-sync";
|
|
||||||
pPool->max = maxThreads;
|
|
||||||
if (tMWorkerInit(pPool) != 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dDebug("vnode sync worker is initialized");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndCleanupVnodeSyncWorker(SDnode *pDnode) {
|
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
|
||||||
tMWorkerCleanup(&pMgmt->syncPool);
|
tMWorkerCleanup(&pMgmt->syncPool);
|
||||||
dDebug("vnode sync worker is closed");
|
dDebug("vnode workers is closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
|
|
||||||
|
pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
|
||||||
|
pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
|
||||||
|
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
|
||||||
|
pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
|
||||||
|
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
|
||||||
|
|
||||||
|
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
|
||||||
|
pVnode->pQueryQ == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
|
tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||||
|
tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||||
|
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||||
|
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
||||||
|
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||||
|
pVnode->pWriteQ = NULL;
|
||||||
|
pVnode->pApplyQ = NULL;
|
||||||
|
pVnode->pSyncQ = NULL;
|
||||||
|
pVnode->pFetchQ = NULL;
|
||||||
|
pVnode->pQueryQ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndInitVnodes(SDnode *pDnode) {
|
int32_t dndInitVnodes(SDnode *pDnode) {
|
||||||
dInfo("dnode-vnodes start to init");
|
dInfo("dnode-vnodes start to init");
|
||||||
|
|
||||||
if (dndInitVnodeReadWorker(pDnode) != 0) {
|
if (dndInitVnodeWorkers(pDnode) != 0) {
|
||||||
dError("failed to init vnodes read worker since %s", terrstr());
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
dError("failed to init vnode workers since %s", terrstr());
|
||||||
}
|
|
||||||
|
|
||||||
if (dndInitVnodeWriteWorker(pDnode) != 0) {
|
|
||||||
dError("failed to init vnodes write worker since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndInitVnodeSyncWorker(pDnode) != 0) {
|
|
||||||
dError("failed to init vnodes sync worker since %s", terrstr());
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1105,9 +980,7 @@ int32_t dndInitVnodes(SDnode *pDnode) {
|
||||||
void dndCleanupVnodes(SDnode *pDnode) {
|
void dndCleanupVnodes(SDnode *pDnode) {
|
||||||
dInfo("dnode-vnodes start to clean up");
|
dInfo("dnode-vnodes start to clean up");
|
||||||
dndCloseVnodes(pDnode);
|
dndCloseVnodes(pDnode);
|
||||||
dndCleanupVnodeReadWorker(pDnode);
|
dndCleanupVnodeWorkers(pDnode);
|
||||||
dndCleanupVnodeWriteWorker(pDnode);
|
|
||||||
dndCleanupVnodeSyncWorker(pDnode);
|
|
||||||
dInfo("dnode-vnodes is cleaned up");
|
dInfo("dnode-vnodes is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,8 +4,5 @@ add_subdirectory(qnode)
|
||||||
add_subdirectory(bnode)
|
add_subdirectory(bnode)
|
||||||
add_subdirectory(snode)
|
add_subdirectory(snode)
|
||||||
add_subdirectory(mnode)
|
add_subdirectory(mnode)
|
||||||
add_subdirectory(db)
|
add_subdirectory(vnode)
|
||||||
add_subdirectory(stb)
|
|
||||||
add_subdirectory(vgroup)
|
|
||||||
|
|
||||||
add_subdirectory(sut)
|
add_subdirectory(sut)
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
aux_source_directory(. DB_SRC)
|
|
||||||
add_executable(dnode_test_db ${DB_SRC})
|
|
||||||
target_link_libraries(
|
|
||||||
dnode_test_db
|
|
||||||
PUBLIC sut
|
|
||||||
)
|
|
||||||
|
|
||||||
add_test(
|
|
||||||
NAME dnode_test_db
|
|
||||||
COMMAND dnode_test_db
|
|
||||||
)
|
|
|
@ -1,11 +0,0 @@
|
||||||
aux_source_directory(. VGROUP_SRC)
|
|
||||||
add_executable(dnode_test_vgroup ${VGROUP_SRC})
|
|
||||||
target_link_libraries(
|
|
||||||
dnode_test_vgroup
|
|
||||||
PUBLIC sut
|
|
||||||
)
|
|
||||||
|
|
||||||
add_test(
|
|
||||||
NAME dnode_test_vgroup
|
|
||||||
COMMAND dnode_test_vgroup
|
|
||||||
)
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
aux_source_directory(. VNODE_SRC)
|
||||||
|
add_executable(dnode_test_vnode ${VNODE_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
dnode_test_vnode
|
||||||
|
PUBLIC sut
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME dnode_test_vnode
|
||||||
|
COMMAND dnode_test_vnode
|
||||||
|
)
|
|
@ -11,9 +11,9 @@
|
||||||
|
|
||||||
#include "sut.h"
|
#include "sut.h"
|
||||||
|
|
||||||
class DndTestVgroup : public ::testing::Test {
|
class DndTestVnode : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vgroup", 9150); }
|
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vnode", 9150); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
|
@ -23,14 +23,14 @@ class DndTestVgroup : public ::testing::Test {
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase DndTestVgroup::test;
|
Testbase DndTestVnode::test;
|
||||||
|
|
||||||
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
|
TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
|
||||||
{
|
{
|
||||||
for (int i = 0; i < 3; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
int32_t contLen = sizeof(SCreateVnodeMsg);
|
int32_t contLen = sizeof(SCreateVnodeReq);
|
||||||
|
|
||||||
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(contLen);
|
SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen);
|
||||||
pReq->vgId = htonl(2);
|
pReq->vgId = htonl(2);
|
||||||
pReq->dnodeId = htonl(1);
|
pReq->dnodeId = htonl(1);
|
||||||
strcpy(pReq->db, "1.d1");
|
strcpy(pReq->db, "1.d1");
|
||||||
|
@ -68,9 +68,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
|
||||||
|
|
||||||
{
|
{
|
||||||
for (int i = 0; i < 3; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
int32_t contLen = sizeof(SAlterVnodeMsg);
|
int32_t contLen = sizeof(SAlterVnodeReq);
|
||||||
|
|
||||||
SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(contLen);
|
SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen);
|
||||||
pReq->vgId = htonl(2);
|
pReq->vgId = htonl(2);
|
||||||
pReq->dnodeId = htonl(1);
|
pReq->dnodeId = htonl(1);
|
||||||
strcpy(pReq->db, "1.d1");
|
strcpy(pReq->db, "1.d1");
|
||||||
|
@ -108,9 +108,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
|
||||||
|
|
||||||
{
|
{
|
||||||
for (int i = 0; i < 3; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
int32_t contLen = sizeof(SDropVnodeMsg);
|
int32_t contLen = sizeof(SDropVnodeReq);
|
||||||
|
|
||||||
SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(contLen);
|
SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen);
|
||||||
pReq->vgId = htonl(2);
|
pReq->vgId = htonl(2);
|
||||||
pReq->dnodeId = htonl(1);
|
pReq->dnodeId = htonl(1);
|
||||||
strcpy(pReq->db, "1.d1");
|
strcpy(pReq->db, "1.d1");
|
||||||
|
@ -118,7 +118,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.pCont = pReq;
|
rpcMsg.pCont = pReq;
|
||||||
rpcMsg.contLen = sizeof(SDropVnodeMsg);
|
rpcMsg.contLen = sizeof(SDropVnodeReq);
|
||||||
rpcMsg.msgType = TDMT_DND_DROP_VNODE;
|
rpcMsg.msgType = TDMT_DND_DROP_VNODE;
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
|
|
@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
||||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
|
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
|
||||||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
||||||
|
|
||||||
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,11 +331,11 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pMsg;
|
action.pCont = pMsg;
|
||||||
action.contLen = sizeof(SCreateVnodeMsg);
|
action.contLen = sizeof(SCreateVnodeReq);
|
||||||
action.msgType = TDMT_DND_CREATE_VNODE;
|
action.msgType = TDMT_DND_CREATE_VNODE;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
|
@ -360,11 +360,11 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pMsg;
|
action.pCont = pMsg;
|
||||||
action.contLen = sizeof(SDropVnodeMsg);
|
action.contLen = sizeof(SDropVnodeReq);
|
||||||
action.msgType = TDMT_DND_DROP_VNODE;
|
action.msgType = TDMT_DND_DROP_VNODE;
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
|
@ -593,11 +593,11 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pMsg;
|
action.pCont = pMsg;
|
||||||
action.contLen = sizeof(SAlterVnodeMsg);
|
action.contLen = sizeof(SAlterVnodeReq);
|
||||||
action.msgType = TDMT_DND_ALTER_VNODE;
|
action.msgType = TDMT_DND_ALTER_VNODE;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
|
@ -757,11 +757,11 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||||
if (pMsg == NULL) return -1;
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pMsg;
|
action.pCont = pMsg;
|
||||||
action.contLen = sizeof(SCreateVnodeMsg);
|
action.contLen = sizeof(SCreateVnodeReq);
|
||||||
action.msgType = TDMT_DND_DROP_VNODE;
|
action.msgType = TDMT_DND_DROP_VNODE;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
|
|
|
@ -189,8 +189,8 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
SCreateVnodeMsg *pCreate = calloc(1, sizeof(SCreateVnodeMsg));
|
SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq));
|
||||||
if (pCreate == NULL) {
|
if (pCreate == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -248,8 +248,8 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
|
||||||
return pCreate;
|
return pCreate;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
SDropVnodeMsg *pDrop = calloc(1, sizeof(SDropVnodeMsg));
|
SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq));
|
||||||
if (pDrop == NULL) {
|
if (pDrop == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -10,3 +10,5 @@ add_subdirectory(show)
|
||||||
add_subdirectory(profile)
|
add_subdirectory(profile)
|
||||||
add_subdirectory(dnode)
|
add_subdirectory(dnode)
|
||||||
add_subdirectory(mnode)
|
add_subdirectory(mnode)
|
||||||
|
add_subdirectory(db)
|
||||||
|
add_subdirectory(stb)
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
aux_source_directory(. DB_SRC)
|
||||||
|
add_executable(mnode_test_db ${DB_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
mnode_test_db
|
||||||
|
PUBLIC sut
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME mnode_test_db
|
||||||
|
COMMAND mnode_test_db
|
||||||
|
)
|
|
@ -1,19 +1,19 @@
|
||||||
/**
|
/**
|
||||||
* @file db.cpp
|
* @file db.cpp
|
||||||
* @author slguan (slguan@taosdata.com)
|
* @author slguan (slguan@taosdata.com)
|
||||||
* @brief DNODE module db-msg tests
|
* @brief MNODE module db tests
|
||||||
* @version 0.1
|
* @version 1.0
|
||||||
* @date 2021-12-15
|
* @date 2022-01-11
|
||||||
*
|
*
|
||||||
* @copyright Copyright (c) 2021
|
* @copyright Copyright (c) 2022
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sut.h"
|
#include "sut.h"
|
||||||
|
|
||||||
class DndTestDb : public ::testing::Test {
|
class MndTestDb : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_db", 9040); }
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
|
@ -23,9 +23,9 @@ class DndTestDb : public ::testing::Test {
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase DndTestDb::test;
|
Testbase MndTestDb::test;
|
||||||
|
|
||||||
TEST_F(DndTestDb, 01_ShowDb) {
|
TEST_F(MndTestDb, 01_ShowDb) {
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
|
||||||
CHECK_META("show databases", 18);
|
CHECK_META("show databases", 18);
|
||||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
|
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
|
||||||
|
@ -51,7 +51,7 @@ TEST_F(DndTestDb, 01_ShowDb) {
|
||||||
EXPECT_EQ(test.GetShowRows(), 0);
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SCreateDbMsg);
|
int32_t contLen = sizeof(SCreateDbMsg);
|
||||||
|
|
||||||
|
@ -211,7 +211,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
||||||
EXPECT_EQ(test.GetShowRows(), 0);
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
|
TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SCreateDbMsg);
|
int32_t contLen = sizeof(SCreateDbMsg);
|
||||||
|
|
||||||
|
@ -281,7 +281,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
|
||||||
EXPECT_EQ(pInfo->numOfEps, 1);
|
EXPECT_EQ(pInfo->numOfEps, 1);
|
||||||
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
|
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
|
||||||
pAddr->port = htons(pAddr->port);
|
pAddr->port = htons(pAddr->port);
|
||||||
EXPECT_EQ(pAddr->port, 9040);
|
EXPECT_EQ(pAddr->port, 9030);
|
||||||
EXPECT_STREQ(pAddr->fqdn, "localhost");
|
EXPECT_STREQ(pAddr->fqdn, "localhost");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,7 +297,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
|
||||||
EXPECT_EQ(pInfo->numOfEps, 1);
|
EXPECT_EQ(pInfo->numOfEps, 1);
|
||||||
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
|
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
|
||||||
pAddr->port = htons(pAddr->port);
|
pAddr->port = htons(pAddr->port);
|
||||||
EXPECT_EQ(pAddr->port, 9040);
|
EXPECT_EQ(pAddr->port, 9030);
|
||||||
EXPECT_STREQ(pAddr->fqdn, "localhost");
|
EXPECT_STREQ(pAddr->fqdn, "localhost");
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,19 +1,19 @@
|
||||||
/**
|
/**
|
||||||
* @file stb.cpp
|
* @file stb.cpp
|
||||||
* @author slguan (slguan@taosdata.com)
|
* @author slguan (slguan@taosdata.com)
|
||||||
* @brief DNODE module db-msg tests
|
* @brief MNODE module stb tests
|
||||||
* @version 0.1
|
* @version 1.0
|
||||||
* @date 2021-12-17
|
* @date 2022-01-12
|
||||||
*
|
*
|
||||||
* @copyright Copyright (c) 2021
|
* @copyright Copyright (c) 2022
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sut.h"
|
#include "sut.h"
|
||||||
|
|
||||||
class DndTestStb : public ::testing::Test {
|
class MndTestStb : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_stb", 9101); }
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_stb", 9034); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
|
@ -23,9 +23,9 @@ class DndTestStb : public ::testing::Test {
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase DndTestStb::test;
|
Testbase MndTestStb::test;
|
||||||
|
|
||||||
TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SCreateDbMsg);
|
int32_t contLen = sizeof(SCreateDbMsg);
|
||||||
|
|
|
@ -24,9 +24,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
|
||||||
SVnode *pVnode = NULL;
|
SVnode *pVnode = NULL;
|
||||||
|
|
||||||
// Set default options
|
// Set default options
|
||||||
if (pVnodeCfg == NULL) {
|
//if (pVnodeCfg == NULL) {
|
||||||
pVnodeCfg = &defaultVnodeOptions;
|
pVnodeCfg = &defaultVnodeOptions;
|
||||||
}
|
//}
|
||||||
|
|
||||||
// Validate options
|
// Validate options
|
||||||
if (vnodeValidateOptions(pVnodeCfg) < 0) {
|
if (vnodeValidateOptions(pVnodeCfg) < 0) {
|
||||||
|
|
|
@ -277,9 +277,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_NOT_DEPLOYED, "Bnode not deployed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_INVALID_OPTION, "Bnode option invalid")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_INVALID_OPTION, "Bnode option invalid")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_READ_FILE_ERROR, "Read bnode.json error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_READ_FILE_ERROR, "Read bnode.json error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR, "Write bnode.json error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR, "Write bnode.json error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnode directories")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED, "Vnode already deployed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_NOT_DEPLOYED, "Vnode not deployed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_INVALID_OPTION, "Vnode option invalid")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_READ_FILE_ERROR, "Read vnodes.json error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_READ_FILE_ERROR, "Read vnodes.json error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR, "Write vnodes.json error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR, "Write vnodes.json error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnodes")
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress")
|
||||||
|
|
Loading…
Reference in New Issue