commit
6fb5be1282
|
@ -282,8 +282,7 @@ PRASE_DNODE_OVER:
|
||||||
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
|
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
|
||||||
pMgmt->dnodeEps->num = 1;
|
pMgmt->dnodeEps->num = 1;
|
||||||
pMgmt->dnodeEps->eps[0].isMnode = 1;
|
pMgmt->dnodeEps->eps[0].isMnode = 1;
|
||||||
pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort;
|
taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
|
||||||
tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dndResetDnodes(pDnode, pMgmt->dnodeEps);
|
dndResetDnodes(pDnode, pMgmt->dnodeEps);
|
||||||
|
@ -371,6 +370,8 @@ void dndSendStatusMsg(SDnode *pDnode) {
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
|
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
|
||||||
pMgmt->statusSent = 1;
|
pMgmt->statusSent = 1;
|
||||||
|
|
||||||
|
dTrace("pDnode:%p, send status msg to mnode", pDnode);
|
||||||
dndSendMsgToMnode(pDnode, &rpcMsg);
|
dndSendMsgToMnode(pDnode, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -447,7 +448,7 @@ static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { a
|
||||||
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
|
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
|
||||||
|
|
||||||
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
dDebug("config msg is received");
|
dError("config msg is received, but not supported yet");
|
||||||
SCfgDnodeMsg *pCfg = pMsg->pCont;
|
SCfgDnodeMsg *pCfg = pMsg->pCont;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
|
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
@ -476,11 +477,11 @@ static void *dnodeThreadRoutine(void *param) {
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
pthread_testcancel();
|
pthread_testcancel();
|
||||||
|
taosMsleep(ms);
|
||||||
|
|
||||||
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent) {
|
if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent) {
|
||||||
dndSendStatusMsg(pDnode);
|
dndSendStatusMsg(pDnode);
|
||||||
}
|
}
|
||||||
taosMsleep(ms);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -584,4 +585,5 @@ void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
default:
|
default:
|
||||||
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
|
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
|
||||||
}
|
}
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
|
@ -559,9 +559,12 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMsg->msgType & 1u) {
|
||||||
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
|
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -645,9 +648,12 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
|
||||||
|
|
||||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
|
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
|
||||||
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
|
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
|
||||||
|
if (pRpcMsg->msgType & 1u) {
|
||||||
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
|
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pRpcMsg->pCont);
|
rpcFreeCont(pRpcMsg->pCont);
|
||||||
|
pRpcMsg->pCont = NULL;
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -656,9 +662,12 @@ void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
SMnode *pMnode = dndAcquireMnode(pDnode);
|
||||||
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
|
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
|
||||||
|
if (pMsg->msgType & 1u) {
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
dndReleaseMnode(pDnode, pMnode);
|
||||||
|
@ -668,9 +677,12 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
SMnode *pMnode = dndAcquireMnode(pDnode);
|
||||||
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
|
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
|
||||||
|
if (pMsg->msgType & 1u) {
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
dndReleaseMnode(pDnode, pMnode);
|
||||||
|
@ -680,9 +692,12 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
SMnode *pMnode = dndAcquireMnode(pDnode);
|
||||||
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
|
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
|
||||||
|
if (pMsg->msgType & 1u) {
|
||||||
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
|
}
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndReleaseMnode(pDnode, pMnode);
|
dndReleaseMnode(pDnode, pMnode);
|
||||||
|
|
|
@ -141,8 +141,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
|
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
|
||||||
} else {
|
} else {
|
||||||
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
|
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
|
||||||
}
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dndInitClient(SDnode *pDnode) {
|
static int32_t dndInitClient(SDnode *pDnode) {
|
||||||
|
|
|
@ -196,7 +196,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
|
||||||
dndSetStat(pDnode, DND_STAT_RUNNING);
|
dndSetStat(pDnode, DND_STAT_RUNNING);
|
||||||
dndSendStatusMsg(pDnode);
|
dndSendStatusMsg(pDnode);
|
||||||
dndReportStartup(pDnode, "TDengine", "initialized successfully");
|
dndReportStartup(pDnode, "TDengine", "initialized successfully");
|
||||||
dInfo("TDengine is initialized successfully");
|
dInfo("TDengine is initialized successfully, pDnode:%p", pDnode);
|
||||||
|
|
||||||
return pDnode;
|
return pDnode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,15 +24,15 @@ class DndTestDnode : public ::testing::Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void SetUpTestSuite() {
|
static void SetUpTestSuite() {
|
||||||
initLog("/tmp/dndTestDnode1");
|
initLog("/tmp/dndTestDnode");
|
||||||
|
|
||||||
const char* fqdn = "localhost";
|
const char* fqdn = "localhost";
|
||||||
const char* firstEp = "localhost:9521";
|
const char* firstEp = "localhost:9521";
|
||||||
pServer1 = CreateServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
|
pServer1 = CreateServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
|
||||||
pServer2 = CreateServer("/tmp/dndTestDnode2", fqdn, 9522, firstEp);
|
pServer2 = CreateServer("/tmp/dndTestDnode2", fqdn, 9522, firstEp);
|
||||||
// pServer3 = CreateServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
|
pServer3 = CreateServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
|
||||||
// pServer4 = CreateServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
|
pServer4 = CreateServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
|
||||||
// pServer5 = CreateServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
|
pServer5 = CreateServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
|
||||||
pClient = createClient("root", "taosdata", fqdn, 9521);
|
pClient = createClient("root", "taosdata", fqdn, 9521);
|
||||||
taosMsleep(300);
|
taosMsleep(300);
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,6 @@ class DndTestDnode : public ::testing::Test {
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
|
|
||||||
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) {
|
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) {
|
||||||
//--- meta ---
|
|
||||||
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
|
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
|
||||||
pShow->type = showType;
|
pShow->type = showType;
|
||||||
strcpy(pShow->db, "");
|
strcpy(pShow->db, "");
|
||||||
|
@ -108,7 +107,7 @@ class DndTestDnode : public ::testing::Test {
|
||||||
EXPECT_STREQ(pSchema->name, name);
|
EXPECT_STREQ(pSchema->name, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SendThenCheckShowRetrieveMsg(int32_t rows) {
|
void SendThenCheckShowRetrieveMsg(int32_t rows, int32_t completed) {
|
||||||
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
|
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
|
||||||
pRetrieve->showId = htonl(showId);
|
pRetrieve->showId = htonl(showId);
|
||||||
pRetrieve->free = 0;
|
pRetrieve->free = 0;
|
||||||
|
@ -134,7 +133,7 @@ class DndTestDnode : public ::testing::Test {
|
||||||
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
|
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
|
||||||
EXPECT_EQ(pRetrieveRsp->offset, 0);
|
EXPECT_EQ(pRetrieveRsp->offset, 0);
|
||||||
EXPECT_EQ(pRetrieveRsp->useconds, 0);
|
EXPECT_EQ(pRetrieveRsp->useconds, 0);
|
||||||
EXPECT_EQ(pRetrieveRsp->completed, 1);
|
EXPECT_EQ(pRetrieveRsp->completed, completed);
|
||||||
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||||
EXPECT_EQ(pRetrieveRsp->compressed, 0);
|
EXPECT_EQ(pRetrieveRsp->compressed, 0);
|
||||||
EXPECT_EQ(pRetrieveRsp->reserved, 0);
|
EXPECT_EQ(pRetrieveRsp->reserved, 0);
|
||||||
|
@ -193,7 +192,7 @@ TEST_F(DndTestDnode, ShowDnode) {
|
||||||
CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
|
CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
|
||||||
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline reason");
|
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline reason");
|
||||||
|
|
||||||
SendThenCheckShowRetrieveMsg(1);
|
SendThenCheckShowRetrieveMsg(1, 1);
|
||||||
CheckInt16(1);
|
CheckInt16(1);
|
||||||
CheckBinary("localhost:9521", TSDB_EP_LEN);
|
CheckBinary("localhost:9521", TSDB_EP_LEN);
|
||||||
CheckInt16(0);
|
CheckInt16(0);
|
||||||
|
@ -203,6 +202,22 @@ TEST_F(DndTestDnode, ShowDnode) {
|
||||||
CheckBinary("", 24);
|
CheckBinary("", 24);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DndTestDnode, ConfigDnode_01) {
|
||||||
|
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(sizeof(SCfgDnodeMsg));
|
||||||
|
pReq->dnodeId = htonl(1);
|
||||||
|
strcpy(pReq->config, "ddebugflag 131");
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = sizeof(SCfgDnodeMsg);
|
||||||
|
rpcMsg.msgType = TSDB_MSG_TYPE_CONFIG_DNODE;
|
||||||
|
|
||||||
|
sendMsg(pClient, &rpcMsg);
|
||||||
|
SRpcMsg* pMsg = pClient->pRsp;
|
||||||
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DndTestDnode, CreateDnode_01) {
|
TEST_F(DndTestDnode, CreateDnode_01) {
|
||||||
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
|
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
|
||||||
strcpy(pReq->ep, "localhost:9522");
|
strcpy(pReq->ep, "localhost:9522");
|
||||||
|
@ -219,7 +234,7 @@ TEST_F(DndTestDnode, CreateDnode_01) {
|
||||||
|
|
||||||
taosMsleep(1300);
|
taosMsleep(1300);
|
||||||
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
|
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
|
||||||
SendThenCheckShowRetrieveMsg(2);
|
SendThenCheckShowRetrieveMsg(2, 1);
|
||||||
CheckInt16(1);
|
CheckInt16(1);
|
||||||
CheckInt16(2);
|
CheckInt16(2);
|
||||||
CheckBinary("localhost:9521", TSDB_EP_LEN);
|
CheckBinary("localhost:9521", TSDB_EP_LEN);
|
||||||
|
@ -236,135 +251,107 @@ TEST_F(DndTestDnode, CreateDnode_01) {
|
||||||
CheckBinary("", 24);
|
CheckBinary("", 24);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
TEST_F(DndTestDnode, DropDnode_01) {
|
||||||
TEST_F(DndTestDnode, AlterUser_01) {
|
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(sizeof(SDropDnodeMsg));
|
||||||
ASSERT_NE(pClient, nullptr);
|
pReq->dnodeId = htonl(2);
|
||||||
|
|
||||||
//--- drop user ---
|
|
||||||
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
|
|
||||||
strcpy(pReq->user, "u1");
|
|
||||||
strcpy(pReq->pass, "p2");
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.pCont = pReq;
|
rpcMsg.pCont = pReq;
|
||||||
rpcMsg.contLen = sizeof(SAlterUserMsg);
|
rpcMsg.contLen = sizeof(SDropDnodeMsg);
|
||||||
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER;
|
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_DNODE;
|
||||||
|
|
||||||
sendMsg(pClient, &rpcMsg);
|
sendMsg(pClient, &rpcMsg);
|
||||||
SRpcMsg* pMsg = pClient->pRsp;
|
SRpcMsg* pMsg = pClient->pRsp;
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
ASSERT_EQ(pMsg->code, 0);
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
|
|
||||||
//--- meta ---
|
taosMsleep(1300);
|
||||||
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
|
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
|
||||||
pShow->type = TSDB_MGMT_TABLE_USER;
|
SendThenCheckShowRetrieveMsg(1, 0);
|
||||||
SRpcMsg showRpcMsg = {0};
|
CheckInt16(1);
|
||||||
showRpcMsg.pCont = pShow;
|
CheckBinary("localhost:9521", TSDB_EP_LEN);
|
||||||
showRpcMsg.contLen = sizeof(SShowMsg);
|
CheckInt16(0);
|
||||||
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
|
CheckInt16(1);
|
||||||
|
CheckBinary("ready", 10);
|
||||||
sendMsg(pClient, &showRpcMsg);
|
CheckTimestamp();
|
||||||
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
|
CheckBinary("", 24);
|
||||||
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
|
|
||||||
pMeta->numOfColumns = htons(pMeta->numOfColumns);
|
|
||||||
EXPECT_EQ(pMeta->numOfColumns, 4);
|
|
||||||
|
|
||||||
//--- retrieve ---
|
|
||||||
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
|
|
||||||
pRetrieve->showId = pShowRsp->showId;
|
|
||||||
SRpcMsg retrieveRpcMsg = {0};
|
|
||||||
retrieveRpcMsg.pCont = pRetrieve;
|
|
||||||
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
|
|
||||||
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
|
||||||
|
|
||||||
sendMsg(pClient, &retrieveRpcMsg);
|
|
||||||
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
|
|
||||||
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
|
|
||||||
EXPECT_EQ(pRetrieveRsp->numOfRows, 3);
|
|
||||||
|
|
||||||
char* pData = pRetrieveRsp->data;
|
|
||||||
int32_t pos = 0;
|
|
||||||
char* strVal = NULL;
|
|
||||||
|
|
||||||
//--- name ---
|
|
||||||
{
|
|
||||||
pos += sizeof(VarDataLenT);
|
|
||||||
strVal = (char*)(pData + pos);
|
|
||||||
pos += TSDB_USER_LEN;
|
|
||||||
EXPECT_STREQ(strVal, "u1");
|
|
||||||
|
|
||||||
pos += sizeof(VarDataLenT);
|
|
||||||
strVal = (char*)(pData + pos);
|
|
||||||
pos += TSDB_USER_LEN;
|
|
||||||
EXPECT_STREQ(strVal, "root");
|
|
||||||
|
|
||||||
pos += sizeof(VarDataLenT);
|
|
||||||
strVal = (char*)(pData + pos);
|
|
||||||
pos += TSDB_USER_LEN;
|
|
||||||
EXPECT_STREQ(strVal, "_root");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestDnode, DropUser_01) {
|
TEST_F(DndTestDnode, CreateDnode_02) {
|
||||||
ASSERT_NE(pClient, nullptr);
|
{
|
||||||
|
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
|
||||||
//--- drop user ---
|
strcpy(pReq->ep, "localhost:9523");
|
||||||
SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
|
|
||||||
strcpy(pReq->user, "u1");
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.pCont = pReq;
|
rpcMsg.pCont = pReq;
|
||||||
rpcMsg.contLen = sizeof(SDropUserMsg);
|
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
|
||||||
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_USER;
|
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
|
||||||
|
|
||||||
sendMsg(pClient, &rpcMsg);
|
sendMsg(pClient, &rpcMsg);
|
||||||
SRpcMsg* pMsg = pClient->pRsp;
|
SRpcMsg* pMsg = pClient->pRsp;
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
ASSERT_EQ(pMsg->code, 0);
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
|
|
||||||
//--- meta ---
|
|
||||||
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
|
|
||||||
pShow->type = TSDB_MGMT_TABLE_USER;
|
|
||||||
SRpcMsg showRpcMsg = {0};
|
|
||||||
showRpcMsg.pCont = pShow;
|
|
||||||
showRpcMsg.contLen = sizeof(SShowMsg);
|
|
||||||
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
|
|
||||||
|
|
||||||
sendMsg(pClient, &showRpcMsg);
|
|
||||||
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
|
|
||||||
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
|
|
||||||
pMeta->numOfColumns = htons(pMeta->numOfColumns);
|
|
||||||
EXPECT_EQ(pMeta->numOfColumns, 4);
|
|
||||||
|
|
||||||
//--- retrieve ---
|
|
||||||
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
|
|
||||||
pRetrieve->showId = pShowRsp->showId;
|
|
||||||
SRpcMsg retrieveRpcMsg = {0};
|
|
||||||
retrieveRpcMsg.pCont = pRetrieve;
|
|
||||||
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
|
|
||||||
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
|
||||||
|
|
||||||
sendMsg(pClient, &retrieveRpcMsg);
|
|
||||||
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
|
|
||||||
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
|
|
||||||
EXPECT_EQ(pRetrieveRsp->numOfRows, 2);
|
|
||||||
|
|
||||||
char* pData = pRetrieveRsp->data;
|
|
||||||
int32_t pos = 0;
|
|
||||||
char* strVal = NULL;
|
|
||||||
|
|
||||||
//--- name ---
|
|
||||||
{
|
|
||||||
pos += sizeof(VarDataLenT);
|
|
||||||
strVal = (char*)(pData + pos);
|
|
||||||
pos += TSDB_USER_LEN;
|
|
||||||
EXPECT_STREQ(strVal, "root");
|
|
||||||
|
|
||||||
pos += sizeof(VarDataLenT);
|
|
||||||
strVal = (char*)(pData + pos);
|
|
||||||
pos += TSDB_USER_LEN;
|
|
||||||
EXPECT_STREQ(strVal, "_root");
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
{
|
||||||
|
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
|
||||||
|
strcpy(pReq->ep, "localhost:9524");
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
|
||||||
|
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
|
||||||
|
|
||||||
|
sendMsg(pClient, &rpcMsg);
|
||||||
|
SRpcMsg* pMsg = pClient->pRsp;
|
||||||
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
|
||||||
|
strcpy(pReq->ep, "localhost:9525");
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = pReq;
|
||||||
|
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
|
||||||
|
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
|
||||||
|
|
||||||
|
sendMsg(pClient, &rpcMsg);
|
||||||
|
SRpcMsg* pMsg = pClient->pRsp;
|
||||||
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMsleep(1300);
|
||||||
|
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
|
||||||
|
SendThenCheckShowRetrieveMsg(4, 0);
|
||||||
|
CheckInt16(1);
|
||||||
|
CheckInt16(3);
|
||||||
|
CheckInt16(4);
|
||||||
|
CheckInt16(5);
|
||||||
|
CheckBinary("localhost:9521", TSDB_EP_LEN);
|
||||||
|
CheckBinary("localhost:9523", TSDB_EP_LEN);
|
||||||
|
CheckBinary("localhost:9524", TSDB_EP_LEN);
|
||||||
|
CheckBinary("localhost:9525", TSDB_EP_LEN);
|
||||||
|
CheckInt16(0);
|
||||||
|
CheckInt16(0);
|
||||||
|
CheckInt16(0);
|
||||||
|
CheckInt16(0);
|
||||||
|
CheckInt16(1);
|
||||||
|
CheckInt16(1);
|
||||||
|
CheckInt16(1);
|
||||||
|
CheckInt16(1);
|
||||||
|
CheckBinary("ready", 10);
|
||||||
|
CheckBinary("ready", 10);
|
||||||
|
CheckBinary("ready", 10);
|
||||||
|
CheckBinary("ready", 10);
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckBinary("", 24);
|
||||||
|
CheckBinary("", 24);
|
||||||
|
CheckBinary("", 24);
|
||||||
|
CheckBinary("", 24);
|
||||||
|
}
|
||||||
|
|
|
@ -16,9 +16,9 @@
|
||||||
#include "deploy.h"
|
#include "deploy.h"
|
||||||
|
|
||||||
void initLog(const char* path) {
|
void initLog(const char* path) {
|
||||||
dDebugFlag = 207;
|
dDebugFlag = 143;
|
||||||
vDebugFlag = 0;
|
vDebugFlag = 0;
|
||||||
mDebugFlag = 207;
|
mDebugFlag = 143;
|
||||||
cDebugFlag = 0;
|
cDebugFlag = 0;
|
||||||
jniDebugFlag = 0;
|
jniDebugFlag = 0;
|
||||||
tmrDebugFlag = 0;
|
tmrDebugFlag = 0;
|
||||||
|
@ -35,6 +35,8 @@ void initLog(const char* path) {
|
||||||
tsdbDebugFlag = 0;
|
tsdbDebugFlag = 0;
|
||||||
cqDebugFlag = 0;
|
cqDebugFlag = 0;
|
||||||
|
|
||||||
|
taosMkDir(path);
|
||||||
|
|
||||||
char temp[PATH_MAX];
|
char temp[PATH_MAX];
|
||||||
snprintf(temp, PATH_MAX, "%s/taosdlog", path);
|
snprintf(temp, PATH_MAX, "%s/taosdlog", path);
|
||||||
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
|
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
|
||||||
|
|
|
@ -45,5 +45,3 @@ void dropServer(SServer* pServer);
|
||||||
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
|
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
|
||||||
void dropClient(SClient* pClient);
|
void dropClient(SClient* pClient);
|
||||||
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
|
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
|
||||||
|
|
||||||
// class DndTest
|
|
|
@ -16,6 +16,8 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
|
|
||||||
|
#define SHOW_STEP_SIZE 100
|
||||||
|
|
||||||
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg);
|
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg);
|
||||||
static void mndFreeShowObj(SShowObj *pShow);
|
static void mndFreeShowObj(SShowObj *pShow);
|
||||||
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
|
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
|
||||||
|
@ -211,7 +213,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* return no more than 100 tables in one round trip */
|
/* return no more than 100 tables in one round trip */
|
||||||
if (rowsToRead > 100) rowsToRead = 100;
|
if (rowsToRead > SHOW_STEP_SIZE) rowsToRead = SHOW_STEP_SIZE;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
|
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
|
||||||
|
@ -220,7 +222,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
|
||||||
if (rowsToRead < 0) rowsToRead = 0;
|
if (rowsToRead < 0) rowsToRead = 0;
|
||||||
size = pShow->rowSize * rowsToRead;
|
size = pShow->rowSize * rowsToRead;
|
||||||
|
|
||||||
size += 100;
|
size += SHOW_STEP_SIZE;
|
||||||
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
|
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseShowObj(pShow, false);
|
mndReleaseShowObj(pShow, false);
|
||||||
|
|
|
@ -336,8 +336,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcConnInfo connInfo = {0};
|
SRpcConnInfo connInfo = {0};
|
||||||
if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
|
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
|
||||||
mndCleanupMsg(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
|
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -354,6 +354,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
void mndCleanupMsg(SMnodeMsg *pMsg) {
|
void mndCleanupMsg(SMnodeMsg *pMsg) {
|
||||||
mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
|
mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
|
||||||
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
pMsg->rpcMsg.pCont = NULL;
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -367,7 +369,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||||
bool isReq = (msgType % 2 == 1);
|
bool isReq = (msgType & 1U);
|
||||||
|
|
||||||
mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]);
|
mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue