handle drop stb req

This commit is contained in:
Shengliang Guan 2022-01-28 10:54:50 +08:00
parent ba13d6daff
commit 2d439d29e2
4 changed files with 88 additions and 60 deletions

View File

@ -140,6 +140,11 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
#define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
char* dbName; char* dbName;
@ -1139,9 +1144,6 @@ typedef struct SVCreateTbReq {
char* name; char* name;
uint32_t ttl; uint32_t ttl;
uint32_t keep; uint32_t keep;
#define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
uint8_t type; uint8_t type;
union { union {
struct { struct {
@ -1187,15 +1189,21 @@ typedef struct {
} SVAlterTbRsp; } SVAlterTbRsp;
typedef struct { typedef struct {
SMsgHead head; uint64_t ver;
char name[TSDB_TABLE_FNAME_LEN]; char* name;
int64_t suid; uint8_t type;
tb_uid_t suid;
} SVDropTbReq; } SVDropTbReq;
typedef struct { typedef struct {
SMsgHead head; uint64_t ver;
} SVDropTbRsp; } SVDropTbRsp;
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp);
void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t uid; int64_t uid;

View File

@ -310,3 +310,18 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
return buf; return buf;
} }
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU8(buf, pReq->type);
return tlen;
}
void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeString(buf, &pReq->name);
buf = taosDecodeFixedU8(buf, &pReq->type);
return buf;
}

View File

@ -199,19 +199,16 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
req.stbCfg.nTagCols = 3; req.stbCfg.nTagCols = 3;
req.stbCfg.pTagSchema = &schemas[2]; req.stbCfg.pTagSchema = &schemas[2];
int32_t bsize = tSerializeSVCreateTbReq(NULL, &req); int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
void* buf = rpcMallocCont(sizeof(SMsgHead) + bsize); SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
SMsgHead* pMsgHead = (SMsgHead*)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize); pHead->contLen = htonl(contLen);
pMsgHead->vgId = htonl(2); pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req); tSerializeSVCreateTbReq(&pBuf, &req);
int32_t contLen = sizeof(SMsgHead) + bsize; SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, (void*)pHead, contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_CREATE_STB, buf, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
if (i == 0) { if (i == 0) {
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
@ -236,23 +233,25 @@ TEST_F(DndTestVnode, 04_ALTER_Stb) {
TEST_F(DndTestVnode, 05_DROP_Stb) { TEST_F(DndTestVnode, 05_DROP_Stb) {
{ {
int32_t contLen = sizeof(SVDropTbReq);
SVDropTbReq* pReq = (SVDropTbReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "stb1");
pReq->suid = 0;
SMsgHead* pMsgHead = (SMsgHead*)&pReq->head;
pMsgHead->contLen = htonl(contLen);
pMsgHead->vgId = htonl(2);
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, pReq, contLen); SVDropTbReq req = {0};
req.ver = 0;
req.name = (char*)"stb1";
req.suid = 9599;
req.type = TD_SUPER_TABLE;
int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(2);
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVDropTbReq(&pBuf, &req);
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, (void*)pHead, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
if (i == 0) {
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} // else {
// ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_INVALID_TABLE_ID);
//}
} }
} }
} }

View File

@ -231,15 +231,11 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
} }
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SVCreateTbReq req = {0};
void *buf = NULL;
int32_t bsize = 0;
SMsgHead *pMsgHead = NULL;
req.ver = 0;
SName name = {0}; SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateTbReq req = {0};
req.ver = 0;
req.name = (char *)tNameGetTableName(&name); req.name = (char *)tNameGetTableName(&name);
req.ttl = 0; req.ttl = 0;
req.keep = 0; req.keep = 0;
@ -250,40 +246,48 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
req.stbCfg.nTagCols = pStb->numOfTags; req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns; req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
bsize = tSerializeSVCreateTbReq(NULL, &req); int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
buf = malloc(sizeof(SMsgHead) + bsize); SMsgHead *pHead = malloc(contLen);
if (buf == NULL) { if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pMsgHead = (SMsgHead *)buf; pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
pMsgHead->contLen = htonl(sizeof(SMsgHead) + bsize); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
pMsgHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSerializeSVCreateTbReq(&pBuf, &req); tSerializeSVCreateTbReq(&pBuf, &req);
*pContLen = sizeof(SMsgHead) + bsize; *pContLen = contLen;
return buf; return pHead;
} }
static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
int32_t contLen = sizeof(SVDropTbReq); SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVDropTbReq *pDrop = calloc(1, contLen); SVDropTbReq req = {0};
if (pDrop == NULL) { req.ver = 0;
req.name = (char *)tNameGetTableName(&name);
req.type = TD_SUPER_TABLE;
req.suid = pStb->uid;
int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = malloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pDrop->head.contLen = htonl(contLen); pHead->contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pDrop->suid = htobe64(pStb->uid);
return pDrop; void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVDropTbReq(&pBuf, &req);
*pContLen = contLen;
return pHead;
} }
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
@ -403,7 +407,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
SVDropTbReq *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb); int32_t contLen = 0;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
@ -414,7 +419,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SVDropTbReq); action.contLen = contLen;
action.msgType = TDMT_VND_DROP_STB; action.msgType = TDMT_VND_DROP_STB;
if (mndTransAppendUndoAction(pTrans, &action) != 0) { if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pReq); free(pReq);
@ -625,7 +630,8 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb); int32_t contLen = 0;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);