TD-10431 testing for create/drop vnode msg

This commit is contained in:
Shengliang Guan 2021-12-20 10:05:10 +08:00
parent 6c1e5acd82
commit aef124af3e
3 changed files with 150 additions and 65 deletions

View File

@ -87,8 +87,8 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
static int32_t dndWriteVnodesToFile(SDnode *pDnode);
@ -136,7 +136,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
@ -189,7 +189,7 @@ static int32_t dndCreateVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl)
return code;
}
static void dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
taosWLockLatch(&pMgmt->latch);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
@ -297,26 +297,26 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
SWrapperCfg *pCfg = &pCfgs[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
if (!vgId || vgId->type != cJSON_String) {
if (!vgId || vgId->type != cJSON_Number) {
dError("failed to read %s since vgId not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->vgId = atoi(vgId->valuestring);
pCfg->vgId = vgId->valueint;
snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);
cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
if (!dropped || dropped->type != cJSON_String) {
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->dropped = atoi(dropped->valuestring);
pCfg->dropped = dropped->valueint;
cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
if (!vgVersion || vgVersion->type != cJSON_String) {
if (!vgVersion || vgVersion->type != cJSON_Number) {
dError("failed to read %s since vgVersion not found", file);
goto PRASE_VNODE_OVER;
}
pCfg->vgVersion = atoi(vgVersion->valuestring);
pCfg->vgVersion = vgVersion->valueint;
cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
if (!dbUid || dbUid->type != cJSON_String) {
@ -358,28 +358,30 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
dError("failed to write %s since %s", file, terrstr());
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
int32_t len = 0;
int32_t maxLen = 65536;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n");
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": \"%d\",\n", pVnode->vgVersion);
len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db);
len += snprintf(content + len, maxLen - len, " {\n");
len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion);
len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db);
if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
len += snprintf(content + len, maxLen - len, " },\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
len += snprintf(content + len, maxLen - len, " }\n");
}
}
len += snprintf(content + len, maxLen - len, " ]\n");
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
@ -422,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dndCreateVnode(pDnode, pCfg, pImpl);
dndOpenVnode(pDnode, pCfg, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
}
@ -507,7 +509,8 @@ static void dndCloseVnodes(SDnode *pDnode) {
SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) {
dndDropVnode(pDnode, pVnodes[i]);
dndReleaseVnode(pDnode, pVnodes[i]);
dndCloseVnode(pDnode, pVnodes[i]);
}
if (pVnodes != NULL) {
@ -613,7 +616,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return -1;
}
int32_t code = dndCreateVnode(pDnode, &wrapperCfg, pImpl);
int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path);
@ -639,20 +642,36 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
dndGenerateVnodeCfg(pAlter, &vnodeCfg);
SWrapperCfg wrapperCfg = {0};
dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
return terrno;
}
if (wrapperCfg.vgVersion == pVnode->vgVersion) {
dndReleaseVnode(pDnode, pVnode);
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
return 0;
}
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
dndReleaseVnode(pDnode, pVnode);
return terrno;
}
int32_t oldVersion = pVnode->vgVersion;
pVnode->vgVersion = wrapperCfg.vgVersion;
int32_t code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
pVnode->vgVersion = oldVersion;
}
dndReleaseVnode(pDnode, pVnode);
return 0;
return code;
}
static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
@ -673,7 +692,8 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return terrno;
}
dndDropVnode(pDnode, pVnode);
dndReleaseVnode(pDnode, pVnode);
dndCloseVnode(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);

View File

@ -16,7 +16,7 @@
#include "deploy.h"
void initLog(const char* path) {
dDebugFlag = 143;
dDebugFlag = 207;
vDebugFlag = 0;
mDebugFlag = 207;
cDebugFlag = 0;

View File

@ -178,45 +178,110 @@ int32_t DndTestVgroup::connId;
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
for (int i = 0; i < 3; ++i) {
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(1);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
{
for (int i = 0; i < 3; ++i) {
SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(sizeof(SAlterVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
pReq->vgVersion = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRows = htonl(100);
pReq->minRows = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replica = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->selfIndex = 0;
for (int r = 0; r < pReq->replica; ++r) {
SReplica* pReplica = &pReq->replicas[r];
pReplica->id = htonl(1);
pReplica->port = htons(9150);
}
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
// taosMsleep(1000000);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
}
{
for (int i = 0; i < 3; ++i) {
SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(sizeof(SDropVnodeMsg));
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
}
}