diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a88cecf91..95cd82c9fd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -265,19 +265,7 @@ typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t suid; - int32_t sverson; - uint32_t ttl; - uint32_t keep; - int32_t numOfTags; - int32_t numOfColumns; - SSchema pSchema[]; -} SCreateStbInternalMsg; - -typedef struct { - SMsgHead head; - char name[TSDB_TABLE_FNAME_LEN]; - uint64_t suid; -} SDropStbInternalMsg; +} SVDropStbReq; typedef struct { SMsgHead head; @@ -855,17 +843,13 @@ typedef struct { typedef struct { int32_t dnodeId; -} SCreateMnodeMsg, SDropMnodeMsg; +} SMCreateMnodeMsg, SMDropMnodeMsg, SDDropMnodeMsg; typedef struct { int32_t dnodeId; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; -} SCreateMnodeInMsg, SAlterMnodeInMsg; - -typedef struct { - int32_t dnodeId; -} SDropMnodeInMsg; +} SDCreateMnodeMsg, SDAlterMnodeMsg; typedef struct { int32_t dnodeId; @@ -1208,13 +1192,13 @@ typedef struct { char* executor; int32_t sqlLen; char* sql; -} SCreateTopicInternalMsg; +} SDCreateTopicMsg; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t tuid; -} SDropTopicInternalMsg; +} SDDropTopicMsg; typedef struct SVCreateTbReq { uint64_t ver; // use a general definition diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 8fb95c0b75..577cb0c3b0 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -299,7 +299,7 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) { +static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) { dndInitMnodeOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); @@ -417,8 +417,8 @@ static int32_t dndDropMnode(SDnode *pDnode) { return 0; } -static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { - SCreateMnodeInMsg *pMsg = pRpcMsg->pCont; +static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { + SDCreateMnodeMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); for (int32_t i = 0; i < pMsg->replica; ++i) { pMsg->replicas[i].id = htonl(pMsg->replicas[i].id); @@ -429,7 +429,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { } int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); + SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -445,7 +445,7 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); + SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -465,7 +465,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDropMnodeInMsg *pMsg = pRpcMsg->pCont; + SDDropMnodeMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { diff --git a/source/dnode/mgmt/impl/test/mnode/mnode.cpp b/source/dnode/mgmt/impl/test/mnode/mnode.cpp index a6cec93539..e9b1ef45bd 100644 --- a/source/dnode/mgmt/impl/test/mnode/mnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/mnode.cpp @@ -72,9 +72,9 @@ TEST_F(DndTestMnode, 01_ShowDnode) { TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeMsg); - SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(1); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -85,9 +85,9 @@ TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeMsg); - SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -117,9 +117,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { { // create mnode - int32_t contLen = sizeof(SCreateMnodeMsg); + int32_t contLen = sizeof(SMCreateMnodeMsg); - SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -144,9 +144,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { { // drop mnode - int32_t contLen = sizeof(SDropMnodeMsg); + int32_t contLen = sizeof(SMDropMnodeMsg); - SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen); + SMDropMnodeMsg* pReq = (SMDropMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_MNODE, pReq, contLen); diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index ca168fb6d7..dca0f48516 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -146,8 +146,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { pSchema->bytes = htonl(pSchema->bytes); } - EXPECT_STREQ(pRsp->tbFname, ""); - EXPECT_STREQ(pRsp->stbFname, "1.d1.stb"); + EXPECT_STREQ(pRsp->tbFname, "1.d1.stb"); + EXPECT_STREQ(pRsp->stbFname, ""); EXPECT_EQ(pRsp->numOfColumns, 2); EXPECT_EQ(pRsp->numOfTags, 3); EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); diff --git a/source/dnode/mgmt/impl/test/sut/src/server.cpp b/source/dnode/mgmt/impl/test/sut/src/server.cpp index f29b2fad1d..8ac5f62144 100644 --- a/source/dnode/mgmt/impl/test/sut/src/server.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/server.cpp @@ -26,7 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p SDnodeOpt option = {0}; option.sver = 1; option.numOfCores = 1; - option.numOfSupportVnodes = 1; + option.numOfSupportVnodes = 16; option.numOfCommitThreads = 1; option.statusInterval = 1; option.numOfThreadsPerCore = 1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 04e77c0136..b76d72d79c 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -251,7 +251,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno void *pIter = NULL; int32_t numOfReplicas = 0; - SCreateMnodeInMsg createMsg = {0}; + SDCreateMnodeMsg createMsg = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); @@ -281,18 +281,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; - SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg)); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg)); pMsg->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.pCont = pMsg; - action.contLen = sizeof(SAlterMnodeInMsg); + action.contLen = sizeof(SDAlterMnodeMsg); action.msgType = TDMT_DND_ALTER_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -309,14 +309,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg)); + SDCreateMnodeMsg *pMsg = malloc(sizeof(SDCreateMnodeMsg)); if (pMsg == NULL) return -1; - memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg)); + memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg)); pMsg->dnodeId = htonl(pObj->id); action.epSet = mndGetDnodeEpset(pDnode); action.pCont = pMsg; - action.contLen = sizeof(SCreateMnodeInMsg); + action.contLen = sizeof(SDCreateMnodeMsg); action.msgType = TDMT_DND_CREATE_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -327,7 +327,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeMsg *pCreate) { SMnodeObj mnodeObj = {0}; mnodeObj.id = pDnode->id; mnodeObj.createdTime = taosGetTimestampMs(); @@ -370,7 +370,7 @@ CREATE_MNODE_OVER: static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; + SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; pCreate->dnodeId = htonl(pCreate->dnodeId); @@ -423,7 +423,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode void *pIter = NULL; int32_t numOfReplicas = 0; - SAlterMnodeInMsg alterMsg = {0}; + SDAlterMnodeMsg alterMsg = {0}; while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); @@ -449,18 +449,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode if (pMObj->id != pObj->id) { STransAction action = {0}; - SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg)); + SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg)); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pMObj); return -1; } - memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg)); + memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeMsg)); pMsg->dnodeId = htonl(pMObj->id); action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.pCont = pMsg; - action.contLen = sizeof(SAlterMnodeInMsg); + action.contLen = sizeof(SDAlterMnodeMsg); action.msgType = TDMT_DND_ALTER_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -478,7 +478,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); - SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg)); + SDDropMnodeMsg *pMsg = malloc(sizeof(SDDropMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -487,7 +487,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode action.epSet = mndGetDnodeEpset(pDnode); action.pCont = pMsg; - action.contLen = sizeof(SDropMnodeInMsg); + action.contLen = sizeof(SDDropMnodeMsg); action.msgType = TDMT_DND_DROP_MNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -537,7 +537,7 @@ DROP_MNODE_OVER: static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; + SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; pDrop->dnodeId = htonl(pDrop->dnodeId); mDebug("mnode:%d, start to drop", pDrop->dnodeId); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index af18c814ca..918f43f2bd 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -201,7 +201,6 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { } static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) { -#if 1 SVCreateTbReq req; void * buf; int bsize; @@ -235,43 +234,12 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb *pContLen = sizeof(SMsgHead) + bsize; return buf; - -#else - int32_t totalCols = pStb->numOfTags + pStb->numOfColumns; - int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg); - - SCreateStbInternalMsg *pCreate = calloc(1, contLen); - if (pCreate == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pCreate->head.contLen = htonl(contLen); - pCreate->head.vgId = htonl(pVgroup->vgId); - memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN); - pCreate->suid = htobe64(pStb->uid); - pCreate->sverson = htonl(pStb->version); - pCreate->ttl = 0; - pCreate->keep = 0; - pCreate->numOfTags = htonl(pStb->numOfTags); - pCreate->numOfColumns = htonl(pStb->numOfColumns); - - memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema)); - for (int32_t t = 0; t < totalCols; ++t) { - SSchema *pSchema = &pCreate->pSchema[t]; - pSchema->bytes = htonl(pSchema->bytes); - pSchema->colId = htonl(pSchema->colId); - } - - *pContLen = contLen; - return pCreate; -#endif } -static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { - int32_t contLen = sizeof(SDropStbInternalMsg); +static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { + int32_t contLen = sizeof(SVDropStbReq); - SDropStbInternalMsg *pDrop = calloc(1, contLen); + SVDropStbReq *pDrop = calloc(1, contLen); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -402,7 +370,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); + SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -413,7 +381,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pMsg; - action.contLen = sizeof(SDropStbInternalMsg); + action.contLen = sizeof(SVDropStbReq); action.msgType = TDMT_VND_DROP_STB; if (mndTransAppendUndoAction(pTrans, &action) != 0) { free(pMsg); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 49c96967e6..24e32e07b4 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -162,10 +162,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } -static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { - int32_t contLen = sizeof(SDropTopicInternalMsg); +static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { + int32_t contLen = sizeof(SDDropTopicMsg); - SDropTopicInternalMsg *pDrop = calloc(1, contLen); + SDDropTopicMsg *pDrop = calloc(1, contLen); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 78205d5f6a..23b62d2dfb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -120,6 +120,9 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId) + if (pVgroup->replica == 1) { + pVgid->role = TAOS_SYNC_STATE_LEADER; + } } SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)