drop stb
This commit is contained in:
parent
a4bd459404
commit
ba13d6daff
|
@ -235,20 +235,26 @@ TEST_F(DndTestVnode, 04_ALTER_Stb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestVnode, 05_DROP_Stb) {
|
TEST_F(DndTestVnode, 05_DROP_Stb) {
|
||||||
#if 0
|
|
||||||
{
|
{
|
||||||
|
int32_t contLen = sizeof(SVDropTbReq);
|
||||||
|
SVDropTbReq* pReq = (SVDropTbReq*)rpcMallocCont(contLen);
|
||||||
|
strcpy(pReq->name, "stb1");
|
||||||
|
pReq->suid = 0;
|
||||||
|
|
||||||
|
SMsgHead* pMsgHead = (SMsgHead*)&pReq->head;
|
||||||
|
pMsgHead->contLen = htonl(contLen);
|
||||||
|
pMsgHead->vgId = htonl(2);
|
||||||
|
|
||||||
for (int i = 0; i < 3; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_VND_DROP_STB, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
test.Restart();
|
} // else {
|
||||||
} else {
|
// ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_INVALID_TABLE_ID);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_TDB_INVALID_TABLE_ID);
|
//}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestVnode, 06_DROP_Vnode) {
|
TEST_F(DndTestVnode, 06_DROP_Vnode) {
|
||||||
|
|
|
@ -181,23 +181,26 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
||||||
|
|
||||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
|
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
|
||||||
atomic_exchange_32(&pOld->updateTime, pNew->updateTime);
|
|
||||||
atomic_exchange_32(&pOld->version, pNew->version);
|
|
||||||
|
|
||||||
taosWLockLatch(&pOld->lock);
|
taosWLockLatch(&pOld->lock);
|
||||||
pOld->numOfColumns = pNew->numOfColumns;
|
|
||||||
pOld->numOfTags = pNew->numOfTags;
|
|
||||||
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
|
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
|
||||||
int32_t totalSize = totalCols * sizeof(SSchema);
|
int32_t totalSize = totalCols * sizeof(SSchema);
|
||||||
|
|
||||||
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
|
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
|
||||||
void *pSchema = malloc(totalSize);
|
void *pSchema = malloc(totalSize);
|
||||||
if (pSchema != NULL) {
|
if (pSchema != NULL) {
|
||||||
free(pOld->pSchema);
|
free(pOld->pSchema);
|
||||||
pOld->pSchema = pSchema;
|
pOld->pSchema = pSchema;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||||
|
taosWUnLockLatch(&pOld->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOld->updateTime = pNew->updateTime;
|
||||||
|
pOld->version = pNew->version;
|
||||||
|
pOld->numOfColumns = pNew->numOfColumns;
|
||||||
|
pOld->numOfTags = pNew->numOfTags;
|
||||||
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
|
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
|
||||||
taosWUnLockLatch(&pOld->lock);
|
taosWUnLockLatch(&pOld->lock);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -228,10 +231,10 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
|
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
|
||||||
SVCreateTbReq req;
|
SVCreateTbReq req = {0};
|
||||||
void *buf;
|
void *buf = NULL;
|
||||||
int32_t bsize;
|
int32_t bsize = 0;
|
||||||
SMsgHead *pMsgHead;
|
SMsgHead *pMsgHead = NULL;
|
||||||
|
|
||||||
req.ver = 0;
|
req.ver = 0;
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
|
@ -358,7 +361,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int32_t contLen;
|
int32_t contLen;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
@ -498,9 +501,9 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// topic should have different name with stb
|
// topic should have different name with stb
|
||||||
SStbObj *pTopic = mndAcquireStb(pMnode, pCreate->name);
|
SStbObj *pTopicStb = mndAcquireStb(pMnode, pCreate->name);
|
||||||
if (pTopic != NULL) {
|
if (pTopicStb != NULL) {
|
||||||
sdbRelease(pMnode->pSdb, pTopic);
|
mndReleaseStb(pMnode, pTopicStb);
|
||||||
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
|
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -517,7 +520,6 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = code;
|
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -603,15 +605,6 @@ static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
|
||||||
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
|
|
||||||
if (pUndoRaw == NULL) return -1;
|
|
||||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
|
||||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
|
||||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
@ -621,22 +614,53 @@ static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
void *pIter = NULL;
|
||||||
|
int32_t contLen;
|
||||||
|
|
||||||
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (pVgroup->dbUid != pDb->uid) continue;
|
||||||
|
|
||||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) {
|
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = TDMT_VND_DROP_STB;
|
||||||
|
action.acceptableCode = TSDB_CODE_VND_TB_NOT_EXIST;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
free(pReq);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL)goto DROP_STB_OVER;
|
if (pTrans == NULL) goto DROP_STB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||||
|
|
||||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
|
||||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto DROP_STB_OVER;
|
||||||
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -664,7 +688,16 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndDropStb(pMnode, pReq, pStb);
|
SDbObj *pDb = mndAcquireDbByStb(pMnode, pDrop->name);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = mndDropStb(pMnode, pReq, pDb, pStb);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -856,7 +889,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
|
||||||
|
|
||||||
if (pStb->dbUid != pDb->uid) {
|
if (pStb->dbUid != pDb->uid) {
|
||||||
if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) {
|
if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) {
|
||||||
mError("Inconsistent table data, name:%s, db:%s, dbUid:%"PRIu64, pStb->name, pDb->name, pDb->uid);
|
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pStb);
|
sdbRelease(pSdb, pStb);
|
||||||
|
|
|
@ -103,7 +103,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case TDMT_VND_ALTER_STB:
|
||||||
|
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||||
|
break;
|
||||||
case TDMT_VND_DROP_STB:
|
case TDMT_VND_DROP_STB:
|
||||||
|
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||||
|
break;
|
||||||
case TDMT_VND_DROP_TABLE:
|
case TDMT_VND_DROP_TABLE:
|
||||||
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
||||||
// // TODO: handle error
|
// // TODO: handle error
|
||||||
|
|
Loading…
Reference in New Issue