drop tb in transaction

This commit is contained in:
wangjiaming0909 2024-03-18 14:09:17 +08:00
parent 3d77e8432d
commit a8e00f7410
12 changed files with 486 additions and 258 deletions

View File

@ -4272,39 +4272,26 @@ typedef struct SStreamProgressRsp {
int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp);
int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp);
typedef struct SDropCtbWithTsmaSingleTbReq {
SVDropTbReq req;
bool isTsmaResTb;
int64_t tsmaUid;
int64_t stbUid; // stable uid
} SMDropCtbWithTsmaSingleTbReq;
typedef struct SDropCtbWithTsmaSingleVgReq {
SVgroupInfo vgInfo;
SArray* pTbs;
} SMDropCtbWithTsmaSingleVgReq;
SArray* pTbs; // SVDropTbReq
} SMDropTbReqsOnSingleVg;
typedef struct SDropCtbWithTsmaReq {
SArray* pVgReqs;
} SMDropTbWithTsmaReq;
int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder* pEncoder, const SMDropTbReqsOnSingleVg* pReq);
int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq);
int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq);
int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq);
typedef struct SDropTbsReq {
SArray* pVgReqs; // SMDropTbReqsOnSingleVg
} SMDropTbsReq;
typedef struct SVTtlExpiredTb {
tb_uid_t uid;
char name[TSDB_TABLE_NAME_LEN];
tb_uid_t suid;
} SVTtlExpiredTb;
int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq);
int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq);
typedef struct SVFetchTtlExpiredTbsRsp {
SArray* pExpiredTbs;
SArray* pExpiredTbs; // SVDropTbReq
int32_t vgId;
} SVFetchTtlExpiredTbsRsp;
int32_t tEncodeTtlExpiredTb(SEncoder* pEncoder, const SVTtlExpiredTb* pTb);
int32_t tDecodeTtlExpiredTb(SDecoder* pDecoder, SVTtlExpiredTb* pTb);
int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp);
int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp);

View File

@ -332,7 +332,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \
(_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA)
(_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
(SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \

View File

@ -828,6 +828,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSMA_INVALID_INTERVAL TAOS_DEF_ERROR_CODE(0, 0x3107)
#define TSDB_CODE_TSMA_INVALID_FUNC_PARAM TAOS_DEF_ERROR_CODE(0, 0x3108)
#define TSDB_CODE_TSMA_UNSUPPORTED_FUNC TAOS_DEF_ERROR_CODE(0, 0x3109)
#define TSDB_CODE_TSMA_MUST_BE_DROPPED TAOS_DEF_ERROR_CODE(0, 0x3110)
//rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)

View File

@ -8322,8 +8322,8 @@ static int32_t tDecodeSVDropTbRsp(SDecoder *pCoder, SVDropTbRsp *pReq) {
}
int32_t tEncodeSVDropTbBatchReq(SEncoder *pCoder, const SVDropTbBatchReq *pReq) {
int32_t nReqs = taosArrayGetSize(pReq->pArray);
SVDropTbReq *pDropTbReq;
int32_t nReqs = taosArrayGetSize(pReq->pArray);
SVDropTbReq *pDropTbReq;
if (tStartEncode(pCoder) < 0) return -1;
@ -10303,20 +10303,77 @@ int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgres
return 0;
}
int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq){ return 0;}
int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq) { return 0;}
int32_t tEncodeTtlExpiredTb(SEncoder* pEncoder, const SVTtlExpiredTb* pTb) {
if (tEncodeI64(pEncoder, pTb->uid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTb->name) < 0) return -1;
if (tEncodeI64(pEncoder, pTb->suid) < 0) return -1;
int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder *pEncoder, const SMDropTbReqsOnSingleVg *pReq) {
const SVgroupInfo* pVgInfo = &pReq->vgInfo;
if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1;
if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1;
int32_t size = pReq->pTbs ? pReq->pTbs->size: 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
const SVDropTbReq* pInfo = taosArrayGet(pReq->pTbs, i);
if (tEncodeSVDropTbReq(pEncoder, pInfo) < 0) return -1;
}
return 0;
}
int32_t tDecodeTtlExpiredTb(SDecoder* pDecoder, SVTtlExpiredTb* pTb) {
if (tDecodeI64(pDecoder, &pTb->uid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTb->name) < 0) return -1;
if (tDecodeI64(pDecoder, &pTb->suid) < 0) return -1;
int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq) {
if (tDecodeI32(pDecoder, &pReq->vgInfo.vgId) < 0) return -1;
if (tDecodeU32(pDecoder, &pReq->vgInfo.hashBegin) < 0) return -1;
if (tDecodeU32(pDecoder, &pReq->vgInfo.hashEnd) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pReq->vgInfo.epSet) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgInfo.numOfTable) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
pReq->pTbs = taosArrayInit(size, sizeof(SVDropTbReq));
if (!pReq->pTbs) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SVDropTbReq pTbReq = {0};
for (int32_t i = 0; i < size; ++i) {
if (tDecodeSVDropTbReq(pDecoder, &pTbReq) < 0) return -1;
taosArrayPush(pReq->pTbs, &pTbReq);
}
return 0;
}
int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq){
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
tStartEncode(&encoder);
int32_t size = pReq->pVgReqs ? pReq->pVgReqs->size : 0;
if (tEncodeI32(&encoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
SMDropTbReqsOnSingleVg* pVgReq = taosArrayGet(pReq->pVgReqs, i);
if (tEncodeSMDropTbReqOnSingleVg(&encoder, pVgReq) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
tStartDecode(&decoder);
int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1;
pReq->pVgReqs = taosArrayInit(size, sizeof(SMDropTbReqsOnSingleVg));
if (!pReq->pVgReqs) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < size; ++i) {
SMDropTbReqsOnSingleVg vgReq = {0};
tDecodeSMDropTbReqOnSingleVg(&decoder, &vgReq);
taosArrayPush(pReq->pVgReqs, &vgReq);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
@ -10325,21 +10382,21 @@ int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredT
int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0;
if (tEncodeI32(pCoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
if (tEncodeTtlExpiredTb(pCoder, taosArrayGet(pRsp->pExpiredTbs, i)) < 0) return -1;
if (tEncodeSVDropTbReq(pCoder, taosArrayGet(pRsp->pExpiredTbs, i)) < 0) return -1;
}
return 0;
}
int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp) {
if (tDecodeI32(pclose, &pRsp->vgId) < 0) return -1;
if (tDecodeI32(pCoder, &pRsp->vgId) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pCoder, &size) < 0) return -1;
if (size > 0) {
pRsp->pExpiredTbs = taosArrayInit(size, sizeof(SVTtlExpiredTb));
pRsp->pExpiredTbs = taosArrayInit(size, sizeof(SVDropTbReq));
if (!pRsp->pExpiredTbs) return TSDB_CODE_OUT_OF_MEMORY;
SVTtlExpiredTb tb;
SVDropTbReq tb = {0};
for (int32_t i = 0; i < size; ++i) {
if (tDecodeTtlExpiredTb(pCoder, &tb) < 0) return -1;
if (tDecodeSVDropTbReq(pCoder, &tb) < 0) return -1;
taosArrayPush(pRsp->pExpiredTbs, &tb);
}
}

View File

@ -171,6 +171,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -29,6 +29,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList);
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src);

View File

@ -1521,7 +1521,7 @@ static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) {
return numOfTables;
}
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
int32_t vindex = 0;
SSdb *pSdb = pMnode->pSdb;

View File

@ -95,6 +95,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
@ -3708,25 +3709,166 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
return code;
}
static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, int32_t vgId, SArray* pTbs) {
return 0;
typedef struct SVDropTbVgReqs {
SVDropTbBatchReq req;
SVgroupInfo info;
} SVDropTbVgReqs;
typedef struct SMDropTbDbInfo {
SArray *dbVgInfos;
int32_t hashPrefix;
int32_t hashSuffix;
int32_t hashMethod;
} SMDropTbDbInfo;
typedef struct SMDropTbTsmaInfo {
char tsmaResTbDbFName[TSDB_DB_FNAME_LEN];
char tsmaResTbNamePrefix[TSDB_TABLE_NAME_LEN];
int32_t suid;
SMDropTbDbInfo dbInfo; // reference to DbInfo in pDbMap
} SMDropTbTsmaInfo;
typedef struct SMDropTbTsmaInfos {
SArray* pTsmaInfos; // SMDropTbTsmaInfo
} SMDropTbTsmaInfos;
typedef struct SMndDropTbsWithTsmaCtx {
SHashObj* pTsmaMap; // <suid, SMDropTbTsmaInfos>
SHashObj* pDbMap; // <dbuid, SMDropTbDbInfo>
SHashObj* pVgMap; // <vgId, SVDropTbVgReqs>
SArray* pResTbNames; // SArray<char*>
} SMndDropTbsWithTsmaCtx;
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId);
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
if (!p) return;
if (p->pDbMap) {
void* pIter = taosHashIterate(p->pDbMap, NULL);
while (pIter) {
SMDropTbDbInfo* pInfo = pIter;
taosArrayDestroy(pInfo->dbVgInfos);
pIter = taosHashIterate(p->pDbMap, pIter);
}
taosHashCleanup(p->pDbMap);
}
if (p->pResTbNames) {
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
}
if (p->pTsmaMap) {
void* pIter = taosHashIterate(p->pTsmaMap, NULL);
while (pIter) {
SMDropTbTsmaInfos* pInfos = pIter;
taosArrayDestroy(pInfos->pTsmaInfos);
pIter = taosHashIterate(p->pTsmaMap, pIter);
}
taosHashCleanup(p->pTsmaMap);
}
if (p->pVgMap) {
void* pIter = taosHashIterate(p->pVgMap, NULL);
while (pIter) {
SVDropTbVgReqs *pReqs = pIter;
taosArrayDestroy(pReqs->req.pArray);
pIter = taosHashIterate(p->pVgMap, pIter);
}
taosHashCleanup(p->pVgMap);
}
taosMemoryFree(p);
}
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp) {
int32_t code = -1;
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx** ppCtx) {
int32_t code = 0;
SMndDropTbsWithTsmaCtx* pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
if (!pCtx) return TSDB_CODE_OUT_OF_MEMORY;
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (!pCtx->pTsmaMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCtx->pDbMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (!pCtx->pDbMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCtx->pResTbNames = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pCtx->pVgMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
*ppCtx = pCtx;
_end:
if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
return code;
}
static void* mndBuildVDropTbsReq(SMnode* pMnode, const SVgroupInfo* pVgInfo, const SVDropTbBatchReq* pReq, int32_t *len) {
int32_t contLen = 0;
int32_t ret = 0;
SMsgHead *pHead = NULL;
SEncoder encoder = {0};
tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret);
if (ret < 0) return NULL;
contLen += sizeof(SMsgHead);
pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgInfo->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
tEncodeSVDropTbBatchReq(&encoder, pReq);
tEncoderClear(&encoder);
*len = contLen;
return pHead;
}
static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SVDropTbVgReqs* pVgReqs, void* pCont, int32_t contLen) {
STransAction action = {0};
action.epSet = pVgReqs->info.epSet;
action.pCont = pCont;
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_TABLE;
action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return mndTransAppendRedoAction(pTrans, &action);
}
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx* pCtx) {
SMnode *pMnode = pRsp->info.node;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
if (pTrans == NULL) goto _OVER;
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
//if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
void* pIter = taosHashIterate(pCtx->pVgMap, NULL);
while (pIter) {
const SVDropTbVgReqs* pVgReqs = pIter;
int32_t len = 0;
void* p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
if (!p || mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len) != 0) {
taosHashCancelIterate(pCtx->pVgMap, pIter);
goto _OVER;
}
pIter = taosHashIterate(pCtx->pVgMap, pIter);
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
return terrno;
}
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
@ -3734,164 +3876,195 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
SMnode *pMnode = pReq->info.node;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SMDropTbWithTsmaReq dropReq = {0};
SMDropTbsReq dropReq = {0};
bool locked = false;
if (tDeserializeDropCtbWithTsmaReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
SHashObj* pTsmaHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SHashObj* pSourceTbHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SMndDropTbsWithTsmaCtx* pCtx = NULL;
terrno = mndInitDropTbsWithTsmaCtx(&pCtx);
if (terrno) goto _OVER;
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
const SMDropCtbWithTsmaSingleVgReq* pVgReq = taosArrayGet(dropReq.pVgReqs, i);
for (int32_t j = 0; j < pVgReq->pTbs->size; ++j) {
const SMDropCtbWithTsmaSingleTbReq* pTbReq = taosArrayGet(pVgReq->pTbs, j);
if (pTbReq->isTsmaResTb) {
taosHashPut(pTsmaHashSet, &pTbReq->tsmaUid, sizeof(pTbReq->tsmaUid), NULL, 0);
taosHashPut(pSourceTbHashSet, &pTbReq->req.suid, sizeof(pTbReq->req.suid), NULL, 0);
} else {
taosHashPut(pSourceTbHashSet, &pTbReq->stbUid, sizeof(pTbReq->stbUid), NULL, 0);
}
}
SMDropTbReqsOnSingleVg* pReq = taosArrayGet(dropReq.pVgReqs, i);
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
if (terrno) goto _OVER;
}
sdbReadLock(pMnode->pSdb, SDB_SMA);
locked = true;
void *pIter = NULL;
SSmaObj *pSma = NULL;
while(1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
if (!pIter) break;
if (taosHashGet(pSourceTbHashSet, &pSma->stbUid, sizeof(pSma->stbUid))) {
if (!taosHashGet(pTsmaHashSet, &pSma->uid, sizeof(pSma->uid))) {
// TODO should retry
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
}
sdbRelease(pMnode->pSdb, pSma);
}
// start transaction
code = 0;
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0)
code = 0;
_OVER:
if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA);
taosHashCleanup(pTsmaHashSet);
taosHashCleanup(pSourceTbHashSet);
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
return code;
}
typedef struct SDropTbVgReqs {
SVDropTbBatchReq req;
SVgroupInfo info;
} SDropTbVgReqs;
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
bool ignoreNotExists) {
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
static int32_t mndDropTbAdd(SMnode* pMnode, SHashObj* pVgHashMap, const SVgObj* pVgObj, SVDropTbReq* pReq, char* name, tb_uid_t suid) {
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = true};
SVgroupInfo info = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .vgId = pVgObj->vgId};
info.epSet = mndGetVgroupEpset(pMnode, pVgObj);
SDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgObj->vgId, sizeof(pVgObj->vgId));
SDropTbVgReqs reqs = {0};
SVDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
SVDropTbVgReqs reqs = {0};
if (pReqs == NULL) {
reqs.info = info;
reqs.info = *pVgInfo;
reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
taosArrayPush(reqs.req.pArray, pReq);
taosHashPut(pVgHashMap, &pVgObj->vgId, sizeof(pVgObj->vgId), &reqs, sizeof(reqs));
taosArrayPush(reqs.req.pArray, &req);
taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &reqs, sizeof(reqs));
} else {
taosArrayPush(pReqs->req.pArray, pReq);
taosArrayPush(pReqs->req.pArray, &req);
}
return 0;
}
static int32_t mndGetDbVgInfoForTsma(SMnode* pMnode, const char* dbname, SMDropTbTsmaInfo* pInfo) {
int32_t code = 0;
SDbObj* pDb = mndAcquireDb(pMnode, dbname);
if (!pDb) {
code = TSDB_CODE_MND_DB_NOT_EXIST;
goto _end;
}
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if ( !pInfo->dbInfo.dbVgInfos) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
mndBuildDBVgroupInfo(pDb, pMnode, pInfo->dbInfo.dbVgInfos);
pInfo->dbInfo.hashPrefix = pDb->cfg.hashPrefix;
pInfo->dbInfo.hashSuffix = pDb->cfg.hashSuffix;
pInfo->dbInfo.hashMethod = pDb->cfg.hashMethod;
_end:
if (pDb) mndReleaseDb(pMnode, pDb);
if (code && pInfo->dbInfo.dbVgInfos) {
taosArrayDestroy(pInfo->dbInfo.dbVgInfos);
pInfo->dbInfo.dbVgInfos = NULL;
}
return code;
}
int32_t vgHashValCmp(const void* lp, const void* rp) {
uint32_t* key = (uint32_t*)lp;
SVgroupInfo* pVg = (SVgroupInfo*)rp;
if (*key < pVg->hashBegin) {
return -1;
} else if (*key > pVg->hashEnd) {
return 1;
}
return 0;
}
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId) {
int32_t code = 0;
SVgObj* pVgObj = mndAcquireVgroup(pMnode, vgId);
if (!pVgObj) {
code = 0;
goto _end;
}
SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .numOfTable = pVgObj->numOfTables, .vgId = pVgObj->vgId};
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
// get all stb uids
for (int32_t i = 0; i < pTbs->size; ++i) {
const SVDropTbReq* pTb = taosArrayGet(pTbs, i);
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
} else {
SMDropTbTsmaInfos infos = {0};
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
if (!infos.pTsmaInfos) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
taosHashPut(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid), &infos, sizeof(infos));
}
}
void *pIter = NULL;
SSmaObj *pSma = NULL;
char buf[TSDB_TABLE_FNAME_LEN] = {0};
// get used tsmas and it's dbs
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
if (!pIter) break;
SMDropTbTsmaInfos* pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
if (pInfos) {
SMDropTbTsmaInfo info = {0};
int32_t len = sprintf(buf, "%s", pSma->name);
len = taosCreateMD5Hash(buf, len);
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
sprintf(info.tsmaResTbNamePrefix, "%s", buf);
SMDropTbDbInfo* pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
info.suid = pSma->dstTbUid;
if (!pDbInfo) {
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pSma);
goto _end;
}
taosHashPut(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN, &info.dbInfo, sizeof(SMDropTbDbInfo));
} else {
info.dbInfo = *pDbInfo;
}
taosArrayPush(pInfos->pTsmaInfos, &info);
}
sdbRelease(pMnode->pSdb, pSma);
}
// generate vg req map
for (int32_t i = 0; i < pTbs->size; ++i) {
SVDropTbReq* pTb = taosArrayGet(pTbs, i);
mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists);
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
SArray *pVgInfos = NULL;
char buf[TSDB_TABLE_FNAME_LEN];
for (int32_t j = 0; j < pInfos->pTsmaInfos->size; ++j) {
SMDropTbTsmaInfo *pInfo = taosArrayGet(pInfos->pTsmaInfos, j);
int32_t len = sprintf(buf, "%s.%s_%s", pInfo->tsmaResTbDbFName, pInfo->tsmaResTbNamePrefix, pTb->name);
uint32_t hashVal =
taosGetTbHashVal(buf, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix, pInfo->dbInfo.hashSuffix);
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
void* p = taosStrdup(buf + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
taosArrayPush(pCtx->pResTbNames, &p);
mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true);
}
}
_end:
return code;
}
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
int32_t code = -1;
SDecoder decoder = {0};
SMnode *pMnode = pRsp->info.node;
bool locked = false;
SHashObj* pTsmaMap = NULL;
SHashObj* pVgroupMap = NULL;
SHashObj* pDbMap = NULL;
SVFetchTtlExpiredTbsRsp rsp;
SVFetchTtlExpiredTbsRsp rsp = {0};
SMndDropTbsWithTsmaCtx *pCtx = NULL;
if (pRsp->code != TSDB_CODE_SUCCESS) goto _end;
if (pRsp->contLen == 0) {
code = 0;
goto _end;
}
tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
terrno = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &rsp);
if (terrno) goto _end;
pTsmaMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (!pTsmaMap) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
terrno = mndInitDropTbsWithTsmaCtx(&pCtx);
if (terrno) goto _end;
// get all stb uids
for (int32_t i = 0; i < rsp.pExpiredTbs->size; ++i) {
const SVTtlExpiredTb* pTb = taosArrayGet(rsp.pExpiredTbs, i);
if (taosHashGet(pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
} else {
SArray* pTsmas = taosArrayInit(2, TSDB_TABLE_NAME_LEN);
if (!pTsmas) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
taosHashPut(pTsmaMap, &pTb->suid, sizeof(pTb->suid), &pTsmas, sizeof(pTsmas));
}
}
sdbReadLock(pMnode->pSdb, SDB_SMA);
locked = true;
void *pIter = NULL;
SSmaObj *pSma = NULL;
char buf[TSDB_TABLE_NAME_LEN + TSDB_DB_FNAME_LEN + 1] = {0};
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
if (!pIter) break;
SArray* pTsmas = taosHashGet(pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
if (pTsmas) {
int32_t len = sprintf(buf, "%s.%s", pSma->db, pSma->name);
len = taosCreateMD5Hash(buf, len);
taosArrayPush(pTsmas, buf);
}
sdbRelease(pMnode->pSdb, pSma);
}
pVgroupMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pVgroupMap) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
SVgObj* pVgObj = mndAcquireVgroup(pMnode, rsp.vgId);
if (!pVgObj) {
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
if (terrno) goto _end;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0)
code = 0;
goto _end;
}
SVDropTbReq req = {.igNotExists = true};
for (int32_t i = 0; i < rsp.pExpiredTbs->size; ++i) {
SVTtlExpiredTb* pTb = taosArrayGet(rsp.pExpiredTbs, i);
req.name = pTb->name;
req.suid = pTb->suid;
mndDropTbAdd(pMnode, pVgroupMap, pVgObj, &req, pTb->name, pTb->suid);
SArray* pTsmas = taosHashGet(pTsmaMap, &pTb->suid, sizeof(pTb->suid));
for (int32_t j = 0; j > pTsmas->size; ++j) {
char* name = taosArrayGet(pTsmas, j);
sprintf(name + strlen(name), "_%s", pTb->name);
}
}
mndReleaseVgroup(pMnode, pVgObj);
if (terrno == 0) code = 0;
_end:
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
tDecoderClear(&decoder);
tFreeFetchTtlExpiredTbsRsp(&rsp);
if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA);
//pVgroupHashmap
//pTsmaMap
return code;
}

View File

@ -937,6 +937,7 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
SVDropTtlTableReq ttlReq = {0};
SVFetchTtlExpiredTbsRsp rsp = {0};
SEncoder encoder = {0};
SArray* pNames = NULL;
pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
@ -951,19 +952,26 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
tb_uid_t suid;
char ctbName[TSDB_TABLE_NAME_LEN];
SVTtlExpiredTb expiredTb = {0};
SVDropTbReq expiredTb = {.igNotExists = true};
metaReaderDoInit(&mr, pVnode->pMeta, 0);
rsp.vgId = TD_VID(pVnode);
rsp.pExpiredTbs = taosArrayInit(ttlReq.nUids, sizeof(SVTtlExpiredTb));
rsp.pExpiredTbs = taosArrayInit(ttlReq.nUids, sizeof(SVDropTbReq));
if (!rsp.pExpiredTbs) goto _end;
pNames = taosArrayInit(ttlReq.nUids, TSDB_TABLE_NAME_LEN);
if (!pNames) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
char buf[TSDB_TABLE_NAME_LEN];
for (int32_t i = 0; i < ttlReq.nUids; ++i) {
tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i);
expiredTb.uid = *uid;
expiredTb.suid = *uid;
terrno = metaReaderGetTableEntryByUid(&mr, *uid);
if (terrno < 0) goto _end;
strncpy(expiredTb.name, mr.me.name, TSDB_TABLE_NAME_LEN);
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
void* p = taosArrayPush(pNames, buf);
expiredTb.name = p;
if (mr.me.type == TSDB_CHILD_TABLE) {
expiredTb.suid = mr.me.ctbEntry.suid;
}
@ -986,6 +994,8 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void*
_end:
metaReaderClear(&mr);
tFreeFetchTtlExpiredTbsRsp(&rsp);
taosArrayDestroy(ttlReq.pTbUids);
if (pNames) taosArrayDestroy(pNames);
pRsp->code = terrno;
return code;
}

View File

@ -7947,16 +7947,9 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
static int32_t doTranslateDropCtbsWithTsma(STranslateContext* pCxt, SDropTableStmt* pStmt) {
SNode* pNode;
// note that there could have normal tables
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
if (pClause->pTsmas) {
for (int32_t i = 0; i < pClause->pTsmas->size; ++i) {
}
// generate tsma res ctb names and get it's vgInfo
}
}
// assemble all tbs into one req, then send to mnode
return TSDB_CODE_SUCCESS;
}
@ -12285,18 +12278,16 @@ static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SVgroupInfo* pVgInf
}
}
static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, bool* pIsSuperTable,
SHashObj* pVgroupHashmap) {
SName name;
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, const SName* name,
int8_t* tableType, SHashObj* pVgroupHashmap) {
STableMeta* pTableMeta = NULL;
int32_t code = getTargetMeta(pCxt, &name, &pTableMeta, false);
int32_t code = getTargetMeta(pCxt, name, &pTableMeta, false);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(&name, pCxt->pTargetTables);
code = collectUseTable(name, pCxt->pTargetTables);
}
*tableType = pTableMeta->tableType;
if (TSDB_CODE_SUCCESS == code && TSDB_SUPER_TABLE == pTableMeta->tableType) {
*pIsSuperTable = true;
goto over;
}
@ -12305,14 +12296,14 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
goto over;
}
*pIsSuperTable = false;
SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
SVDropTbReq req = {.name = pClause->tableName, .suid = pTableMeta->suid, .igNotExists = pClause->ignoreNotExists};
SVDropTbReq req = {.suid = pTableMeta->suid, .igNotExists = pClause->ignoreNotExists};
req.name = pClause->tableName;
addDropTbReqIntoVgroup(pVgroupHashmap, &info, &req);
}
@ -12378,50 +12369,9 @@ SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap) {
return pBufArray;
}
static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap, SDropTableStmt* pStmt) {
char tsmaResTbName[TSDB_TABLE_NAME_LEN + TSDB_DB_FNAME_LEN + 1];
SName tbName = {0};
SNode* pNode;
int32_t code = 0;
STableMeta* pTableMeta = NULL;
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
if (pClause->pTsmas) {
for (int32_t i = 0; i < pClause->pTsmas->size; ++i) {
const STableTSMAInfo* pTsma = taosArrayGetP(pClause->pTsmas, i);
int32_t len = sprintf(tsmaResTbName, "%s.%s", pTsma->dbFName, pTsma->name);
len = taosCreateMD5Hash(tsmaResTbName, len);
sprintf(tsmaResTbName + len, "_%s", pClause->tableName);
toName(pCxt->pParseCxt->acctId, pClause->dbName, tsmaResTbName, &tbName);
/*code = getTargetMeta(pCxt, &tbName, &pTableMeta, false);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = TSDB_CODE_SUCCESS;
continue;
}
if (code) break; */
collectUseTable(&tbName, pCxt->pTargetTables);
SVgroupInfo info = {0};
bool exists = false;
if (TSDB_CODE_SUCCESS == code) {
//code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info);
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tbName, &info, &exists);
}
if (TSDB_CODE_SUCCESS == code && exists) {
SVDropTbReq req = {.name = tsmaResTbName, .suid = pTsma->destTbUid, .igNotExists = true};
addDropTbReqIntoVgroup(pVgMap, &info, &req);
}
}
}
if (code) break;
}
return code;
}
static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot;
bool isSuperTable = false;
int8_t tableType;
SNode* pNode;
SArray* pTsmas = NULL;
@ -12432,33 +12382,61 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
FOREACH(pNode, pStmt->pTables) {
int32_t code = buildDropTableVgroupHashmap(pCxt, (SDropTableClause*)pNode, &isSuperTable, pVgroupHashmap);
SDropTableClause* pClause = (SDropTableClause*)pNode;
SName name;
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pVgroupHashmap);
return code;
}
if (isSuperTable && LIST_LENGTH(pStmt->pTables) > 1) {
if (tableType == TSDB_SUPER_TABLE && LIST_LENGTH(pStmt->pTables) > 1) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_STABLE);
}
code = getTableTsmasFromCache(pCxt->pMetaCache, &name, &pTsmas);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pVgroupHashmap);
return code;
}
if (!pStmt->withTsma) {
pStmt->withTsma = pTsmas && pTsmas->size > 0;
}
pClause->pTsmas = pTsmas;
if (tableType == TSDB_NORMAL_TABLE && pStmt->withTsma) {
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_TSMA_MUST_BE_DROPPED;
}
}
if (isSuperTable || 0 == taosHashGetSize(pVgroupHashmap)) {
if (tableType == TSDB_SUPER_TABLE || 0 == taosHashGetSize(pVgroupHashmap)) {
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
SName name;
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
getTableTsmasFromCache(pCxt->pMetaCache, &name, &pTsmas);
if (pTsmas && pTsmas->size > 0) {
pClause->pTsmas= pTsmas;
pStmt->withTsma = true;
}
}
int32_t code = 0;
if (pStmt->withTsma) {
dropTableAddTsmaResTb(pCxt, pVgroupHashmap, pStmt);
if (code == TSDB_CODE_SUCCESS) {
SMDropTbsReq req = {0};
req.pVgReqs = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(SMDropTbReqsOnSingleVg));
SVgroupDropTableBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) {
break;
}
SMDropTbReqsOnSingleVg reqOnVg = {0};
reqOnVg.vgInfo = pTbBatch->info;
reqOnVg.pTbs = pTbBatch->req.pArray;
taosArrayPush(req.pVgReqs, &reqOnVg);
} while (true);
code = buildCmdMsg(pCxt, TDMT_MND_DROP_TB_WITH_TSMA, (FSerializeFunc)tSerializeSMDropTbsReq, &req);
taosArrayDestroy(req.pVgReqs);
}
taosHashCleanup(pVgroupHashmap);
return code;
}
SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);

View File

@ -692,6 +692,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_TB, "Invalid table to cr
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_INTERVAL, "Invalid tsma interval, 1ms ~ 1h is allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_FUNC_PARAM, "Invalid tsma func param, only one column allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_UNSUPPORTED_FUNC, "Tsma func not supported")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_MUST_BE_DROPPED, "Tsma must be dropped first")
//rsma
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")

View File

@ -566,7 +566,7 @@ class TSMATestSQLGenerator:
class TDTestCase:
updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 10, 'ttlPushInterval': 5}
updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 1}
def __init__(self):
self.vgroups = 4
@ -751,7 +751,6 @@ class TDTestCase:
self.create_tsma('tsma5', 'test', 'norm_tb', [
'avg(c1)', 'avg(c2)'], '10m')
time.sleep(999999)
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()
self.test_recursive_tsma()
@ -1052,8 +1051,7 @@ class TDTestCase:
tdSql.execute('use nsdb', queryTimes=1)
tdSql.execute(
'create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1)
self.create_tsma('tsma1', 'nsdb', 'meters', [
'avg(c1)', 'avg(c2)'], '5m')
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
# drop column, drop tag
tdSql.error('alter table meters drop column c1', -2147482637)
tdSql.error('alter table meters drop tag t1', -2147482637)
@ -1066,8 +1064,29 @@ class TDTestCase:
tdSql.execute('alter table meters drop tag t3', queryTimes=1)
tdSql.execute('drop database nsdb')
# drop norm table
self.create_tsma('tsma_norm_tb', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '5m')
tdSql.error('drop table norm_tb', -2147471088)
# drop no tsma table
tdSql.execute('drop table t2, t1')
# test ttl drop table
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
tdSql.execute('alter table t0 ttl 2', queryTimes=1)
tdSql.execute('flush database test')
tdSql.waitedQuery('show tables like "%t0"', 0, 10)
# test drop multi tables
tdSql.execute('drop table t3, t4')
tdSql.waitedQuery('show tables like "%t3"', 0, 1)
tdSql.waitedQuery('show tables like "%t4"', 0, 1)
# TODO test drop stream
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
def test_create_tsma_on_stable(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')