Merge pull request #9656 from taosdata/feature/dnode3

add batch for create_table
This commit is contained in:
Shengliang Guan 2022-01-06 20:04:20 +08:00 committed by GitHub
commit 61136183a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 824 additions and 707 deletions

View File

@ -302,7 +302,7 @@ typedef struct {
char app[TSDB_APP_NAME_LEN]; char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
int64_t startTime; int64_t startTime;
} SConnectMsg; } SConnectReq;
typedef struct SEpSet { typedef struct SEpSet {
int8_t inUse; int8_t inUse;
@ -650,6 +650,7 @@ typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int64_t dver;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int32_t numOfCores; int32_t numOfCores;
@ -657,7 +658,7 @@ typedef struct {
char dnodeEp[TSDB_EP_LEN]; char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads; SVnodeLoads vnodeLoads;
} SStatusMsg; } SStatusReq;
typedef struct { typedef struct {
int32_t reserved; int32_t reserved;
@ -682,6 +683,7 @@ typedef struct {
} SDnodeEps; } SDnodeEps;
typedef struct { typedef struct {
int64_t dver;
SDnodeCfg dnodeCfg; SDnodeCfg dnodeCfg;
SDnodeEps dnodeEps; SDnodeEps dnodeEps;
} SStatusRsp; } SStatusRsp;
@ -833,16 +835,16 @@ typedef struct SShowRsp {
typedef struct { typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
int32_t port; int32_t port;
} SCreateDnodeMsg; } SCreateDnodeReq;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
} SDropDnodeMsg; } SDropDnodeReq;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char config[TSDB_DNODE_CONFIG_LEN]; char config[TSDB_DNODE_CONFIG_LEN];
} SCfgDnodeMsg; } SMCfgDnodeReq, SDCfgDnodeReq;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
@ -898,7 +900,7 @@ typedef struct {
int32_t numOfStreams; int32_t numOfStreams;
char app[TSDB_APP_NAME_LEN]; char app[TSDB_APP_NAME_LEN];
char pData[]; char pData[];
} SHeartBeatMsg; } SHeartBeatReq;
typedef struct { typedef struct {
int32_t connId; int32_t connId;
@ -911,19 +913,14 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SHeartBeatRsp; } SHeartBeatRsp;
typedef struct {
int32_t connId;
int32_t streamId;
} SKillStreamMsg;
typedef struct { typedef struct {
int32_t connId; int32_t connId;
int32_t queryId; int32_t queryId;
} SKillQueryMsg; } SKillQueryReq;
typedef struct { typedef struct {
int32_t connId; int32_t connId;
} SKillConnMsg; } SKillConnReq;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];

View File

@ -281,6 +281,15 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
*/ */
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type); int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
/**
* @brief Get the version of the table
*
* @param pSdb The sdb object.
* @param pIter The type of the table.
* @return int32_t The version of the table
*/
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
/** /**
* @brief Update the version of sdb * @brief Update the version of sdb
* *

View File

@ -395,13 +395,13 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
} }
pMsgSendInfo->msgType = TDMT_MND_CONNECT; pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg); pMsgSendInfo->msgInfo.len = sizeof(SConnectReq);
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
if (pConnect == NULL) { if (pConnect == NULL) {
tfree(pMsgSendInfo); tfree(pMsgSendInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;

View File

@ -83,6 +83,7 @@ typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
int64_t clusterId; int64_t clusterId;
int64_t dver;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int8_t statusSent; int8_t statusSent;
@ -92,8 +93,7 @@ typedef struct {
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
STaosQueue *pMgmtQ; SDnodeWorker mgmtWorker;
SWorkerPool mgmtPool;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndBnode.h" #include "dndBnode.h"
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"

View File

@ -14,28 +14,25 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndBnode.h" #include "dndBnode.h"
#include "dndMnode.h" #include "dndMnode.h"
#include "dndQnode.h" #include "dndQnode.h"
#include "dndSnode.h" #include "dndSnode.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#include "dndWorker.h"
static int32_t dndInitMgmtWorker(SDnode *pDnode);
static void dndCleanupMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMgmtQueue(SDnode *pDnode);
static void dndFreeMgmtQueue(SDnode *pDnode);
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndReadDnodes(SDnode *pDnode); static int32_t dndReadDnodes(SDnode *pDnode);
static int32_t dndWriteDnodes(SDnode *pDnode); static int32_t dndWriteDnodes(SDnode *pDnode);
static void *dnodeThreadRoutine(void *param); static void *dnodeThreadRoutine(void *param);
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg); static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq);
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
int32_t dndGetDnodeId(SDnode *pDnode) { int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
@ -80,13 +77,13 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0}; SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet); dndGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) {
@ -96,7 +93,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
epSet.port[i] = htons(epSet.port[i]); epSet.port[i] = htons(epSet.port[i]);
} }
rpcSendRedirectRsp(pMsg->handle, &epSet); rpcSendRedirectRsp(pReq->handle, &epSet);
} }
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
@ -350,14 +347,14 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
terrno = 0; terrno = 0;
pMgmt->updateTime = taosGetTimestampMs(); pMgmt->updateTime = taosGetTimestampMs();
dInfo("successed to write %s", pMgmt->file); dDebug("successed to write %s", pMgmt->file);
return 0; return 0;
} }
void dndSendStatusReq(SDnode *pDnode) { void dndSendStatusReq(SDnode *pDnode) {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); int32_t contLen = sizeof(SStatusReq) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen); SStatusReq *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) { if (pStatus == NULL) {
dError("failed to malloc status message"); dError("failed to malloc status message");
return; return;
@ -366,6 +363,7 @@ void dndSendStatusReq(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->opt.sver); pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dver = htobe64(pMgmt->dver);
pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->rebootTime = htobe64(pMgmt->rebootTime); pStatus->rebootTime = htobe64(pMgmt->rebootTime);
@ -385,12 +383,12 @@ void dndSendStatusReq(SDnode *pDnode) {
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); contLen = sizeof(SStatusReq) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status msg to mnode", pDnode); dTrace("pDnode:%p, send status req to mnode", pDnode);
dndSendReqToMnode(pDnode, &rpcMsg); dndSendReqToMnode(pDnode, &rpcMsg);
} }
@ -426,12 +424,12 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
} }
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pRsp->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1; pMgmt->dropped = 1;
dndWriteDnodes(pDnode); dndWriteDnodes(pDnode);
@ -439,14 +437,16 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
return; return;
} }
SStatusRsp *pRsp = pMsg->pCont; if (pRsp->pCont != NULL && pRsp->contLen != 0) {
if (pMsg->pCont != NULL && pMsg->contLen != 0) { SStatusRsp *pStatus = pRsp->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg; pMgmt->dver = htobe64(pStatus->dver);
SDnodeCfg *pCfg = &pStatus->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg); dndUpdateDnodeCfg(pDnode, pCfg);
SDnodeEps *pDnodeEps = &pRsp->dnodeEps; SDnodeEps *pDnodeEps = &pStatus->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num); pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) { for (int32_t i = 0; i < pDnodeEps->num; ++i) {
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
@ -458,26 +458,27 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
} }
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); }
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) {
dError("grant rsp is received, but not supported yet");
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { }
dError("config msg is received, but not supported yet");
SCfgDnodeMsg *pCfg = pMsg->pCont;
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
dError("config req is received, but not supported yet");
SDCfgDnodeReq *pCfg = pReq->pCont;
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup msg is received"); dDebug("startup req is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
dndGetStartup(pDnode, pStartup); dndGetStartup(pDnode, pStartup);
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)}; SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
@ -530,13 +531,8 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
if (dndInitMgmtWorker(pDnode) != 0) { if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndAllocMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -547,15 +543,14 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
dInfo("dnode-dnode is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;
} }
void dndCleanupDnode(SDnode *pDnode) { void dndCleanupDnode(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupMgmtWorker(pDnode); dndCleanupWorker(&pMgmt->mgmtWorker);
dndFreeMgmtQueue(pDnode);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
@ -580,62 +575,22 @@ void dndCleanupDnode(SDnode *pDnode) {
} }
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
dInfo("dnode-dnode is cleaned up"); dInfo("dnode-mgmt is cleaned up");
} }
static int32_t dndInitMgmtWorker(SDnode *pDnode) { void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "dnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("dnode mgmt worker is initialized");
return 0;
}
static void dndCleanupMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("dnode mgmt worker is closed");
}
static int32_t dndAllocMgmtQueue(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMgmtQueue(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP) { if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pDnode, pEpSet); dndUpdateMnodeEpSet(pDnode, pEpSet);
} }
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); if (dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg, sizeof(SRpcMsg)) != 0) {
if (pMsg != NULL) *pMsg = *pRpcMsg; if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
} }
@ -704,7 +659,7 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1; code = -1;
dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
break; break;
} }

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndMnode.h" #include "dndMnode.h"
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndQnode.h" #include "dndQnode.h"
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndSnode.h" #include "dndSnode.h"
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndWorker.h" #include "dndWorker.h"

View File

@ -21,7 +21,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndTransport.h" #include "dndTransport.h"
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndMnode.h" #include "dndMnode.h"
#include "dndVnodes.h" #include "dndVnodes.h"

View File

@ -388,7 +388,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
free(pVnodes); free(pVnodes);
} }
dInfo("successed to write %s", file); dDebug("successed to write %s", realfile);
return taosRenameFile(file, realfile); return taosRenameFile(file, realfile);
} }

View File

@ -101,7 +101,9 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen)
} }
if (taosWriteQitem(pWorker->queue, pMsg) != 0) { if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
if (contLen != 0) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
}
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }

View File

@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndBnode.h" #include "dndBnode.h"
#include "dndDnode.h" #include "dndMgmt.h"
#include "dndMnode.h" #include "dndMnode.h"
#include "dndQnode.h" #include "dndQnode.h"
#include "dndSnode.h" #include "dndSnode.h"

View File

@ -3,18 +3,9 @@ enable_testing()
add_subdirectory(qnode) add_subdirectory(qnode)
add_subdirectory(bnode) add_subdirectory(bnode)
add_subdirectory(snode) add_subdirectory(snode)
# add_subdirectory(auth)
# add_subdirectory(balance)
add_subdirectory(db)
add_subdirectory(dnode)
# add_subdirectory(func)
add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(profile) add_subdirectory(db)
add_subdirectory(stb) add_subdirectory(stb)
# add_subdirectory(sync)
# add_subdirectory(telem)
# add_subdirectory(trans)
add_subdirectory(vgroup) add_subdirectory(vgroup)
add_subdirectory(sut) add_subdirectory(sut)

View File

@ -1,11 +0,0 @@
aux_source_directory(. DTEST_SRC)
add_executable(dnode_test_dnode ${DTEST_SRC})
target_link_libraries(
dnode_test_dnode
PUBLIC sut
)
add_test(
NAME dnode_test_dnode
COMMAND dnode_test_dnode
)

View File

@ -1,261 +0,0 @@
/**
* @file dnode.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module dnode-msg tests
* @version 0.1
* @date 2021-12-15
*
* @copyright Copyright (c) 2021
*
*/
#include "sut.h"
class DndTestDnode : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
public:
static void SetUpTestSuite() {
test.Init("/tmp/dnode_test_dnode1", 9041);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9041";
server2.Start("/tmp/dnode_test_dnode2", fqdn, 9042, firstEp);
server3.Start("/tmp/dnode_test_dnode3", fqdn, 9043, firstEp);
server4.Start("/tmp/dnode_test_dnode4", fqdn, 9044, firstEp);
server5.Start("/tmp/dnode_test_dnode5", fqdn, 9045, firstEp);
taosMsleep(300);
}
static void TearDownTestSuite() {
server2.Stop();
server3.Stop();
server4.Stop();
server5.Stop();
test.Cleanup();
}
static Testbase test;
static TestServer server2;
static TestServer server3;
static TestServer server4;
static TestServer server5;
};
Testbase DndTestDnode::test;
TestServer DndTestDnode::server2;
TestServer DndTestDnode::server3;
TestServer DndTestDnode::server4;
TestServer DndTestDnode::server5;
TEST_F(DndTestDnode, 01_ShowDnode) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "support_vnodes");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(16);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
}
TEST_F(DndTestDnode, 02_ConfigDnode) {
int32_t contLen = sizeof(SCfgDnodeMsg);
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONFIG_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9042);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckBinary("localhost:9042", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
{
int32_t contLen = sizeof(SDropDnodeMsg);
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(16);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9043);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9044);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9045);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckBinary("localhost:9043", TSDB_EP_LEN);
CheckBinary("localhost:9044", TSDB_EP_LEN);
CheckBinary("localhost:9045", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
// restart
uInfo("stop all server");
test.Restart();
server2.Restart();
server3.Restart();
server4.Restart();
server5.Restart();
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckBinary("localhost:9043", TSDB_EP_LEN);
CheckBinary("localhost:9044", TSDB_EP_LEN);
CheckBinary("localhost:9045", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
}

View File

@ -1,5 +1,5 @@
aux_source_directory(. MTEST_SRC) aux_source_directory(. DMTEST_SRC)
add_executable(dnode_test_mnode ${MTEST_SRC}) add_executable(dnode_test_mnode ${DMTEST_SRC})
target_link_libraries( target_link_libraries(
dnode_test_mnode dnode_test_mnode
PUBLIC sut PUBLIC sut

View File

@ -0,0 +1,26 @@
/**
* @file dmnode.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module mnode tests
* @version 1.0
* @date 2022-01-07
*
* @copyright Copyright (c) 2022
*
*/
#include "sut.h"
class DndTestMnode : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9113); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
};
Testbase DndTestMnode::test;

View File

@ -1,11 +0,0 @@
aux_source_directory(. PROFILE_SRC)
add_executable(dnode_test_profile ${PROFILE_SRC})
target_link_libraries(
dnode_test_profile
PUBLIC sut
)
add_test(
NAME dnode_test_profile
COMMAND dnode_test_profile
)

View File

@ -45,19 +45,19 @@ static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode); static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg); static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg); static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg); static int32_t mndProcessStatusReq(SMnodeMsg *pReq);
static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
int32_t mndInitDnode(SMnode *pMnode) { int32_t mndInitDnode(SMnode *pMnode) {
@ -70,11 +70,11 @@ int32_t mndInitDnode(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndDnodeActionUpdate, .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndDnodeActionDelete}; .deleteFp = (SdbDeleteFp)mndDnodeActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeMsg); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeMsg); mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeMsg); mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusMsg); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndGetConfigMeta); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndGetConfigMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs);
@ -182,9 +182,9 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
return 0; return 0;
} }
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode) { static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOldDnode->id, pOldDnode, pNewDnode); mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
pOldDnode->updateTime = pNewDnode->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
@ -244,22 +244,22 @@ bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
return true; return true;
} }
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t i = 0; int32_t numOfEps = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break; if (pIter == NULL) break;
if (i >= numOfEps) { if (numOfEps >= maxEps) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
break; break;
} }
SDnodeEp *pEp = &pEps->eps[i]; SDnodeEp *pEp = &pEps->eps[numOfEps];
pEp->id = htonl(pDnode->id); pEp->id = htonl(pDnode->id);
pEp->port = htons(pDnode->port); pEp->port = htons(pDnode->port);
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
@ -267,11 +267,11 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
if (mndIsMnode(pMnode, pDnode->id)) { if (mndIsMnode(pMnode, pDnode->id)) {
pEp->isMnode = 1; pEp->isMnode = 1;
} }
i++; numOfEps++;
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
} }
pEps->num = htonl(i); pEps->num = htonl(numOfEps);
} }
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
@ -299,8 +299,9 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
return 0; return 0;
} }
static void mndParseStatusMsg(SStatusMsg *pStatus) { static void mndParseStatusMsg(SStatusReq *pStatus) {
pStatus->sver = htonl(pStatus->sver); pStatus->sver = htonl(pStatus->sver);
pStatus->dver = htobe64(pStatus->dver);
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId); pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime); pStatus->rebootTime = htobe64(pStatus->rebootTime);
@ -309,11 +310,19 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes); pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
for (int32_t v = 0; v < pStatus->vnodeLoads.num; ++v) {
SVnodeLoad *pVload = &pStatus->vnodeLoads.data[v];
pVload->vgId = htonl(pVload->vgId);
pVload->totalStorage = htobe64(pVload->totalStorage);
pVload->compStorage = htobe64(pVload->compStorage);
pVload->pointsWritten = htobe64(pVload->pointsWritten);
pVload->tablesNum = htobe64(pVload->tablesNum);
}
} }
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SStatusMsg *pStatus = pMsg->rpcMsg.pCont; SStatusReq *pStatus = pReq->rpcMsg.pCont;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
int32_t code = -1; int32_t code = -1;
@ -341,9 +350,11 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime); bool dnodeChanged = (pStatus->dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
bool reboot = (pDnode->rebootTime != pStatus->rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
if (needCheckCfg) { if (needCheck) {
if (pStatus->sver != pMnode->cfg.sver) { if (pStatus->sver != pMnode->cfg.sver) {
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
@ -379,7 +390,11 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
if (!online) {
mInfo("dnode:%d, from offline to online", pDnode->id); mInfo("dnode:%d, from offline to online", pDnode->id);
} else {
mDebug("dnode:%d, send dnode eps", pDnode->id);
}
pDnode->rebootTime = pStatus->rebootTime; pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores; pDnode->numOfCores = pStatus->numOfCores;
@ -393,12 +408,13 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
pRsp->dver = htobe64(sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->contLen = contLen; pReq->contLen = contLen;
pMsg->pCont = pRsp; pReq->pCont = pRsp;
} }
pDnode->lastAccessTime = curMs; pDnode->lastAccessTime = curMs;
@ -409,7 +425,7 @@ PROCESS_STATUS_MSG_OVER:
return code; return code;
} }
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) { static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq *pCreate) {
SDnodeObj dnodeObj = {0}; SDnodeObj dnodeObj = {0};
dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE); dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.createdTime = taosGetTimestampMs();
@ -418,7 +434,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr()); mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1; return -1;
@ -443,9 +459,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
return 0; return 0;
} }
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; SCreateDnodeReq *pCreate = pReq->rpcMsg.pCont;
pCreate->port = htonl(pCreate->port); pCreate->port = htonl(pCreate->port);
mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port); mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port);
@ -465,7 +481,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
int32_t code = mndCreateDnode(pMnode, pMsg, pCreate); int32_t code = mndCreateDnode(pMnode, pReq, pCreate);
if (code != 0) { if (code != 0) {
mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr());
@ -475,8 +491,8 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) { static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
return -1; return -1;
@ -501,9 +517,9 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode)
return 0; return 0;
} }
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SDropDnodeMsg *pDrop = pMsg->rpcMsg.pCont; SDropDnodeReq *pDrop = pReq->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId); pDrop->dnodeId = htonl(pDrop->dnodeId);
mDebug("dnode:%d, start to drop", pDrop->dnodeId); mDebug("dnode:%d, start to drop", pDrop->dnodeId);
@ -521,7 +537,7 @@ static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
int32_t code = mndDropDnode(pMnode, pMsg, pDnode); int32_t code = mndDropDnode(pMnode, pReq, pDnode);
if (code != 0) { if (code != 0) {
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
@ -532,9 +548,9 @@ static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SCfgDnodeMsg *pCfg = pMsg->rpcMsg.pCont; SMCfgDnodeReq *pCfg = pReq->rpcMsg.pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId);
@ -547,14 +563,14 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) {
SEpSet epSet = mndGetDnodeEpset(pDnode); SEpSet epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SCfgDnodeMsg *pCfgDnode = rpcMallocCont(sizeof(SCfgDnodeMsg)); SDCfgDnodeReq *pCfgDnode = rpcMallocCont(sizeof(SDCfgDnodeReq));
pCfgDnode->dnodeId = htonl(pCfg->dnodeId); pCfgDnode->dnodeId = htonl(pCfg->dnodeId);
memcpy(pCfgDnode->config, pCfg->config, TSDB_DNODE_CONFIG_LEN); memcpy(pCfgDnode->config, pCfg->config, TSDB_DNODE_CONFIG_LEN);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE,
.pCont = pCfgDnode, .pCont = pCfgDnode,
.contLen = sizeof(SCfgDnodeMsg), .contLen = sizeof(SDCfgDnodeReq),
.ahandle = pMsg->rpcMsg.ahandle}; .ahandle = pReq->rpcMsg.ahandle};
mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config); mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config);
mndSendReqToDnode(pMnode, &epSet, &rpcMsg); mndSendReqToDnode(pMnode, &epSet, &rpcMsg);
@ -562,11 +578,11 @@ static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg) { static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pMsg->rpcMsg.ahandle); mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle);
} }
static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema; SSchema *pSchema = pMeta->pSchema;
@ -597,8 +613,8 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
return 0; return 0;
} }
static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
char *cfgOpts[TSDB_CONFIG_NUMBER] = {0}; char *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0}; char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0};
@ -640,8 +656,8 @@ static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {} static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0; int32_t cols = 0;
@ -704,8 +720,8 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
return 0; return 0;
} }
static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;

View File

@ -47,14 +47,14 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn); static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg); static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg); static int32_t mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg); static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg); static int32_t mndProcessKillConnReq(SMnodeMsg *pReq);
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) { int32_t mndInitProfile(SMnode *pMnode) {
@ -68,10 +68,10 @@ int32_t mndInitProfile(SMnode *pMnode) {
return -1; return -1;
} }
mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatMsg); mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectMsg); mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryMsg); mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnectionMsg); mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
@ -178,35 +178,35 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
} }
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SConnectMsg *pReq = pMsg->rpcMsg.pCont; SConnectReq *pConnReq = pReq->rpcMsg.pCont;
pReq->pid = htonl(pReq->pid); pConnReq->pid = htonl(pConnReq->pid);
pReq->startTime = htobe64(pReq->startTime); pConnReq->startTime = htobe64(pConnReq->startTime);
SRpcConnInfo info = {0}; SRpcConnInfo info = {0};
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) { if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, failed to login while get connection info since %s", pMsg->user, terrstr()); mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
return -1; return -1;
} }
char ip[30]; char ip[30];
taosIp2String(info.clientIp, ip); taosIp2String(info.clientIp, ip);
if (pReq->db[0]) { if (pConnReq->db[0]) {
snprintf(pMsg->db, TSDB_DB_FNAME_LEN, "%d%s%s", pMsg->acctId, TS_PATH_DELIMITER, pReq->db); snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pReq->acctId, TS_PATH_DELIMITER, pConnReq->db);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); SDbObj *pDb = mndAcquireDb(pMnode, pReq->db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB; terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pMsg->user, ip, pReq->db, terrstr()); mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
return -1; return -1;
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
} }
SConnObj *pConn = mndCreateConn(pMnode, &info, pReq->pid, pReq->app, pReq->startTime); SConnObj *pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
return -1; return -1;
} }
@ -214,11 +214,11 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to login from %s while create rsp since %s", pMsg->user, ip, terrstr()); mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
return -1; return -1;
} }
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser != NULL) { if (pUser != NULL) {
pRsp->acctId = htonl(pUser->acctId); pRsp->acctId = htonl(pUser->acctId);
pRsp->superUser = pUser->superUser; pRsp->superUser = pUser->superUser;
@ -230,16 +230,16 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
pMsg->contLen = sizeof(SConnectRsp); pReq->contLen = sizeof(SConnectRsp);
pMsg->pCont = pRsp; pReq->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pReq->app); mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
return 0; return 0;
} }
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) { static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
pConn->numOfQueries = 0; pConn->numOfQueries = 0;
int32_t numOfQueries = htonl(pMsg->numOfQueries); int32_t numOfQueries = htonl(pReq->numOfQueries);
if (numOfQueries > 0) { if (numOfQueries > 0) {
if (pConn->pQueries == NULL) { if (pConn->pQueries == NULL) {
@ -250,38 +250,38 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc); int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
if (saveSize > 0 && pConn->pQueries != NULL) { if (saveSize > 0 && pConn->pQueries != NULL) {
memcpy(pConn->pQueries, pMsg->pData, saveSize); memcpy(pConn->pQueries, pReq->pData, saveSize);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont; SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont;
pReq->connId = htonl(pReq->connId); pHeartbeat->connId = htonl(pHeartbeat->connId);
pReq->pid = htonl(pReq->pid); pHeartbeat->pid = htonl(pHeartbeat->pid);
SRpcConnInfo info = {0}; SRpcConnInfo info = {0};
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) { if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, connId:%d failed to process hb since %s", pMsg->user, pReq->connId, terrstr()); mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1; return -1;
} }
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId); SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mndCreateConn(pMnode, &info, pReq->pid, pReq->app, 0); pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr()); mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1; return -1;
} else { } else {
mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->id); mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id);
} }
} else if (pConn->killed) { } else if (pConn->killed) {
mError("user:%s, conn:%d is already killed", pMsg->user, pConn->id); mError("user:%s, conn:%d is already killed", pReq->user, pConn->id);
terrno = TSDB_CODE_MND_INVALID_CONNECTION; terrno = TSDB_CODE_MND_INVALID_CONNECTION;
return -1; return -1;
} else { } else {
@ -304,11 +304,11 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, conn:%d failed to process hb while create rsp since %s", pMsg->user, pReq->connId, terrstr()); mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1; return -1;
} }
mndSaveQueryStreamList(pConn, pReq); mndSaveQueryStreamList(pConn, pHeartbeat);
if (pConn->killed != 0) { if (pConn->killed != 0) {
pRsp->killConnection = 1; pRsp->killConnection = 1;
} }
@ -324,16 +324,16 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
pMsg->contLen = sizeof(SConnectRsp); pReq->contLen = sizeof(SConnectRsp);
pMsg->pCont = pRsp; pReq->pCont = pRsp;
return 0; return 0;
} }
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) { static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (!pUser->superUser) { if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
@ -342,7 +342,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
} }
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; SKillQueryReq *pKill = pReq->rpcMsg.pCont;
int32_t connId = htonl(pKill->connId); int32_t connId = htonl(pKill->connId);
int32_t queryId = htonl(pKill->queryId); int32_t queryId = htonl(pKill->queryId);
mInfo("kill query msg is received, queryId:%d", pKill->queryId); mInfo("kill query msg is received, queryId:%d", pKill->queryId);
@ -353,18 +353,18 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_INVALID_CONN_ID; terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1; return -1;
} else { } else {
mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pMsg->user); mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pReq->user);
pConn->queryId = queryId; pConn->queryId = queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0; return 0;
} }
} }
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) { static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (!pUser->superUser) { if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
@ -373,7 +373,7 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
} }
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
SKillConnMsg *pKill = pMsg->rpcMsg.pCont; SKillConnReq *pKill = pReq->rpcMsg.pCont;
int32_t connId = htonl(pKill->connId); int32_t connId = htonl(pKill->connId);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
@ -382,18 +382,18 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_INVALID_CONN_ID; terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1; return -1;
} else { } else {
mInfo("connId:%d, is killed by user:%s", connId, pMsg->user); mInfo("connId:%d, is killed by user:%s", connId, pReq->user);
pConn->killed = 1; pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (!pUser->superUser) { if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
@ -464,8 +464,8 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
return 0; return 0;
} }
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t cols = 0; int32_t cols = 0;
@ -518,11 +518,11 @@ static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
return numOfRows; return numOfRows;
} }
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (!pUser->superUser) { if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
@ -633,8 +633,8 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
return 0; return 0;
} }
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t cols = 0; int32_t cols = 0;

View File

@ -124,20 +124,20 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) { if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE; terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show msg since %s", terrstr()); mError("failed to process show-meta req since %s", terrstr());
return -1; return -1;
} }
ShowMetaFp metaFp = pMgmt->metaFps[type]; ShowMetaFp metaFp = pMgmt->metaFps[type];
if (metaFp == NULL) { if (metaFp == NULL) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE; terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); mError("failed to process show-meta req:%s since %s", mndShowStr(type), terrstr());
return -1; return -1;
} }
SShowObj *pShow = mndCreateShowObj(pMnode, pShowReq); SShowObj *pShow = mndCreateShowObj(pMnode, pShowReq);
if (pShow == NULL) { if (pShow == NULL) {
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); mError("failed to process show-meta req:%s since %s", mndShowStr(type), terrstr());
return -1; return -1;
} }
@ -146,7 +146,7 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseShowObj(pShow, true); mndReleaseShowObj(pShow, true);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:0x%" PRIx64 ", failed to process show-meta msg:%s since malloc rsp error", pShow->id, mError("show:0x%" PRIx64 ", failed to process show-meta req:%s since malloc rsp error", pShow->id,
mndShowStr(type)); mndShowStr(type));
return -1; return -1;
} }
@ -181,7 +181,7 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
SShowObj *pShow = mndAcquireShowObj(pMnode, showId); SShowObj *pShow = mndAcquireShowObj(pMnode, showId);
if (pShow == NULL) { if (pShow == NULL) {
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ; terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr()); mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
return -1; return -1;
} }

View File

@ -7,3 +7,6 @@ add_subdirectory(qnode)
add_subdirectory(snode) add_subdirectory(snode)
add_subdirectory(bnode) add_subdirectory(bnode)
add_subdirectory(show) add_subdirectory(show)
add_subdirectory(profile)
add_subdirectory(dnode)
add_subdirectory(mnode)

View File

@ -96,9 +96,9 @@ TEST_F(MndTestBnode, 02_Create_Bnode) {
TEST_F(MndTestBnode, 03_Drop_Bnode) { TEST_F(MndTestBnode, 03_Drop_Bnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9019); pReq->port = htonl(9019);

View File

@ -0,0 +1,11 @@
aux_source_directory(. DTEST_SRC)
add_executable(mnode_test_dnode ${DTEST_SRC})
target_link_libraries(
mnode_test_dnode
PUBLIC sut
)
add_test(
NAME mnode_test_dnode
COMMAND mnode_test_dnode
)

View File

@ -0,0 +1,350 @@
/**
* @file dnode.cpp
* @author slguan (slguan@taosdata.com)
* @brief MNODE module dnode tests
* @version 1.0
* @date 2022-01-06
*
* @copyright Copyright (c) 2022
*
*/
#include "sut.h"
class MndTestDnode : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
public:
static void SetUpTestSuite() {
test.Init("/tmp/dnode_test_dnode1", 9023);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9023";
server2.Start("/tmp/dnode_test_dnode2", fqdn, 9024, firstEp);
server3.Start("/tmp/dnode_test_dnode3", fqdn, 9025, firstEp);
server4.Start("/tmp/dnode_test_dnode4", fqdn, 9026, firstEp);
server5.Start("/tmp/dnode_test_dnode5", fqdn, 9027, firstEp);
taosMsleep(300);
}
static void TearDownTestSuite() {
server2.Stop();
server3.Stop();
server4.Stop();
server5.Stop();
test.Cleanup();
}
static Testbase test;
static TestServer server2;
static TestServer server3;
static TestServer server4;
static TestServer server5;
};
Testbase MndTestDnode::test;
TestServer MndTestDnode::server2;
TestServer MndTestDnode::server3;
TestServer MndTestDnode::server4;
TestServer MndTestDnode::server5;
TEST_F(MndTestDnode, 01_ShowDnode) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "support_vnodes");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9023", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(16);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
}
TEST_F(MndTestDnode, 02_ConfigDnode) {
int32_t contLen = sizeof(SMCfgDnodeReq);
SMCfgDnodeReq* pReq = (SMCfgDnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONFIG_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
TEST_F(MndTestDnode, 03_Create_Dnode) {
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "");
pReq->port = htonl(9024);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DNODE_EP);
}
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(-1);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DNODE_EP);
}
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(123456);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DNODE_EP);
}
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9024);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9024);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DNODE_ALREADY_EXIST);
}
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9023", TSDB_EP_LEN);
CheckBinary("localhost:9024", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
}
TEST_F(MndTestDnode, 04_Drop_Dnode) {
{
int32_t contLen = sizeof(SDropDnodeReq);
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(-3);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DNODE_ID);
}
{
int32_t contLen = sizeof(SDropDnodeReq);
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(5);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
}
{
int32_t contLen = sizeof(SDropDnodeReq);
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDropDnodeReq);
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
}
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9023", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(16);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
taosMsleep(2000);
server2.Stop();
server2.DoStart();
}
TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9025);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9026);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9027);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9023", TSDB_EP_LEN);
CheckBinary("localhost:9025", TSDB_EP_LEN);
CheckBinary("localhost:9026", TSDB_EP_LEN);
CheckBinary("localhost:9027", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
// restart
uInfo("stop all server");
test.Restart();
server2.Restart();
server3.Restart();
server4.Restart();
server5.Restart();
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9023", TSDB_EP_LEN);
CheckBinary("localhost:9025", TSDB_EP_LEN);
CheckBinary("localhost:9026", TSDB_EP_LEN);
CheckBinary("localhost:9027", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckInt16(16);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
}

View File

@ -0,0 +1,11 @@
aux_source_directory(. MTEST_SRC)
add_executable(mnode_test_mnode ${MTEST_SRC})
target_link_libraries(
mnode_test_mnode
PUBLIC sut
)
add_test(
NAME mnode_test_mnode
COMMAND mnode_test_mnode
)

View File

@ -1,31 +1,31 @@
/** /**
* @file dnode.cpp * @file mnode.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module dnode-msg tests * @brief MNODE module mnode tests
* @version 0.1 * @version 1.0
* @date 2021-12-15 * @date 2022-01-07
* *
* @copyright Copyright (c) 2021 * @copyright Copyright (c) 2022
* *
*/ */
#include "sut.h" #include "sut.h"
class DndTestMnode : public ::testing::Test { class MndTestMnode : public ::testing::Test {
public: public:
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
public: public:
static void SetUpTestSuite() { static void SetUpTestSuite() {
test.Init("/tmp/dnode_test_mnode1", 9061); test.Init("/tmp/mnode_test_mnode1", 9031);
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9061"; const char* firstEp = "localhost:9031";
server2.Start("/tmp/dnode_test_mnode2", fqdn, 9062, firstEp); server2.Start("/tmp/mnode_test_mnode2", fqdn, 9032, firstEp);
server3.Start("/tmp/dnode_test_mnode3", fqdn, 9063, firstEp); server3.Start("/tmp/mnode_test_mnode3", fqdn, 9033, firstEp);
server4.Start("/tmp/dnode_test_mnode4", fqdn, 9064, firstEp); server4.Start("/tmp/mnode_test_mnode4", fqdn, 9034, firstEp);
server5.Start("/tmp/dnode_test_mnode5", fqdn, 9065, firstEp); server5.Start("/tmp/mnode_test_mnode5", fqdn, 9035, firstEp);
taosMsleep(300); taosMsleep(300);
} }
@ -44,13 +44,13 @@ class DndTestMnode : public ::testing::Test {
static TestServer server5; static TestServer server5;
}; };
Testbase DndTestMnode::test; Testbase MndTestMnode::test;
TestServer DndTestMnode::server2; TestServer MndTestMnode::server2;
TestServer DndTestMnode::server3; TestServer MndTestMnode::server3;
TestServer DndTestMnode::server4; TestServer MndTestMnode::server4;
TestServer DndTestMnode::server5; TestServer MndTestMnode::server5;
TEST_F(DndTestMnode, 01_ShowDnode) { TEST_F(MndTestMnode, 01_ShowDnode) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_MNODE, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_MNODE, "");
CHECK_META("show mnodes", 5); CHECK_META("show mnodes", 5);
@ -64,13 +64,13 @@ TEST_F(DndTestMnode, 01_ShowDnode) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1); CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN); CheckBinary("localhost:9031", TSDB_EP_LEN);
CheckBinary("master", 12); CheckBinary("master", 12);
CheckInt64(0); CheckInt64(0);
CheckTimestamp(); CheckTimestamp();
} }
TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) {
{ {
int32_t contLen = sizeof(SMCreateMnodeMsg); int32_t contLen = sizeof(SMCreateMnodeMsg);
@ -83,7 +83,7 @@ TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
} }
} }
TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) { TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) {
{ {
int32_t contLen = sizeof(SMCreateMnodeMsg); int32_t contLen = sizeof(SMCreateMnodeMsg);
@ -96,14 +96,14 @@ TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) {
} }
} }
TEST_F(DndTestMnode, 04_Create_Mnode) { TEST_F(MndTestMnode, 04_Create_Mnode) {
{ {
// create dnode // create dnode
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9062); pReq->port = htonl(9032);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
@ -132,8 +132,8 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
CheckInt16(1); CheckInt16(1);
CheckInt16(2); CheckInt16(2);
CheckBinary("localhost:9061", TSDB_EP_LEN); CheckBinary("localhost:9031", TSDB_EP_LEN);
CheckBinary("localhost:9062", TSDB_EP_LEN); CheckBinary("localhost:9032", TSDB_EP_LEN);
CheckBinary("master", 12); CheckBinary("master", 12);
CheckBinary("slave", 12); CheckBinary("slave", 12);
CheckInt64(0); CheckInt64(0);
@ -158,16 +158,16 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1); CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN); CheckBinary("localhost:9031", TSDB_EP_LEN);
CheckBinary("master", 12); CheckBinary("master", 12);
CheckInt64(0); CheckInt64(0);
CheckTimestamp(); CheckTimestamp();
} }
} }
// { // {
// int32_t contLen = sizeof(SDropDnodeMsg); // int32_t contLen = sizeof(SDropDnodeReq);
// SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen); // SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(2); // pReq->dnodeId = htonl(2);
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); // SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
@ -181,7 +181,7 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
// EXPECT_EQ(test.GetShowRows(), 1); // EXPECT_EQ(test.GetShowRows(), 1);
// CheckInt16(1); // CheckInt16(1);
// CheckBinary("localhost:9061", TSDB_EP_LEN); // CheckBinary("localhost:9031", TSDB_EP_LEN);
// CheckInt16(0); // CheckInt16(0);
// CheckInt16(1); // CheckInt16(1);
// CheckBinary("ready", 10); // CheckBinary("ready", 10);
@ -189,10 +189,10 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
// CheckBinary("", 24); // CheckBinary("", 24);
// { // {
// int32_t contLen = sizeof(SCreateDnodeMsg); // int32_t contLen = sizeof(SCreateDnodeReq);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); // SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9063"); // strcpy(pReq->ep, "localhost:9033");
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); // SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr); // ASSERT_NE(pRsp, nullptr);
@ -200,10 +200,10 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
// } // }
// { // {
// int32_t contLen = sizeof(SCreateDnodeMsg); // int32_t contLen = sizeof(SCreateDnodeReq);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); // SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9064"); // strcpy(pReq->ep, "localhost:9034");
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); // SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr); // ASSERT_NE(pRsp, nullptr);
@ -211,10 +211,10 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
// } // }
// { // {
// int32_t contLen = sizeof(SCreateDnodeMsg); // int32_t contLen = sizeof(SCreateDnodeReq);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); // SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9065"); // strcpy(pReq->ep, "localhost:9035");
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); // SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr); // ASSERT_NE(pRsp, nullptr);
@ -231,10 +231,10 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
// CheckInt16(3); // CheckInt16(3);
// CheckInt16(4); // CheckInt16(4);
// CheckInt16(5); // CheckInt16(5);
// CheckBinary("localhost:9061", TSDB_EP_LEN); // CheckBinary("localhost:9031", TSDB_EP_LEN);
// CheckBinary("localhost:9063", TSDB_EP_LEN); // CheckBinary("localhost:9033", TSDB_EP_LEN);
// CheckBinary("localhost:9064", TSDB_EP_LEN); // CheckBinary("localhost:9034", TSDB_EP_LEN);
// CheckBinary("localhost:9065", TSDB_EP_LEN); // CheckBinary("localhost:9035", TSDB_EP_LEN);
// CheckInt16(0); // CheckInt16(0);
// CheckInt16(0); // CheckInt16(0);
// CheckInt16(0); // CheckInt16(0);
@ -274,10 +274,10 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
// CheckInt16(3); // CheckInt16(3);
// CheckInt16(4); // CheckInt16(4);
// CheckInt16(5); // CheckInt16(5);
// CheckBinary("localhost:9061", TSDB_EP_LEN); // CheckBinary("localhost:9031", TSDB_EP_LEN);
// CheckBinary("localhost:9063", TSDB_EP_LEN); // CheckBinary("localhost:9033", TSDB_EP_LEN);
// CheckBinary("localhost:9064", TSDB_EP_LEN); // CheckBinary("localhost:9034", TSDB_EP_LEN);
// CheckBinary("localhost:9065", TSDB_EP_LEN); // CheckBinary("localhost:9035", TSDB_EP_LEN);
// CheckInt16(0); // CheckInt16(0);
// CheckInt16(0); // CheckInt16(0);
// CheckInt16(0); // CheckInt16(0);

View File

@ -0,0 +1,11 @@
aux_source_directory(. PROFILE_SRC)
add_executable(mnode_test_profile ${PROFILE_SRC})
target_link_libraries(
mnode_test_profile
PUBLIC sut
)
add_test(
NAME mnode_test_profile
COMMAND mnode_test_profile
)

View File

@ -1,19 +1,19 @@
/** /**
* @file profile.cpp * @file profile.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module profile-msg tests * @brief MNODE module profile tests
* @version 0.1 * @version 1.0
* @date 2021-12-15 * @date 2022-01-06
* *
* @copyright Copyright (c) 2021 * @copyright Copyright (c) 2022
* *
*/ */
#include "sut.h" #include "sut.h"
class DndTestProfile : public ::testing::Test { class MndTestProfile : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_profile", 9080); } static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9022); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
@ -24,15 +24,15 @@ class DndTestProfile : public ::testing::Test {
void TearDown() override {} void TearDown() override {}
}; };
Testbase DndTestProfile::test; Testbase MndTestProfile::test;
int32_t DndTestProfile::connId; int32_t MndTestProfile::connId;
TEST_F(DndTestProfile, 01_ConnectMsg) { TEST_F(MndTestProfile, 01_ConnectMsg) {
int32_t contLen = sizeof(SConnectMsg); int32_t contLen = sizeof(SConnectReq);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen); SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
strcpy(pReq->app, "dnode_test_profile"); strcpy(pReq->app, "mnode_test_profile");
strcpy(pReq->db, ""); strcpy(pReq->db, "");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
@ -53,18 +53,18 @@ TEST_F(DndTestProfile, 01_ConnectMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9080); EXPECT_EQ(pRsp->epSet.port[0], 9022);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
connId = pRsp->connId; connId = pRsp->connId;
} }
TEST_F(DndTestProfile, 02_ConnectMsg_InvalidDB) { TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
int32_t contLen = sizeof(SConnectMsg); int32_t contLen = sizeof(SConnectReq);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen); SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
strcpy(pReq->app, "dnode_test_profile"); strcpy(pReq->app, "mnode_test_profile");
strcpy(pReq->db, "invalid_db"); strcpy(pReq->db, "invalid_db");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
@ -73,7 +73,7 @@ TEST_F(DndTestProfile, 02_ConnectMsg_InvalidDB) {
ASSERT_EQ(pRsp->contLen, 0); ASSERT_EQ(pRsp->contLen, 0);
} }
TEST_F(DndTestProfile, 03_ConnectMsg_Show) { TEST_F(MndTestProfile, 03_ConnectMsg_Show) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
CHECK_META("show connections", 7); CHECK_META("show connections", 7);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "connId"); CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "connId");
@ -88,22 +88,22 @@ TEST_F(DndTestProfile, 03_ConnectMsg_Show) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
CheckInt32(1); CheckInt32(1);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("dnode_test_profile", TSDB_APP_NAME_LEN); CheckBinary("mnode_test_profile", TSDB_APP_NAME_LEN);
CheckInt32(1234); CheckInt32(1234);
IgnoreBinary(TSDB_IPv4ADDR_LEN + 6); IgnoreBinary(TSDB_IPv4ADDR_LEN + 6);
CheckTimestamp(); CheckTimestamp();
CheckTimestamp(); CheckTimestamp();
} }
TEST_F(DndTestProfile, 04_HeartBeatMsg) { TEST_F(MndTestProfile, 04_HeartBeatMsg) {
int32_t contLen = sizeof(SHeartBeatMsg); int32_t contLen = sizeof(SHeartBeatReq);
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(contLen); SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
pReq->connId = htonl(connId); pReq->connId = htonl(connId);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
pReq->numOfQueries = htonl(0); pReq->numOfQueries = htonl(0);
pReq->numOfStreams = htonl(0); pReq->numOfStreams = htonl(0);
strcpy(pReq->app, "dnode_test_profile"); strcpy(pReq->app, "mnode_test_profile");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
@ -127,15 +127,15 @@ TEST_F(DndTestProfile, 04_HeartBeatMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9080); EXPECT_EQ(pRsp->epSet.port[0], 9022);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
} }
TEST_F(DndTestProfile, 05_KillConnMsg) { TEST_F(MndTestProfile, 05_KillConnMsg) {
{ {
int32_t contLen = sizeof(SKillConnMsg); int32_t contLen = sizeof(SKillConnReq);
SKillConnMsg* pReq = (SKillConnMsg*)rpcMallocCont(contLen); SKillConnReq* pReq = (SKillConnReq*)rpcMallocCont(contLen);
pReq->connId = htonl(connId); pReq->connId = htonl(connId);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen);
@ -144,14 +144,14 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
} }
{ {
int32_t contLen = sizeof(SHeartBeatMsg); int32_t contLen = sizeof(SHeartBeatReq);
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(contLen); SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
pReq->connId = htonl(connId); pReq->connId = htonl(connId);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
pReq->numOfQueries = htonl(0); pReq->numOfQueries = htonl(0);
pReq->numOfStreams = htonl(0); pReq->numOfStreams = htonl(0);
strcpy(pReq->app, "dnode_test_profile"); strcpy(pReq->app, "mnode_test_profile");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
@ -160,11 +160,11 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
} }
{ {
int32_t contLen = sizeof(SConnectMsg); int32_t contLen = sizeof(SConnectReq);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen); SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
strcpy(pReq->app, "dnode_test_profile"); strcpy(pReq->app, "mnode_test_profile");
strcpy(pReq->db, ""); strcpy(pReq->db, "");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
@ -185,17 +185,17 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9080); EXPECT_EQ(pRsp->epSet.port[0], 9022);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
connId = pRsp->connId; connId = pRsp->connId;
} }
} }
TEST_F(DndTestProfile, 06_KillConnMsg_InvalidConn) { TEST_F(MndTestProfile, 06_KillConnMsg_InvalidConn) {
int32_t contLen = sizeof(SKillConnMsg); int32_t contLen = sizeof(SKillConnReq);
SKillConnMsg* pReq = (SKillConnMsg*)rpcMallocCont(contLen); SKillConnReq* pReq = (SKillConnReq*)rpcMallocCont(contLen);
pReq->connId = htonl(2345); pReq->connId = htonl(2345);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_CONN, pReq, contLen);
@ -203,11 +203,11 @@ TEST_F(DndTestProfile, 06_KillConnMsg_InvalidConn) {
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID);
} }
TEST_F(DndTestProfile, 07_KillQueryMsg) { TEST_F(MndTestProfile, 07_KillQueryMsg) {
{ {
int32_t contLen = sizeof(SKillQueryMsg); int32_t contLen = sizeof(SKillQueryReq);
SKillQueryMsg* pReq = (SKillQueryMsg*)rpcMallocCont(contLen); SKillQueryReq* pReq = (SKillQueryReq*)rpcMallocCont(contLen);
pReq->connId = htonl(connId); pReq->connId = htonl(connId);
pReq->queryId = htonl(1234); pReq->queryId = htonl(1234);
@ -218,14 +218,14 @@ TEST_F(DndTestProfile, 07_KillQueryMsg) {
} }
{ {
int32_t contLen = sizeof(SHeartBeatMsg); int32_t contLen = sizeof(SHeartBeatReq);
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(contLen); SHeartBeatReq* pReq = (SHeartBeatReq*)rpcMallocCont(contLen);
pReq->connId = htonl(connId); pReq->connId = htonl(connId);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
pReq->numOfQueries = htonl(0); pReq->numOfQueries = htonl(0);
pReq->numOfStreams = htonl(0); pReq->numOfStreams = htonl(0);
strcpy(pReq->app, "dnode_test_profile"); strcpy(pReq->app, "mnode_test_profile");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_HEARTBEAT, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
@ -249,15 +249,15 @@ TEST_F(DndTestProfile, 07_KillQueryMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9080); EXPECT_EQ(pRsp->epSet.port[0], 9022);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
} }
} }
TEST_F(DndTestProfile, 08_KillQueryMsg_InvalidConn) { TEST_F(MndTestProfile, 08_KillQueryMsg_InvalidConn) {
int32_t contLen = sizeof(SKillQueryMsg); int32_t contLen = sizeof(SKillQueryReq);
SKillQueryMsg* pReq = (SKillQueryMsg*)rpcMallocCont(contLen); SKillQueryReq* pReq = (SKillQueryReq*)rpcMallocCont(contLen);
pReq->connId = htonl(2345); pReq->connId = htonl(2345);
pReq->queryId = htonl(1234); pReq->queryId = htonl(1234);
@ -266,7 +266,7 @@ TEST_F(DndTestProfile, 08_KillQueryMsg_InvalidConn) {
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_CONN_ID);
} }
TEST_F(DndTestProfile, 09_KillQueryMsg) { TEST_F(MndTestProfile, 09_KillQueryMsg) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_QUERIES, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_QUERIES, "");
CHECK_META("show queries", 14); CHECK_META("show queries", 14);

View File

@ -96,9 +96,9 @@ TEST_F(MndTestQnode, 02_Create_Qnode) {
TEST_F(MndTestQnode, 03_Drop_Qnode) { TEST_F(MndTestQnode, 03_Drop_Qnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9015); pReq->port = htonl(9015);

View File

@ -13,7 +13,7 @@
class MndTestShow : public ::testing::Test { class MndTestShow : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9020); } static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9021); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
@ -50,9 +50,9 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
} }
TEST_F(MndTestShow, 03_ShowMsg_Conn) { TEST_F(MndTestShow, 03_ShowMsg_Conn) {
int32_t contLen = sizeof(SConnectMsg); int32_t contLen = sizeof(SConnectReq);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen); SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
pReq->pid = htonl(1234); pReq->pid = htonl(1234);
strcpy(pReq->app, "mnode_test_show"); strcpy(pReq->app, "mnode_test_show");
strcpy(pReq->db, ""); strcpy(pReq->db, "");

View File

@ -96,9 +96,9 @@ TEST_F(MndTestSnode, 02_Create_Snode) {
TEST_F(MndTestSnode, 03_Drop_Snode) { TEST_F(MndTestSnode, 03_Drop_Snode) {
{ {
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9017); pReq->port = htonl(9017);

View File

@ -133,9 +133,9 @@ TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) {
TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
{ {
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost"); strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9020); pReq->port = htonl(9020);

View File

@ -429,3 +429,12 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
maxId = MAX(maxId, pSdb->maxId[type]); maxId = MAX(maxId, pSdb->maxId[type]);
return maxId + 1; return maxId + 1;
} }
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type) {
if (type >= SDB_MAX || type < 0) {
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
return -1;
}
return pSdb->tableVer[type];
}

View File

@ -12,7 +12,7 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* ms
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
#endif // TDENGINE_ASTTOMSG_H #endif // TDENGINE_ASTTOMSG_H

View File

@ -335,7 +335,7 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p
return pDropTableMsg; return pDropTableMsg;
} }
SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid host name (name too long, maximum length 128)"; const char* msg1 = "invalid host name (name too long, maximum length 128)";
const char* msg2 = "dnode name can not be string"; const char* msg2 = "dnode name can not be string";
const char* msg3 = "port should be an integer that is less than 65535 and greater than 0"; const char* msg3 = "port should be an integer that is less than 65535 and greater than 0";
@ -367,7 +367,7 @@ SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMs
return NULL; return NULL;
} }
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *) calloc(1, sizeof(SCreateDnodeMsg)); SCreateDnodeReq *pCreate = (SCreateDnodeReq *) calloc(1, sizeof(SCreateDnodeReq));
if (pCreate == NULL) { if (pCreate == NULL) {
buildInvalidOperationMsg(pMsgBuf, msg4); buildInvalidOperationMsg(pMsgBuf, msg4);
return NULL; return NULL;
@ -376,18 +376,18 @@ SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMs
strncpy(pCreate->fqdn, id->z, id->n); strncpy(pCreate->fqdn, id->z, id->n);
pCreate->port = htonl(val); pCreate->port = htonl(val);
*len = sizeof(SCreateDnodeMsg); *len = sizeof(SCreateDnodeReq);
return pCreate; return pCreate;
} }
SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0); SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0);
char* end = NULL; char* end = NULL;
SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg)); SDropDnodeReq * pDrop = (SDropDnodeReq *)calloc(1, sizeof(SDropDnodeReq));
pDrop->dnodeId = strtoll(pzName->z, &end, 10); pDrop->dnodeId = strtoll(pzName->z, &end, 10);
pDrop->dnodeId = htonl(pDrop->dnodeId); pDrop->dnodeId = htonl(pDrop->dnodeId);
*len = sizeof(SDropDnodeMsg); *len = sizeof(SDropDnodeReq);
if (end - pzName->z != pzName->n) { if (end - pzName->z != pzName->n) {
buildInvalidOperationMsg(pMsgBuf, "invalid dnode id"); buildInvalidOperationMsg(pMsgBuf, "invalid dnode id");

View File

@ -3819,7 +3819,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
char* pMsg = pCmd->payload; char* pMsg = pCmd->payload;
SCfgDnodeMsg* pCfg = (SCfgDnodeMsg*)pMsg; SMCfgDnodeReq* pCfg = (SMCfgDnodeReq*)pMsg;
SToken* t0 = taosArrayGet(pMiscInfo->a, 0); SToken* t0 = taosArrayGet(pMiscInfo->a, 0);
SToken* t1 = taosArrayGet(pMiscInfo->a, 1); SToken* t1 = taosArrayGet(pMiscInfo->a, 1);

View File

@ -226,7 +226,7 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
} }
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SHeartBeatMsg *pHeartbeat = pMsg; SHeartBeatReq *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries; int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams; int allocedStreamsNum = pHeartbeat->numOfStreams;
@ -327,7 +327,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
} }
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SHeartBeatMsg); sizeof(SHeartBeatReq);
pHeartbeat->connId = htonl(pObj->connId); pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries); pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams); pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);

View File

@ -776,7 +776,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char* pMsg = pCmd->payload; char* pMsg = pCmd->payload;
SCfgDnodeMsg* pCfg = (SCfgDnodeMsg*)pMsg; SMCfgDnodeReq* pCfg = (SMCfgDnodeReq*)pMsg;
SStrToken* t0 = taosArrayGet(pMiscInfo->a, 0); SStrToken* t0 = taosArrayGet(pMiscInfo->a, 0);
SStrToken* t1 = taosArrayGet(pMiscInfo->a, 1); SStrToken* t1 = taosArrayGet(pMiscInfo->a, 1);

View File

@ -1192,13 +1192,13 @@ int32_t tscBuildCreateFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateDnodeMsg); pCmd->payloadLen = sizeof(SCreateDnodeReq);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self); tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCmd->payload; SCreateDnodeReq *pCreate = (SCreateDnodeReq *)pCmd->payload;
SStrToken* t0 = taosArrayGet(pInfo->pMiscInfo->a, 0); SStrToken* t0 = taosArrayGet(pInfo->pMiscInfo->a, 0);
strncpy(pCreate->ep, t0->z, t0->n); strncpy(pCreate->ep, t0->z, t0->n);
@ -1287,7 +1287,7 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCfgDnodeMsg); pCmd->payloadLen = sizeof(SMCfgDnodeReq);
pCmd->msgType = TDMT_MND_CONFIG_DNODE; pCmd->msgType = TDMT_MND_CONFIG_DNODE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1350,13 +1350,13 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char dnodeEp[TSDB_EP_LEN] = {0}; char dnodeEp[TSDB_EP_LEN] = {0};
tstrncpy(dnodeEp, pCmd->payload, TSDB_EP_LEN); tstrncpy(dnodeEp, pCmd->payload, TSDB_EP_LEN);
pCmd->payloadLen = sizeof(SDropDnodeMsg); pCmd->payloadLen = sizeof(SDropDnodeReq);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self); tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SDropDnodeMsg * pDrop = (SDropDnodeMsg *)pCmd->payload; SDropDnodeReq * pDrop = (SDropDnodeReq *)pCmd->payload;
tstrncpy(pDrop->ep, dnodeEp, tListLen(pDrop->ep)); tstrncpy(pDrop->ep, dnodeEp, tListLen(pDrop->ep));
pCmd->msgType = TDMT_MND_DROP_DNODE; pCmd->msgType = TDMT_MND_DROP_DNODE;
@ -1469,7 +1469,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SKillQueryMsg); pCmd->payloadLen = sizeof(SKillQueryReq);
switch (pCmd->command) { switch (pCmd->command) {
case TSDB_SQL_KILL_QUERY: case TSDB_SQL_KILL_QUERY:
@ -1862,14 +1862,14 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TDMT_MND_CONNECT; pCmd->msgType = TDMT_MND_CONNECT;
pCmd->payloadLen = sizeof(SConnectMsg); pCmd->payloadLen = sizeof(SConnectReq);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self); tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SConnectMsg *pConnect = (SConnectMsg*)pCmd->payload; SConnectReq *pConnect = (SConnectReq*)pCmd->payload;
// TODO refactor full_name // TODO refactor full_name
char *db; // ugly code to move the space char *db; // ugly code to move the space
@ -1974,7 +1974,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
numOfStreams++; numOfStreams++;
} }
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatMsg) + 100; int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SHeartBeatReq) + 100;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
tscError("0x%"PRIx64" failed to create heartbeat msg", pSql->self); tscError("0x%"PRIx64" failed to create heartbeat msg", pSql->self);
@ -1982,7 +1982,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
// TODO the expired hb and client can not be identified by server till now. // TODO the expired hb and client can not be identified by server till now.
SHeartBeatMsg *pHeartbeat = (SHeartBeatMsg *)pCmd->payload; SHeartBeatReq *pHeartbeat = (SHeartBeatReq *)pCmd->payload;
tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer)); tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));
pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfQueries = numOfQueries;

View File

@ -51,6 +51,13 @@ if $rows != 2 then
return -1 return -1
endi endi
sql create table c3 using st tags(3) c4 using st tags(4) c5 using st tags(5) c6 using st tags(6) c7 using st tags(7)
sql show tables
if $rows != 7 then
return -1
endi
print $data00 $data01 $data02 print $data00 $data01 $data02
print $data10 $data11 $data22 print $data10 $data11 $data22
print $data20 $data11 $data22 print $data20 $data11 $data22

View File

@ -15,25 +15,20 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "thash.h"
#include "tutil.h"
#include "ulog.h" #include "ulog.h"
#define MAX_RANDOM_POINTS 20000
#define GREEN "\033[1;32m" #define GREEN "\033[1;32m"
#define NC "\033[0m" #define NC "\033[0m"
char dbName[32] = "db"; char dbName[32] = "db";
char stbName[64] = "st"; char stbName[64] = "st";
int32_t numOfThreads = 2; int32_t numOfThreads = 1;
int32_t numOfTables = 10000; int32_t numOfTables = 10000;
int32_t createTable = 1; int32_t createTable = 1;
int32_t insertData = 0; int32_t insertData = 0;
int32_t batchNum = 1; int32_t batchNum = 10;
int32_t numOfVgroups = 2; int32_t numOfVgroups = 2;
typedef struct { typedef struct {
@ -47,11 +42,11 @@ typedef struct {
pthread_t thread; pthread_t thread;
} SThreadInfo; } SThreadInfo;
void parseArgument(int argc, char *argv[]); void parseArgument(int32_t argc, char *argv[]);
void *threadFunc(void *param); void *threadFunc(void *param);
void createDbAndStb(); void createDbAndStb();
int main(int argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
createDbAndStb(); createDbAndStb();
@ -64,7 +59,7 @@ int main(int argc, char *argv[]) {
int32_t numOfTablesPerThread = numOfTables / numOfThreads; int32_t numOfTablesPerThread = numOfTables / numOfThreads;
numOfTables = numOfTablesPerThread * numOfThreads; numOfTables = numOfTablesPerThread * numOfThreads;
for (int i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].tableBeginIndex = i * numOfTablesPerThread; pInfo[i].tableBeginIndex = i * numOfTablesPerThread;
pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread;
pInfo[i].threadIndex = i; pInfo[i].threadIndex = i;
@ -74,17 +69,17 @@ int main(int argc, char *argv[]) {
} }
taosMsleep(300); taosMsleep(300);
for (int i = 0; i < numOfThreads; i++) { for (int32_t i = 0; i < numOfThreads; i++) {
pthread_join(pInfo[i].thread, NULL); pthread_join(pInfo[i].thread, NULL);
} }
float createTableSpeed = 0; float createTableSpeed = 0;
for (int i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
createTableSpeed += pInfo[i].createTableSpeed; createTableSpeed += pInfo[i].createTableSpeed;
} }
float insertDataSpeed = 0; float insertDataSpeed = 0;
for (int i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
insertDataSpeed += pInfo[i].insertDataSpeed; insertDataSpeed += pInfo[i].insertDataSpeed;
} }
@ -137,8 +132,8 @@ void createDbAndStb() {
void *threadFunc(void *param) { void *threadFunc(void *param) {
SThreadInfo *pInfo = (SThreadInfo *)param; SThreadInfo *pInfo = (SThreadInfo *)param;
char qstr[65000]; char *qstr = malloc(2000 * 1000);
int code; int32_t code = 0;
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) { if (con == NULL) {
@ -153,7 +148,13 @@ void *threadFunc(void *param) {
if (createTable) { if (createTable) {
int64_t startMs = taosGetTimestampMs(); int64_t startMs = taosGetTimestampMs();
for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
sprintf(qstr, "create table t%d using %s tags(%d)", t, stbName, t); int32_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batch; ++i) {
len += sprintf(qstr + len, " t%d using %s tags(%d)", t + i, stbName, t + i);
}
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pSql);
if (code != 0) { if (code != 0) {
@ -189,6 +190,7 @@ void *threadFunc(void *param) {
} }
taos_close(con); taos_close(con);
free(qstr);
return 0; return 0;
} }
@ -218,8 +220,8 @@ void printHelp() {
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void parseArgument(int argc, char *argv[]) { void parseArgument(int32_t argc, char *argv[]) {
for (int i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
printHelp(); printHelp();
exit(0); exit(0);