commit
d3500665b6
|
@ -265,19 +265,7 @@ typedef struct {
|
|||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
int32_t sverson;
|
||||
uint32_t ttl;
|
||||
uint32_t keep;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
SSchema pSchema[];
|
||||
} SCreateStbInternalMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
} SDropStbInternalMsg;
|
||||
} SVDropStbReq;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
|
@ -855,17 +843,13 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
} SCreateMnodeMsg, SDropMnodeMsg;
|
||||
} SMCreateMnodeMsg, SMDropMnodeMsg, SDDropMnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int8_t replica;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
} SCreateMnodeInMsg, SAlterMnodeInMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
} SDropMnodeInMsg;
|
||||
} SDCreateMnodeMsg, SDAlterMnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
|
@ -1208,13 +1192,13 @@ typedef struct {
|
|||
char* executor;
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
} SCreateTopicInternalMsg;
|
||||
} SDCreateTopicMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
uint64_t tuid;
|
||||
} SDropTopicInternalMsg;
|
||||
} SDDropTopicMsg;
|
||||
|
||||
typedef struct SVCreateTbReq {
|
||||
uint64_t ver; // use a general definition
|
||||
|
|
|
@ -299,7 +299,7 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
|
||||
}
|
||||
|
||||
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) {
|
||||
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) {
|
||||
dndInitMnodeOption(pDnode, pOption);
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
|
@ -417,8 +417,8 @@ static int32_t dndDropMnode(SDnode *pDnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
|
||||
SCreateMnodeInMsg *pMsg = pRpcMsg->pCont;
|
||||
static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
|
||||
SDCreateMnodeMsg *pMsg = pRpcMsg->pCont;
|
||||
pMsg->dnodeId = htonl(pMsg->dnodeId);
|
||||
for (int32_t i = 0; i < pMsg->replica; ++i) {
|
||||
pMsg->replicas[i].id = htonl(pMsg->replicas[i].id);
|
||||
|
@ -429,7 +429,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
|
|||
}
|
||||
|
||||
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||
SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
|
||||
SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
|
||||
|
||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
|
||||
|
@ -445,7 +445,7 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
|||
}
|
||||
|
||||
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||
SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
|
||||
SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
|
||||
|
||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
|
||||
|
@ -465,7 +465,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
|||
}
|
||||
|
||||
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||
SDropMnodeInMsg *pMsg = pRpcMsg->pCont;
|
||||
SDDropMnodeMsg *pMsg = pRpcMsg->pCont;
|
||||
pMsg->dnodeId = htonl(pMsg->dnodeId);
|
||||
|
||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||
|
|
|
@ -383,6 +383,7 @@ void dndCleanupTrans(SDnode *pDnode) {
|
|||
|
||||
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||
if (pMgmt->clientRpc == NULL) return;
|
||||
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
|
||||
}
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ TEST_F(DndTestDnode, 01_ShowDnode) {
|
|||
CheckInt16(1);
|
||||
CheckBinary("localhost:9041", TSDB_EP_LEN);
|
||||
CheckInt16(0);
|
||||
CheckInt16(1);
|
||||
CheckInt16(16);
|
||||
CheckBinary("ready", 10);
|
||||
CheckTimestamp();
|
||||
CheckBinary("", 24);
|
||||
|
@ -112,8 +112,8 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
|
|||
CheckBinary("localhost:9042", TSDB_EP_LEN);
|
||||
CheckInt16(0);
|
||||
CheckInt16(0);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckBinary("ready", 10);
|
||||
CheckBinary("ready", 10);
|
||||
CheckTimestamp();
|
||||
|
@ -140,7 +140,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
|
|||
CheckInt16(1);
|
||||
CheckBinary("localhost:9041", TSDB_EP_LEN);
|
||||
CheckInt16(0);
|
||||
CheckInt16(1);
|
||||
CheckInt16(16);
|
||||
CheckBinary("ready", 10);
|
||||
CheckTimestamp();
|
||||
CheckBinary("", 24);
|
||||
|
@ -199,10 +199,10 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
|
|||
CheckInt16(0);
|
||||
CheckInt16(0);
|
||||
CheckInt16(0);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckBinary("ready", 10);
|
||||
CheckBinary("ready", 10);
|
||||
CheckBinary("ready", 10);
|
||||
|
@ -242,10 +242,10 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
|
|||
CheckInt16(0);
|
||||
CheckInt16(0);
|
||||
CheckInt16(0);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(1);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckInt16(16);
|
||||
CheckBinary("ready", 10);
|
||||
CheckBinary("ready", 10);
|
||||
CheckBinary("ready", 10);
|
||||
|
|
|
@ -72,9 +72,9 @@ TEST_F(DndTestMnode, 01_ShowDnode) {
|
|||
|
||||
TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
|
||||
{
|
||||
int32_t contLen = sizeof(SCreateMnodeMsg);
|
||||
int32_t contLen = sizeof(SMCreateMnodeMsg);
|
||||
|
||||
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
|
||||
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(1);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen);
|
||||
|
@ -85,9 +85,9 @@ TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
|
|||
|
||||
TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) {
|
||||
{
|
||||
int32_t contLen = sizeof(SCreateMnodeMsg);
|
||||
int32_t contLen = sizeof(SMCreateMnodeMsg);
|
||||
|
||||
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
|
||||
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen);
|
||||
|
@ -117,9 +117,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
|
|||
|
||||
{
|
||||
// create mnode
|
||||
int32_t contLen = sizeof(SCreateMnodeMsg);
|
||||
int32_t contLen = sizeof(SMCreateMnodeMsg);
|
||||
|
||||
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
|
||||
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_MNODE, pReq, contLen);
|
||||
|
@ -144,9 +144,9 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
|
|||
|
||||
{
|
||||
// drop mnode
|
||||
int32_t contLen = sizeof(SDropMnodeMsg);
|
||||
int32_t contLen = sizeof(SMDropMnodeMsg);
|
||||
|
||||
SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen);
|
||||
SMDropMnodeMsg* pReq = (SMDropMnodeMsg*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_MNODE, pReq, contLen);
|
||||
|
|
|
@ -146,8 +146,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
}
|
||||
|
||||
EXPECT_STREQ(pRsp->tbFname, "");
|
||||
EXPECT_STREQ(pRsp->stbFname, "1.d1.stb");
|
||||
EXPECT_STREQ(pRsp->tbFname, "1.d1.stb");
|
||||
EXPECT_STREQ(pRsp->stbFname, "");
|
||||
EXPECT_EQ(pRsp->numOfColumns, 2);
|
||||
EXPECT_EQ(pRsp->numOfTags, 3);
|
||||
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
|
|
|
@ -26,7 +26,7 @@ SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t p
|
|||
SDnodeOpt option = {0};
|
||||
option.sver = 1;
|
||||
option.numOfCores = 1;
|
||||
option.numOfSupportVnodes = 1;
|
||||
option.numOfSupportVnodes = 16;
|
||||
option.numOfCommitThreads = 1;
|
||||
option.statusInterval = 1;
|
||||
option.numOfThreadsPerCore = 1;
|
||||
|
|
|
@ -251,7 +251,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
|||
void *pIter = NULL;
|
||||
int32_t numOfReplicas = 0;
|
||||
|
||||
SCreateMnodeInMsg createMsg = {0};
|
||||
SDCreateMnodeMsg createMsg = {0};
|
||||
while (1) {
|
||||
SMnodeObj *pMObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||
|
@ -281,18 +281,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
|||
|
||||
STransAction action = {0};
|
||||
|
||||
SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg));
|
||||
SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pMObj);
|
||||
return -1;
|
||||
}
|
||||
memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg));
|
||||
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg));
|
||||
|
||||
pMsg->dnodeId = htonl(pMObj->id);
|
||||
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SAlterMnodeInMsg);
|
||||
action.contLen = sizeof(SDAlterMnodeMsg);
|
||||
action.msgType = TDMT_DND_ALTER_MNODE;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
|
@ -309,14 +309,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
|||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
|
||||
SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg));
|
||||
SDCreateMnodeMsg *pMsg = malloc(sizeof(SDCreateMnodeMsg));
|
||||
if (pMsg == NULL) return -1;
|
||||
memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg));
|
||||
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg));
|
||||
pMsg->dnodeId = htonl(pObj->id);
|
||||
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SCreateMnodeInMsg);
|
||||
action.contLen = sizeof(SDCreateMnodeMsg);
|
||||
action.msgType = TDMT_DND_CREATE_MNODE;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -327,7 +327,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) {
|
||||
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeMsg *pCreate) {
|
||||
SMnodeObj mnodeObj = {0};
|
||||
mnodeObj.id = pDnode->id;
|
||||
mnodeObj.createdTime = taosGetTimestampMs();
|
||||
|
@ -370,7 +370,7 @@ CREATE_MNODE_OVER:
|
|||
|
||||
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
|
||||
pCreate->dnodeId = htonl(pCreate->dnodeId);
|
||||
|
||||
|
@ -423,7 +423,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
void *pIter = NULL;
|
||||
int32_t numOfReplicas = 0;
|
||||
|
||||
SAlterMnodeInMsg alterMsg = {0};
|
||||
SDAlterMnodeMsg alterMsg = {0};
|
||||
while (1) {
|
||||
SMnodeObj *pMObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||
|
@ -449,18 +449,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
if (pMObj->id != pObj->id) {
|
||||
STransAction action = {0};
|
||||
|
||||
SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg));
|
||||
SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pMObj);
|
||||
return -1;
|
||||
}
|
||||
memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg));
|
||||
memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeMsg));
|
||||
|
||||
pMsg->dnodeId = htonl(pMObj->id);
|
||||
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SAlterMnodeInMsg);
|
||||
action.contLen = sizeof(SDAlterMnodeMsg);
|
||||
action.msgType = TDMT_DND_ALTER_MNODE;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
|
@ -478,7 +478,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
|
||||
SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg));
|
||||
SDDropMnodeMsg *pMsg = malloc(sizeof(SDDropMnodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -487,7 +487,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDropMnodeInMsg);
|
||||
action.contLen = sizeof(SDDropMnodeMsg);
|
||||
action.msgType = TDMT_DND_DROP_MNODE;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -537,7 +537,7 @@ DROP_MNODE_OVER:
|
|||
|
||||
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||
SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||
pDrop->dnodeId = htonl(pDrop->dnodeId);
|
||||
|
||||
mDebug("mnode:%d, start to drop", pDrop->dnodeId);
|
||||
|
|
|
@ -201,7 +201,6 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
|
|||
}
|
||||
|
||||
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
|
||||
#if 1
|
||||
SVCreateTbReq req;
|
||||
void * buf;
|
||||
int bsize;
|
||||
|
@ -235,43 +234,12 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
|
|||
|
||||
*pContLen = sizeof(SMsgHead) + bsize;
|
||||
return buf;
|
||||
|
||||
#else
|
||||
int32_t totalCols = pStb->numOfTags + pStb->numOfColumns;
|
||||
int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg);
|
||||
|
||||
SCreateStbInternalMsg *pCreate = calloc(1, contLen);
|
||||
if (pCreate == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCreate->head.contLen = htonl(contLen);
|
||||
pCreate->head.vgId = htonl(pVgroup->vgId);
|
||||
memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
pCreate->suid = htobe64(pStb->uid);
|
||||
pCreate->sverson = htonl(pStb->version);
|
||||
pCreate->ttl = 0;
|
||||
pCreate->keep = 0;
|
||||
pCreate->numOfTags = htonl(pStb->numOfTags);
|
||||
pCreate->numOfColumns = htonl(pStb->numOfColumns);
|
||||
|
||||
memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema));
|
||||
for (int32_t t = 0; t < totalCols; ++t) {
|
||||
SSchema *pSchema = &pCreate->pSchema[t];
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
pSchema->colId = htonl(pSchema->colId);
|
||||
}
|
||||
|
||||
*pContLen = contLen;
|
||||
return pCreate;
|
||||
#endif
|
||||
}
|
||||
|
||||
static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
|
||||
int32_t contLen = sizeof(SDropStbInternalMsg);
|
||||
static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
|
||||
int32_t contLen = sizeof(SVDropStbReq);
|
||||
|
||||
SDropStbInternalMsg *pDrop = calloc(1, contLen);
|
||||
SVDropStbReq *pDrop = calloc(1, contLen);
|
||||
if (pDrop == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -402,7 +370,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
|
||||
SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
|
@ -413,7 +381,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDropStbInternalMsg);
|
||||
action.contLen = sizeof(SVDropStbReq);
|
||||
action.msgType = TDMT_VND_DROP_STB;
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
|
|
@ -162,10 +162,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
|
|||
return mndAcquireDb(pMnode, db);
|
||||
}
|
||||
|
||||
static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) {
|
||||
int32_t contLen = sizeof(SDropTopicInternalMsg);
|
||||
static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) {
|
||||
int32_t contLen = sizeof(SDDropTopicMsg);
|
||||
|
||||
SDropTopicInternalMsg *pDrop = calloc(1, contLen);
|
||||
SDDropTopicMsg *pDrop = calloc(1, contLen);
|
||||
if (pDrop == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
|
|
@ -120,6 +120,9 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
|
|||
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId)
|
||||
if (pVgroup->replica == 1) {
|
||||
pVgid->role = TAOS_SYNC_STATE_LEADER;
|
||||
}
|
||||
}
|
||||
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)
|
||||
|
||||
|
|
Loading…
Reference in New Issue