TD-10431 process create vnode msg
This commit is contained in:
parent
6f0f7c5821
commit
7750309283
|
@ -231,7 +231,7 @@ do { \
|
|||
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
|
||||
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
|
||||
#define TSDB_CQ_SQL_SIZE 1024
|
||||
#define TSDB_MIN_VNODES 64
|
||||
#define TSDB_MIN_VNODES 16
|
||||
#define TSDB_MAX_VNODES 512
|
||||
#define TSDB_MIN_VNODES_PER_DB 1
|
||||
#define TSDB_MAX_VNODES_PER_DB 4096
|
||||
|
|
|
@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
|
|||
|
||||
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
SVnodeObj * pVnode = NULL;
|
||||
SVnodeObj *pVnode = NULL;
|
||||
int32_t refCount = 0;
|
||||
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
|
@ -107,23 +107,23 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
|
|||
}
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
|
||||
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
|
||||
if (pVnode != NULL) {
|
||||
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
|
||||
}
|
||||
|
||||
return pVnode;
|
||||
}
|
||||
|
||||
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||
if (pVnode == NULL) return;
|
||||
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
int32_t refCount = 0;
|
||||
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
if (pVnode != NULL) {
|
||||
refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||
}
|
||||
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
|
||||
if (pVnode != NULL) {
|
||||
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
|
||||
}
|
||||
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
|
||||
}
|
||||
|
||||
static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) {
|
||||
|
@ -457,7 +457,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
|
|||
|
||||
pMgmt->totalVnodes = numOfVnodes;
|
||||
|
||||
int32_t threadNum = tsNumOfCores;
|
||||
int32_t threadNum = pDnode->opt.numOfCores;
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
|
||||
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
|
||||
|
@ -525,33 +525,49 @@ static void dndCloseVnodes(SDnode *pDnode) {
|
|||
|
||||
static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) {
|
||||
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||
*vgId = htonl(pCreate->vgId);
|
||||
pCreate->vgId = htonl(pCreate->vgId);
|
||||
pCreate->dnodeId = htonl(pCreate->dnodeId);
|
||||
pCreate->dbUid = htobe64(pCreate->dbUid);
|
||||
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
|
||||
pCreate->totalBlocks = htonl(pCreate->totalBlocks);
|
||||
pCreate->daysPerFile = htonl(pCreate->daysPerFile);
|
||||
pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
|
||||
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
|
||||
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
||||
pCreate->minRows = htonl(pCreate->minRows);
|
||||
pCreate->maxRows = htonl(pCreate->maxRows);
|
||||
pCreate->commitTime = htonl(pCreate->commitTime);
|
||||
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
||||
for (int r = 0; r < pCreate->replica; ++r) {
|
||||
SReplica *pReplica = &pCreate->replicas[r];
|
||||
pReplica->id = htonl(pReplica->id);
|
||||
pReplica->port = htons(pReplica->port);
|
||||
}
|
||||
|
||||
*vgId = pCreate->vgId;
|
||||
|
||||
#if 0
|
||||
tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
|
||||
pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize);
|
||||
pCfg->totalBlocks = htonl(pCreate->totalBlocks);
|
||||
pCfg->daysPerFile = htonl(pCreate->daysPerFile);
|
||||
pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0);
|
||||
pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1);
|
||||
pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
||||
pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
|
||||
pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
|
||||
pCfg->precision = pCreate->precision;
|
||||
pCfg->compression = pCreate->compression;
|
||||
pCfg->cacheLastRow = pCreate->cacheLastRow;
|
||||
pCfg->update = pCreate->update;
|
||||
pCfg->quorum = pCreate->quorum;
|
||||
pCfg->replica = pCreate->replica;
|
||||
pCfg->walLevel = pCreate->walLevel;
|
||||
pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
||||
|
||||
for (int32_t i = 0; i < pCfg->replica; ++i) {
|
||||
pCfg->replicas[i].port = htons(pCreate->replicas[i].port);
|
||||
tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
pCfg->wsize = pCreate->cacheBlockSize;
|
||||
pCfg->ssize = pCreate->cacheBlockSize;
|
||||
pCfg->wsize = pCreate->cacheBlockSize;
|
||||
pCfg->lsize = pCreate->cacheBlockSize;
|
||||
pCfg->isHeapAllocator = true;
|
||||
pCfg->ttl = 4;
|
||||
pCfg->keep = pCreate->daysToKeep0;
|
||||
pCfg->isWeak = true;
|
||||
pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
|
||||
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
|
||||
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
|
||||
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
|
||||
pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;
|
||||
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
|
||||
pCfg->walCfg.level = pCreate->walLevel;
|
||||
pCfg->walCfg.retentionPeriod = 10;
|
||||
pCfg->walCfg.retentionSize = 128;
|
||||
pCfg->walCfg.rollPeriod = 128;
|
||||
pCfg->walCfg.segSize = 128;
|
||||
pCfg->walCfg.vgId = pCreate->vgId;
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1016,7 +1032,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
|
|||
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
|
||||
SMWorkerPool *pPool = &pMgmt->writePool;
|
||||
pPool->name = "vnode-write";
|
||||
pPool->max = tsNumOfCores;
|
||||
pPool->max = pDnode->opt.numOfCores;
|
||||
if (tMWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -1050,7 +1066,7 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
|||
}
|
||||
|
||||
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
|
||||
int32_t maxThreads = tsNumOfCores / 2;
|
||||
int32_t maxThreads = pDnode->opt.numOfCores / 2;
|
||||
if (maxThreads < 1) maxThreads = 1;
|
||||
|
||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
|
|
|
@ -15,6 +15,6 @@ add_subdirectory(stb)
|
|||
# add_subdirectory(telem)
|
||||
# add_subdirectory(trans)
|
||||
add_subdirectory(user)
|
||||
# add_subdirectory(vgroup)
|
||||
add_subdirectory(vgroup)
|
||||
|
||||
# add_subdirectory(common)
|
||||
|
|
|
@ -232,6 +232,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
SRpcMsg* pMsg = pClient->pRsp;
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, 0);
|
||||
// taosMsleep(1000000);
|
||||
}
|
||||
|
||||
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
add_executable(dnode_test_vgroup "")
|
||||
|
||||
target_sources(dnode_test_vgroup
|
||||
PRIVATE
|
||||
"vgroup.cpp"
|
||||
"../sut/deploy.cpp"
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
dnode_test_vgroup
|
||||
PUBLIC dnode
|
||||
PUBLIC util
|
||||
PUBLIC os
|
||||
PUBLIC gtest_main
|
||||
)
|
||||
|
||||
target_include_directories(dnode_test_vgroup
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME dnode_test_vgroup
|
||||
COMMAND dnode_test_vgroup
|
||||
)
|
|
@ -0,0 +1,224 @@
|
|||
/**
|
||||
* @file db.cpp
|
||||
* @author slguan (slguan@taosdata.com)
|
||||
* @brief DNODE module vgroup-msg tests
|
||||
* @version 0.1
|
||||
* @date 2021-12-20
|
||||
*
|
||||
* @copyright Copyright (c) 2021
|
||||
*
|
||||
*/
|
||||
|
||||
#include "deploy.h"
|
||||
|
||||
class DndTestVgroup : public ::testing::Test {
|
||||
protected:
|
||||
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
|
||||
SServer* pServer = createServer(path, fqdn, port, firstEp);
|
||||
ASSERT(pServer);
|
||||
return pServer;
|
||||
}
|
||||
|
||||
static void SetUpTestSuite() {
|
||||
initLog("/tmp/tdlog");
|
||||
|
||||
const char* fqdn = "localhost";
|
||||
const char* firstEp = "localhost:9150";
|
||||
pServer = CreateServer("/tmp/dnode_test_vgroup", fqdn, 9150, firstEp);
|
||||
pClient = createClient("root", "taosdata", fqdn, 9150);
|
||||
taosMsleep(1100);
|
||||
}
|
||||
|
||||
static void TearDownTestSuite() {
|
||||
stopServer(pServer);
|
||||
dropClient(pClient);
|
||||
pServer = NULL;
|
||||
pClient = NULL;
|
||||
}
|
||||
|
||||
static SServer* pServer;
|
||||
static SClient* pClient;
|
||||
static int32_t connId;
|
||||
|
||||
public:
|
||||
void SetUp() override {}
|
||||
void TearDown() override {}
|
||||
|
||||
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
|
||||
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
|
||||
pShow->type = showType;
|
||||
if (db != NULL) {
|
||||
strcpy(pShow->db, db);
|
||||
}
|
||||
SRpcMsg showRpcMsg = {0};
|
||||
showRpcMsg.pCont = pShow;
|
||||
showRpcMsg.contLen = sizeof(SShowMsg);
|
||||
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
|
||||
|
||||
sendMsg(pClient, &showRpcMsg);
|
||||
ASSERT_NE(pClient->pRsp, nullptr);
|
||||
ASSERT_EQ(pClient->pRsp->code, 0);
|
||||
ASSERT_NE(pClient->pRsp->pCont, nullptr);
|
||||
|
||||
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
|
||||
ASSERT_NE(pShowRsp, nullptr);
|
||||
pShowRsp->showId = htonl(pShowRsp->showId);
|
||||
pMeta = &pShowRsp->tableMeta;
|
||||
pMeta->numOfTags = htonl(pMeta->numOfTags);
|
||||
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
|
||||
pMeta->sversion = htonl(pMeta->sversion);
|
||||
pMeta->tversion = htonl(pMeta->tversion);
|
||||
pMeta->tuid = htobe64(pMeta->tuid);
|
||||
pMeta->suid = htobe64(pMeta->suid);
|
||||
|
||||
showId = pShowRsp->showId;
|
||||
|
||||
EXPECT_NE(pShowRsp->showId, 0);
|
||||
EXPECT_STREQ(pMeta->tbFname, showName);
|
||||
EXPECT_EQ(pMeta->numOfTags, 0);
|
||||
EXPECT_EQ(pMeta->numOfColumns, columns);
|
||||
EXPECT_EQ(pMeta->precision, 0);
|
||||
EXPECT_EQ(pMeta->tableType, 0);
|
||||
EXPECT_EQ(pMeta->update, 0);
|
||||
EXPECT_EQ(pMeta->sversion, 0);
|
||||
EXPECT_EQ(pMeta->tversion, 0);
|
||||
EXPECT_EQ(pMeta->tuid, 0);
|
||||
EXPECT_EQ(pMeta->suid, 0);
|
||||
}
|
||||
|
||||
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
|
||||
SSchema* pSchema = &pMeta->pSchema[index];
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
EXPECT_EQ(pSchema->colId, 0);
|
||||
EXPECT_EQ(pSchema->type, type);
|
||||
EXPECT_EQ(pSchema->bytes, bytes);
|
||||
EXPECT_STREQ(pSchema->name, name);
|
||||
}
|
||||
|
||||
void SendThenCheckShowRetrieveMsg(int32_t rows) {
|
||||
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
|
||||
pRetrieve->showId = htonl(showId);
|
||||
pRetrieve->free = 0;
|
||||
|
||||
SRpcMsg retrieveRpcMsg = {0};
|
||||
retrieveRpcMsg.pCont = pRetrieve;
|
||||
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
|
||||
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
|
||||
|
||||
sendMsg(pClient, &retrieveRpcMsg);
|
||||
|
||||
ASSERT_NE(pClient->pRsp, nullptr);
|
||||
ASSERT_EQ(pClient->pRsp->code, 0);
|
||||
ASSERT_NE(pClient->pRsp->pCont, nullptr);
|
||||
|
||||
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
|
||||
ASSERT_NE(pRetrieveRsp, nullptr);
|
||||
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
|
||||
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
|
||||
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
|
||||
|
||||
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
|
||||
EXPECT_EQ(pRetrieveRsp->useconds, 0);
|
||||
// EXPECT_EQ(pRetrieveRsp->completed, completed);
|
||||
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
EXPECT_EQ(pRetrieveRsp->compressed, 0);
|
||||
EXPECT_EQ(pRetrieveRsp->compLen, 0);
|
||||
|
||||
pData = pRetrieveRsp->data;
|
||||
pos = 0;
|
||||
}
|
||||
|
||||
void CheckInt8(int8_t val) {
|
||||
int8_t data = *((int8_t*)(pData + pos));
|
||||
pos += sizeof(int8_t);
|
||||
EXPECT_EQ(data, val);
|
||||
}
|
||||
|
||||
void CheckInt16(int16_t val) {
|
||||
int16_t data = *((int16_t*)(pData + pos));
|
||||
pos += sizeof(int16_t);
|
||||
EXPECT_EQ(data, val);
|
||||
}
|
||||
|
||||
void CheckInt32(int32_t val) {
|
||||
int32_t data = *((int32_t*)(pData + pos));
|
||||
pos += sizeof(int32_t);
|
||||
EXPECT_EQ(data, val);
|
||||
}
|
||||
|
||||
void CheckInt64(int64_t val) {
|
||||
int64_t data = *((int64_t*)(pData + pos));
|
||||
pos += sizeof(int64_t);
|
||||
EXPECT_EQ(data, val);
|
||||
}
|
||||
|
||||
void CheckTimestamp() {
|
||||
int64_t data = *((int64_t*)(pData + pos));
|
||||
pos += sizeof(int64_t);
|
||||
EXPECT_GT(data, 0);
|
||||
}
|
||||
|
||||
void CheckBinary(const char* val, int32_t len) {
|
||||
pos += sizeof(VarDataLenT);
|
||||
char* data = (char*)(pData + pos);
|
||||
pos += len;
|
||||
EXPECT_STREQ(data, val);
|
||||
}
|
||||
|
||||
int32_t showId;
|
||||
STableMetaMsg* pMeta;
|
||||
SRetrieveTableRsp* pRetrieveRsp;
|
||||
char* pData;
|
||||
int32_t pos;
|
||||
};
|
||||
|
||||
SServer* DndTestVgroup::pServer;
|
||||
SClient* DndTestVgroup::pClient;
|
||||
int32_t DndTestVgroup::connId;
|
||||
|
||||
|
||||
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
|
||||
{
|
||||
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
|
||||
pReq->vgId = htonl(2);
|
||||
pReq->dnodeId = htonl(1);
|
||||
strcpy(pReq->db, "1.d1");
|
||||
pReq->dbUid = htobe64(9527);
|
||||
pReq->cacheBlockSize = htonl(16);
|
||||
pReq->totalBlocks = htonl(10);
|
||||
pReq->daysPerFile = htonl(10);
|
||||
pReq->daysToKeep0 = htonl(3650);
|
||||
pReq->daysToKeep1 = htonl(3650);
|
||||
pReq->daysToKeep2 = htonl(3650);
|
||||
pReq->minRows = htonl(100);
|
||||
pReq->minRows = htonl(4096);
|
||||
pReq->commitTime = htonl(3600);
|
||||
pReq->fsyncPeriod = htonl(3000);
|
||||
pReq->walLevel = 1;
|
||||
pReq->precision = 0;
|
||||
pReq->compression = 2;
|
||||
pReq->replica = 1;
|
||||
pReq->quorum = 1;
|
||||
pReq->update = 0;
|
||||
pReq->cacheLastRow = 0;
|
||||
pReq->selfIndex = 0;
|
||||
for (int r = 0; r < pReq->replica; ++r) {
|
||||
SReplica* pReplica = &pReq->replicas[r];
|
||||
pReplica->id = htonl(1);
|
||||
pReplica->port = htons(9150);
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = pReq;
|
||||
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
|
||||
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
|
||||
|
||||
sendMsg(pClient, &rpcMsg);
|
||||
SRpcMsg* pMsg = pClient->pRsp;
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, 0);
|
||||
taosMsleep(1000000);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue