diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6428bafa97..3100363f31 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4272,39 +4272,26 @@ typedef struct SStreamProgressRsp { int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp); int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp); -typedef struct SDropCtbWithTsmaSingleTbReq { - SVDropTbReq req; - bool isTsmaResTb; - int64_t tsmaUid; - int64_t stbUid; // stable uid -} SMDropCtbWithTsmaSingleTbReq; - typedef struct SDropCtbWithTsmaSingleVgReq { SVgroupInfo vgInfo; - SArray* pTbs; -} SMDropCtbWithTsmaSingleVgReq; + SArray* pTbs; // SVDropTbReq +} SMDropTbReqsOnSingleVg; -typedef struct SDropCtbWithTsmaReq { - SArray* pVgReqs; -} SMDropTbWithTsmaReq; +int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder* pEncoder, const SMDropTbReqsOnSingleVg* pReq); +int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq); -int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq); -int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq); +typedef struct SDropTbsReq { + SArray* pVgReqs; // SMDropTbReqsOnSingleVg +} SMDropTbsReq; -typedef struct SVTtlExpiredTb { - tb_uid_t uid; - char name[TSDB_TABLE_NAME_LEN]; - tb_uid_t suid; -} SVTtlExpiredTb; +int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq); +int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq); typedef struct SVFetchTtlExpiredTbsRsp { - SArray* pExpiredTbs; + SArray* pExpiredTbs; // SVDropTbReq int32_t vgId; } SVFetchTtlExpiredTbsRsp; -int32_t tEncodeTtlExpiredTb(SEncoder* pEncoder, const SVTtlExpiredTb* pTb); -int32_t tDecodeTtlExpiredTb(SDecoder* pDecoder, SVTtlExpiredTb* pTb); - int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp); int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 83b2ed3f66..789462f444 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -332,7 +332,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ (_type) == TDMT_MND_DROP_STB || (_type) == TDMT_MND_CREATE_VIEW || (_type) == TDMT_MND_DROP_VIEW || \ - (_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA) + (_type) == TDMT_MND_CREATE_TSMA || (_type) == TDMT_MND_DROP_TSMA || (_type) == TDMT_MND_DROP_TB_WITH_TSMA) #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ea6a37f642..5328703b19 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -828,6 +828,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSMA_INVALID_INTERVAL TAOS_DEF_ERROR_CODE(0, 0x3107) #define TSDB_CODE_TSMA_INVALID_FUNC_PARAM TAOS_DEF_ERROR_CODE(0, 0x3108) #define TSDB_CODE_TSMA_UNSUPPORTED_FUNC TAOS_DEF_ERROR_CODE(0, 0x3109) +#define TSDB_CODE_TSMA_MUST_BE_DROPPED TAOS_DEF_ERROR_CODE(0, 0x3110) //rsma #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4636fdd0cf..1ea9bc33c0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8322,8 +8322,8 @@ static int32_t tDecodeSVDropTbRsp(SDecoder *pCoder, SVDropTbRsp *pReq) { } int32_t tEncodeSVDropTbBatchReq(SEncoder *pCoder, const SVDropTbBatchReq *pReq) { - int32_t nReqs = taosArrayGetSize(pReq->pArray); - SVDropTbReq *pDropTbReq; + int32_t nReqs = taosArrayGetSize(pReq->pArray); + SVDropTbReq *pDropTbReq; if (tStartEncode(pCoder) < 0) return -1; @@ -10303,20 +10303,77 @@ int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgres return 0; } -int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq){ return 0;} -int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq) { return 0;} - -int32_t tEncodeTtlExpiredTb(SEncoder* pEncoder, const SVTtlExpiredTb* pTb) { - if (tEncodeI64(pEncoder, pTb->uid) < 0) return -1; - if (tEncodeCStr(pEncoder, pTb->name) < 0) return -1; - if (tEncodeI64(pEncoder, pTb->suid) < 0) return -1; +int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder *pEncoder, const SMDropTbReqsOnSingleVg *pReq) { + const SVgroupInfo* pVgInfo = &pReq->vgInfo; + if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1; + if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; + if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1; + if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1; + int32_t size = pReq->pTbs ? pReq->pTbs->size: 0; + if (tEncodeI32(pEncoder, size) < 0) return -1; + for (int32_t i = 0; i < size; ++i) { + const SVDropTbReq* pInfo = taosArrayGet(pReq->pTbs, i); + if (tEncodeSVDropTbReq(pEncoder, pInfo) < 0) return -1; + } return 0; } -int32_t tDecodeTtlExpiredTb(SDecoder* pDecoder, SVTtlExpiredTb* pTb) { - if (tDecodeI64(pDecoder, &pTb->uid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTb->name) < 0) return -1; - if (tDecodeI64(pDecoder, &pTb->suid) < 0) return -1; +int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq) { + if (tDecodeI32(pDecoder, &pReq->vgInfo.vgId) < 0) return -1; + if (tDecodeU32(pDecoder, &pReq->vgInfo.hashBegin) < 0) return -1; + if (tDecodeU32(pDecoder, &pReq->vgInfo.hashEnd) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pReq->vgInfo.epSet) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->vgInfo.numOfTable) < 0) return -1; + int32_t size = 0; + if (tDecodeI32(pDecoder, &size) < 0) return -1; + pReq->pTbs = taosArrayInit(size, sizeof(SVDropTbReq)); + if (!pReq->pTbs) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + SVDropTbReq pTbReq = {0}; + for (int32_t i = 0; i < size; ++i) { + if (tDecodeSVDropTbReq(pDecoder, &pTbReq) < 0) return -1; + taosArrayPush(pReq->pTbs, &pTbReq); + } + return 0; +} + +int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq){ + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + tStartEncode(&encoder); + int32_t size = pReq->pVgReqs ? pReq->pVgReqs->size : 0; + if (tEncodeI32(&encoder, size) < 0) return -1; + for (int32_t i = 0; i < size; ++i) { + SMDropTbReqsOnSingleVg* pVgReq = taosArrayGet(pReq->pVgReqs, i); + if (tEncodeSMDropTbReqOnSingleVg(&encoder, pVgReq) < 0) return -1; + } + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + tStartDecode(&decoder); + int32_t size = 0; + if (tDecodeI32(&decoder, &size) < 0) return -1; + pReq->pVgReqs = taosArrayInit(size, sizeof(SMDropTbReqsOnSingleVg)); + if (!pReq->pVgReqs) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + for (int32_t i = 0; i < size; ++i) { + SMDropTbReqsOnSingleVg vgReq = {0}; + tDecodeSMDropTbReqOnSingleVg(&decoder, &vgReq); + taosArrayPush(pReq->pVgReqs, &vgReq); + } + tEndDecode(&decoder); + tDecoderClear(&decoder); return 0; } @@ -10325,21 +10382,21 @@ int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredT int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0; if (tEncodeI32(pCoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - if (tEncodeTtlExpiredTb(pCoder, taosArrayGet(pRsp->pExpiredTbs, i)) < 0) return -1; + if (tEncodeSVDropTbReq(pCoder, taosArrayGet(pRsp->pExpiredTbs, i)) < 0) return -1; } return 0; } int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp) { - if (tDecodeI32(pclose, &pRsp->vgId) < 0) return -1; + if (tDecodeI32(pCoder, &pRsp->vgId) < 0) return -1; int32_t size = 0; if (tDecodeI32(pCoder, &size) < 0) return -1; if (size > 0) { - pRsp->pExpiredTbs = taosArrayInit(size, sizeof(SVTtlExpiredTb)); + pRsp->pExpiredTbs = taosArrayInit(size, sizeof(SVDropTbReq)); if (!pRsp->pExpiredTbs) return TSDB_CODE_OUT_OF_MEMORY; - SVTtlExpiredTb tb; + SVDropTbReq tb = {0}; for (int32_t i = 0; i < size; ++i) { - if (tDecodeTtlExpiredTb(pCoder, &tb) < 0) return -1; + if (tDecodeSVDropTbReq(pCoder, &tb) < 0) return -1; taosArrayPush(pRsp->pExpiredTbs, &tb); } } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d33b5332de..09267d12b4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -171,6 +171,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index 5569a6ec9f..8704286826 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -29,6 +29,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen); int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq); bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb); +void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList); SSdbRaw *mndDbActionEncode(SDbObj *pDb); const char *mndGetDbStr(const char *src); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 2eed36d3d7..e34a5ee281 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1521,7 +1521,7 @@ static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) { return numOfTables; } -static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { +void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { int32_t vindex = 0; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7003ee9e97..f85e38cf83 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -95,6 +95,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma); mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq); @@ -3708,25 +3709,166 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) { return code; } -static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, int32_t vgId, SArray* pTbs) { - return 0; +typedef struct SVDropTbVgReqs { + SVDropTbBatchReq req; + SVgroupInfo info; +} SVDropTbVgReqs; + +typedef struct SMDropTbDbInfo { + SArray *dbVgInfos; + int32_t hashPrefix; + int32_t hashSuffix; + int32_t hashMethod; +} SMDropTbDbInfo; + +typedef struct SMDropTbTsmaInfo { + char tsmaResTbDbFName[TSDB_DB_FNAME_LEN]; + char tsmaResTbNamePrefix[TSDB_TABLE_NAME_LEN]; + int32_t suid; + SMDropTbDbInfo dbInfo; // reference to DbInfo in pDbMap +} SMDropTbTsmaInfo; + +typedef struct SMDropTbTsmaInfos { + SArray* pTsmaInfos; // SMDropTbTsmaInfo +} SMDropTbTsmaInfos; + +typedef struct SMndDropTbsWithTsmaCtx { + SHashObj* pTsmaMap; // + SHashObj* pDbMap; // + SHashObj* pVgMap; // + SArray* pResTbNames; // SArray +} SMndDropTbsWithTsmaCtx; + +static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId); + +static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) { + if (!p) return; + + if (p->pDbMap) { + void* pIter = taosHashIterate(p->pDbMap, NULL); + while (pIter) { + SMDropTbDbInfo* pInfo = pIter; + taosArrayDestroy(pInfo->dbVgInfos); + pIter = taosHashIterate(p->pDbMap, pIter); + } + taosHashCleanup(p->pDbMap); + } + if (p->pResTbNames) { + taosArrayDestroyP(p->pResTbNames, taosMemoryFree); + } + if (p->pTsmaMap) { + void* pIter = taosHashIterate(p->pTsmaMap, NULL); + while (pIter) { + SMDropTbTsmaInfos* pInfos = pIter; + taosArrayDestroy(pInfos->pTsmaInfos); + pIter = taosHashIterate(p->pTsmaMap, pIter); + } + taosHashCleanup(p->pTsmaMap); + } + + if (p->pVgMap) { + void* pIter = taosHashIterate(p->pVgMap, NULL); + while (pIter) { + SVDropTbVgReqs *pReqs = pIter; + taosArrayDestroy(pReqs->req.pArray); + pIter = taosHashIterate(p->pVgMap, pIter); + } + taosHashCleanup(p->pVgMap); + } + taosMemoryFree(p); } -static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp) { - int32_t code = -1; +static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx** ppCtx) { + int32_t code = 0; + SMndDropTbsWithTsmaCtx* pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx)); + if (!pCtx) return TSDB_CODE_OUT_OF_MEMORY; + pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (!pCtx->pTsmaMap) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + pCtx->pDbMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (!pCtx->pDbMap) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + pCtx->pResTbNames = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + + pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (!pCtx->pVgMap) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + *ppCtx = pCtx; +_end: + if (code) mndDestroyDropTbsWithTsmaCtx(pCtx); + return code; +} + + +static void* mndBuildVDropTbsReq(SMnode* pMnode, const SVgroupInfo* pVgInfo, const SVDropTbBatchReq* pReq, int32_t *len) { + int32_t contLen = 0; + int32_t ret = 0; + SMsgHead *pHead = NULL; + SEncoder encoder = {0}; + + tEncodeSize(tEncodeSVDropTbBatchReq, pReq, contLen, ret); + if (ret < 0) return NULL; + + contLen += sizeof(SMsgHead); + pHead = taosMemoryMalloc(contLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgInfo->vgId); + + void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); + + tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead)); + tEncodeSVDropTbBatchReq(&encoder, pReq); + tEncoderClear(&encoder); + + *len = contLen; + return pHead; +} + +static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SVDropTbVgReqs* pVgReqs, void* pCont, int32_t contLen) { + STransAction action = {0}; + action.epSet = pVgReqs->info.epSet; + action.pCont = pCont; + action.contLen = contLen; + action.msgType = TDMT_VND_DROP_TABLE; + action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST; + return mndTransAppendRedoAction(pTrans, &action); +} + +static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx* pCtx) { SMnode *pMnode = pRsp->info.node; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs"); if (pTrans == NULL) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; - //if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; + void* pIter = taosHashIterate(pCtx->pVgMap, NULL); + while (pIter) { + const SVDropTbVgReqs* pVgReqs = pIter; + int32_t len = 0; + void* p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len); + if (!p || mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len) != 0) { + taosHashCancelIterate(pCtx->pVgMap, pIter); + goto _OVER; + } + pIter = taosHashIterate(pCtx->pVgMap, pIter); + } if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; - code = 0; _OVER: mndTransDrop(pTrans); - return code; + return terrno; } static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) { @@ -3734,164 +3876,195 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) { SMnode *pMnode = pReq->info.node; SDbObj *pDb = NULL; SStbObj *pStb = NULL; - SMDropTbWithTsmaReq dropReq = {0}; + SMDropTbsReq dropReq = {0}; bool locked = false; - if (tDeserializeDropCtbWithTsmaReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSMDropTbsReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; } - SHashObj* pTsmaHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - SHashObj* pSourceTbHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + + SMndDropTbsWithTsmaCtx* pCtx = NULL; + terrno = mndInitDropTbsWithTsmaCtx(&pCtx); + if (terrno) goto _OVER; for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) { - const SMDropCtbWithTsmaSingleVgReq* pVgReq = taosArrayGet(dropReq.pVgReqs, i); - for (int32_t j = 0; j < pVgReq->pTbs->size; ++j) { - const SMDropCtbWithTsmaSingleTbReq* pTbReq = taosArrayGet(pVgReq->pTbs, j); - if (pTbReq->isTsmaResTb) { - taosHashPut(pTsmaHashSet, &pTbReq->tsmaUid, sizeof(pTbReq->tsmaUid), NULL, 0); - taosHashPut(pSourceTbHashSet, &pTbReq->req.suid, sizeof(pTbReq->req.suid), NULL, 0); - } else { - taosHashPut(pSourceTbHashSet, &pTbReq->stbUid, sizeof(pTbReq->stbUid), NULL, 0); - } - } + SMDropTbReqsOnSingleVg* pReq = taosArrayGet(dropReq.pVgReqs, i); + terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId); + if (terrno) goto _OVER; } - sdbReadLock(pMnode->pSdb, SDB_SMA); - locked = true; - - void *pIter = NULL; - SSmaObj *pSma = NULL; - - while(1) { - pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); - if (!pIter) break; - if (taosHashGet(pSourceTbHashSet, &pSma->stbUid, sizeof(pSma->stbUid))) { - if (!taosHashGet(pTsmaHashSet, &pSma->uid, sizeof(pSma->uid))) { - // TODO should retry - terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; - } - } - sdbRelease(pMnode->pSdb, pSma); - } - // start transaction - - - code = 0; + if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) + code = 0; _OVER: - if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA); - taosHashCleanup(pTsmaHashSet); - taosHashCleanup(pSourceTbHashSet); + if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx); return code; } -typedef struct SDropTbVgReqs { - SVDropTbBatchReq req; - SVgroupInfo info; -} SDropTbVgReqs; +static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid, + bool ignoreNotExists) { + SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists}; -static int32_t mndDropTbAdd(SMnode* pMnode, SHashObj* pVgHashMap, const SVgObj* pVgObj, SVDropTbReq* pReq, char* name, tb_uid_t suid) { - SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = true}; - SVgroupInfo info = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .vgId = pVgObj->vgId}; - info.epSet = mndGetVgroupEpset(pMnode, pVgObj); - - SDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgObj->vgId, sizeof(pVgObj->vgId)); - SDropTbVgReqs reqs = {0}; + SVDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); + SVDropTbVgReqs reqs = {0}; if (pReqs == NULL) { - reqs.info = info; + reqs.info = *pVgInfo; reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq)); - taosArrayPush(reqs.req.pArray, pReq); - taosHashPut(pVgHashMap, &pVgObj->vgId, sizeof(pVgObj->vgId), &reqs, sizeof(reqs)); + taosArrayPush(reqs.req.pArray, &req); + taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &reqs, sizeof(reqs)); } else { - taosArrayPush(pReqs->req.pArray, pReq); + taosArrayPush(pReqs->req.pArray, &req); } return 0; } +static int32_t mndGetDbVgInfoForTsma(SMnode* pMnode, const char* dbname, SMDropTbTsmaInfo* pInfo) { + int32_t code = 0; + SDbObj* pDb = mndAcquireDb(pMnode, dbname); + if (!pDb) { + code = TSDB_CODE_MND_DB_NOT_EXIST; + goto _end; + } + + pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); + if ( !pInfo->dbInfo.dbVgInfos) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + mndBuildDBVgroupInfo(pDb, pMnode, pInfo->dbInfo.dbVgInfos); + + pInfo->dbInfo.hashPrefix = pDb->cfg.hashPrefix; + pInfo->dbInfo.hashSuffix = pDb->cfg.hashSuffix; + pInfo->dbInfo.hashMethod = pDb->cfg.hashMethod; + +_end: + if (pDb) mndReleaseDb(pMnode, pDb); + if (code && pInfo->dbInfo.dbVgInfos) { + taosArrayDestroy(pInfo->dbInfo.dbVgInfos); + pInfo->dbInfo.dbVgInfos = NULL; + } + return code; +} + +int32_t vgHashValCmp(const void* lp, const void* rp) { + uint32_t* key = (uint32_t*)lp; + SVgroupInfo* pVg = (SVgroupInfo*)rp; + + if (*key < pVg->hashBegin) { + return -1; + } else if (*key > pVg->hashEnd) { + return 1; + } + + return 0; +} + +static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId) { + int32_t code = 0; + + SVgObj* pVgObj = mndAcquireVgroup(pMnode, vgId); + if (!pVgObj) { + code = 0; + goto _end; + } + SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .numOfTable = pVgObj->numOfTables, .vgId = pVgObj->vgId}; + vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + // get all stb uids + for (int32_t i = 0; i < pTbs->size; ++i) { + const SVDropTbReq* pTb = taosArrayGet(pTbs, i); + if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) { + + } else { + SMDropTbTsmaInfos infos = {0}; + infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo)); + if (!infos.pTsmaInfos) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + taosHashPut(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid), &infos, sizeof(infos)); + } + } + + void *pIter = NULL; + SSmaObj *pSma = NULL; + char buf[TSDB_TABLE_FNAME_LEN] = {0}; + // get used tsmas and it's dbs + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); + if (!pIter) break; + SMDropTbTsmaInfos* pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid)); + if (pInfos) { + SMDropTbTsmaInfo info = {0}; + int32_t len = sprintf(buf, "%s", pSma->name); + len = taosCreateMD5Hash(buf, len); + sprintf(info.tsmaResTbDbFName, "%s", pSma->db); + sprintf(info.tsmaResTbNamePrefix, "%s", buf); + SMDropTbDbInfo* pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN); + info.suid = pSma->dstTbUid; + if (!pDbInfo) { + code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info); + if (code != TSDB_CODE_SUCCESS) { + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pSma); + goto _end; + } + taosHashPut(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN, &info.dbInfo, sizeof(SMDropTbDbInfo)); + } else { + info.dbInfo = *pDbInfo; + } + taosArrayPush(pInfos->pTsmaInfos, &info); + } + sdbRelease(pMnode->pSdb, pSma); + } + + // generate vg req map + for (int32_t i = 0; i < pTbs->size; ++i) { + SVDropTbReq* pTb = taosArrayGet(pTbs, i); + mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists); + + SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid)); + SArray *pVgInfos = NULL; + char buf[TSDB_TABLE_FNAME_LEN]; + for (int32_t j = 0; j < pInfos->pTsmaInfos->size; ++j) { + SMDropTbTsmaInfo *pInfo = taosArrayGet(pInfos->pTsmaInfos, j); + int32_t len = sprintf(buf, "%s.%s_%s", pInfo->tsmaResTbDbFName, pInfo->tsmaResTbNamePrefix, pTb->name); + uint32_t hashVal = + taosGetTbHashVal(buf, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix, pInfo->dbInfo.hashSuffix); + const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ); + void* p = taosStrdup(buf + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN); + taosArrayPush(pCtx->pResTbNames, &p); + mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true); + } + } +_end: + return code; +} + static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) { int32_t code = -1; SDecoder decoder = {0}; SMnode *pMnode = pRsp->info.node; - bool locked = false; - SHashObj* pTsmaMap = NULL; - SHashObj* pVgroupMap = NULL; - SHashObj* pDbMap = NULL; - SVFetchTtlExpiredTbsRsp rsp; + SVFetchTtlExpiredTbsRsp rsp = {0}; + SMndDropTbsWithTsmaCtx *pCtx = NULL; if (pRsp->code != TSDB_CODE_SUCCESS) goto _end; + if (pRsp->contLen == 0) { + code = 0; + goto _end; + } tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen); terrno = tDecodeVFetchTtlExpiredTbsRsp(&decoder, &rsp); if (terrno) goto _end; - pTsmaMap = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (!pTsmaMap) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } + terrno = mndInitDropTbsWithTsmaCtx(&pCtx); + if (terrno) goto _end; - - // get all stb uids - for (int32_t i = 0; i < rsp.pExpiredTbs->size; ++i) { - const SVTtlExpiredTb* pTb = taosArrayGet(rsp.pExpiredTbs, i); - if (taosHashGet(pTsmaMap, &pTb->suid, sizeof(pTb->suid))) { - - } else { - SArray* pTsmas = taosArrayInit(2, TSDB_TABLE_NAME_LEN); - if (!pTsmas) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - taosHashPut(pTsmaMap, &pTb->suid, sizeof(pTb->suid), &pTsmas, sizeof(pTsmas)); - } - } - - sdbReadLock(pMnode->pSdb, SDB_SMA); - locked = true; - - void *pIter = NULL; - SSmaObj *pSma = NULL; - char buf[TSDB_TABLE_NAME_LEN + TSDB_DB_FNAME_LEN + 1] = {0}; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); - if (!pIter) break; - SArray* pTsmas = taosHashGet(pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid)); - if (pTsmas) { - int32_t len = sprintf(buf, "%s.%s", pSma->db, pSma->name); - len = taosCreateMD5Hash(buf, len); - taosArrayPush(pTsmas, buf); - } - sdbRelease(pMnode->pSdb, pSma); - } - - - pVgroupMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (!pVgroupMap) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - SVgObj* pVgObj = mndAcquireVgroup(pMnode, rsp.vgId); - if (!pVgObj) { + terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId); + if (terrno) goto _end; + if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0; - goto _end; - } - SVDropTbReq req = {.igNotExists = true}; - for (int32_t i = 0; i < rsp.pExpiredTbs->size; ++i) { - SVTtlExpiredTb* pTb = taosArrayGet(rsp.pExpiredTbs, i); - req.name = pTb->name; - req.suid = pTb->suid; - mndDropTbAdd(pMnode, pVgroupMap, pVgObj, &req, pTb->name, pTb->suid); - - SArray* pTsmas = taosHashGet(pTsmaMap, &pTb->suid, sizeof(pTb->suid)); - for (int32_t j = 0; j > pTsmas->size; ++j) { - char* name = taosArrayGet(pTsmas, j); - sprintf(name + strlen(name), "_%s", pTb->name); - } - } - mndReleaseVgroup(pMnode, pVgObj); - if (terrno == 0) code = 0; _end: + if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx); tDecoderClear(&decoder); tFreeFetchTtlExpiredTbsRsp(&rsp); - if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA); - - //pVgroupHashmap - //pTsmaMap return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index abd7759b93..5d8b587d30 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -937,6 +937,7 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* SVDropTtlTableReq ttlReq = {0}; SVFetchTtlExpiredTbsRsp rsp = {0}; SEncoder encoder = {0}; + SArray* pNames = NULL; pRsp->msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP; pRsp->code = TSDB_CODE_SUCCESS; pRsp->pCont = NULL; @@ -951,19 +952,26 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* tb_uid_t suid; char ctbName[TSDB_TABLE_NAME_LEN]; - SVTtlExpiredTb expiredTb = {0}; + SVDropTbReq expiredTb = {.igNotExists = true}; metaReaderDoInit(&mr, pVnode->pMeta, 0); rsp.vgId = TD_VID(pVnode); - rsp.pExpiredTbs = taosArrayInit(ttlReq.nUids, sizeof(SVTtlExpiredTb)); + rsp.pExpiredTbs = taosArrayInit(ttlReq.nUids, sizeof(SVDropTbReq)); if (!rsp.pExpiredTbs) goto _end; + pNames = taosArrayInit(ttlReq.nUids, TSDB_TABLE_NAME_LEN); + if (!pNames) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + char buf[TSDB_TABLE_NAME_LEN]; for (int32_t i = 0; i < ttlReq.nUids; ++i) { tb_uid_t* uid = taosArrayGet(ttlReq.pTbUids, i); - expiredTb.uid = *uid; expiredTb.suid = *uid; terrno = metaReaderGetTableEntryByUid(&mr, *uid); if (terrno < 0) goto _end; - strncpy(expiredTb.name, mr.me.name, TSDB_TABLE_NAME_LEN); + strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN); + void* p = taosArrayPush(pNames, buf); + expiredTb.name = p; if (mr.me.type == TSDB_CHILD_TABLE) { expiredTb.suid = mr.me.ctbEntry.suid; } @@ -986,6 +994,8 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode* pVnode, int64_t ver, void* _end: metaReaderClear(&mr); tFreeFetchTtlExpiredTbsRsp(&rsp); + taosArrayDestroy(ttlReq.pTbUids); + if (pNames) taosArrayDestroy(pNames); pRsp->code = terrno; return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 72798ec20f..d09ef1a3f5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7947,16 +7947,9 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p static int32_t doTranslateDropCtbsWithTsma(STranslateContext* pCxt, SDropTableStmt* pStmt) { SNode* pNode; // note that there could have normal tables - FOREACH(pNode, pStmt->pTables) { - SDropTableClause* pClause = (SDropTableClause*)pNode; - if (pClause->pTsmas) { - for (int32_t i = 0; i < pClause->pTsmas->size; ++i) { - } - // generate tsma res ctb names and get it's vgInfo - } - } - // assemble all tbs into one req, then send to mnode + + return TSDB_CODE_SUCCESS; } @@ -12285,18 +12278,16 @@ static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SVgroupInfo* pVgInf } } -static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, bool* pIsSuperTable, - SHashObj* pVgroupHashmap) { - SName name; - toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name); +static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, const SName* name, + int8_t* tableType, SHashObj* pVgroupHashmap) { STableMeta* pTableMeta = NULL; - int32_t code = getTargetMeta(pCxt, &name, &pTableMeta, false); + int32_t code = getTargetMeta(pCxt, name, &pTableMeta, false); if (TSDB_CODE_SUCCESS == code) { - code = collectUseTable(&name, pCxt->pTargetTables); + code = collectUseTable(name, pCxt->pTargetTables); } + *tableType = pTableMeta->tableType; if (TSDB_CODE_SUCCESS == code && TSDB_SUPER_TABLE == pTableMeta->tableType) { - *pIsSuperTable = true; goto over; } @@ -12305,14 +12296,14 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl goto over; } - *pIsSuperTable = false; SVgroupInfo info = {0}; if (TSDB_CODE_SUCCESS == code) { code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - SVDropTbReq req = {.name = pClause->tableName, .suid = pTableMeta->suid, .igNotExists = pClause->ignoreNotExists}; + SVDropTbReq req = {.suid = pTableMeta->suid, .igNotExists = pClause->ignoreNotExists}; + req.name = pClause->tableName; addDropTbReqIntoVgroup(pVgroupHashmap, &info, &req); } @@ -12378,50 +12369,9 @@ SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap) { return pBufArray; } -static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap, SDropTableStmt* pStmt) { - char tsmaResTbName[TSDB_TABLE_NAME_LEN + TSDB_DB_FNAME_LEN + 1]; - SName tbName = {0}; - SNode* pNode; - int32_t code = 0; - STableMeta* pTableMeta = NULL; - FOREACH(pNode, pStmt->pTables) { - SDropTableClause* pClause = (SDropTableClause*)pNode; - if (pClause->pTsmas) { - for (int32_t i = 0; i < pClause->pTsmas->size; ++i) { - const STableTSMAInfo* pTsma = taosArrayGetP(pClause->pTsmas, i); - - int32_t len = sprintf(tsmaResTbName, "%s.%s", pTsma->dbFName, pTsma->name); - len = taosCreateMD5Hash(tsmaResTbName, len); - sprintf(tsmaResTbName + len, "_%s", pClause->tableName); - - toName(pCxt->pParseCxt->acctId, pClause->dbName, tsmaResTbName, &tbName); - /*code = getTargetMeta(pCxt, &tbName, &pTableMeta, false); - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - code = TSDB_CODE_SUCCESS; - continue; - } - if (code) break; */ - collectUseTable(&tbName, pCxt->pTargetTables); - SVgroupInfo info = {0}; - bool exists = false; - if (TSDB_CODE_SUCCESS == code) { - //code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info); - code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tbName, &info, &exists); - } - if (TSDB_CODE_SUCCESS == code && exists) { - SVDropTbReq req = {.name = tsmaResTbName, .suid = pTsma->destTbUid, .igNotExists = true}; - addDropTbReqIntoVgroup(pVgMap, &info, &req); - } - } - } - if (code) break; - } - return code; -} - static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot; - bool isSuperTable = false; + int8_t tableType; SNode* pNode; SArray* pTsmas = NULL; @@ -12432,33 +12382,61 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); FOREACH(pNode, pStmt->pTables) { - int32_t code = buildDropTableVgroupHashmap(pCxt, (SDropTableClause*)pNode, &isSuperTable, pVgroupHashmap); + SDropTableClause* pClause = (SDropTableClause*)pNode; + SName name; + toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name); + int32_t code = buildDropTableVgroupHashmap(pCxt, pClause, &name, &tableType, pVgroupHashmap); if (TSDB_CODE_SUCCESS != code) { taosHashCleanup(pVgroupHashmap); return code; } - if (isSuperTable && LIST_LENGTH(pStmt->pTables) > 1) { + if (tableType == TSDB_SUPER_TABLE && LIST_LENGTH(pStmt->pTables) > 1) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_STABLE); } + code = getTableTsmasFromCache(pCxt->pMetaCache, &name, &pTsmas); + if (TSDB_CODE_SUCCESS != code) { + taosHashCleanup(pVgroupHashmap); + return code; + } + if (!pStmt->withTsma) { + pStmt->withTsma = pTsmas && pTsmas->size > 0; + } + pClause->pTsmas = pTsmas; + if (tableType == TSDB_NORMAL_TABLE && pStmt->withTsma) { + taosHashCleanup(pVgroupHashmap); + return TSDB_CODE_TSMA_MUST_BE_DROPPED; + } } - if (isSuperTable || 0 == taosHashGetSize(pVgroupHashmap)) { + if (tableType == TSDB_SUPER_TABLE || 0 == taosHashGetSize(pVgroupHashmap)) { taosHashCleanup(pVgroupHashmap); return TSDB_CODE_SUCCESS; } - FOREACH(pNode, pStmt->pTables) { - SDropTableClause* pClause = (SDropTableClause*)pNode; - SName name; - toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name); - getTableTsmasFromCache(pCxt->pMetaCache, &name, &pTsmas); - if (pTsmas && pTsmas->size > 0) { - pClause->pTsmas= pTsmas; - pStmt->withTsma = true; - } - } + int32_t code = 0; if (pStmt->withTsma) { - dropTableAddTsmaResTb(pCxt, pVgroupHashmap, pStmt); + if (code == TSDB_CODE_SUCCESS) { + SMDropTbsReq req = {0}; + req.pVgReqs = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(SMDropTbReqsOnSingleVg)); + + SVgroupDropTableBatch* pTbBatch = NULL; + do { + pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); + if (pTbBatch == NULL) { + break; + } + + SMDropTbReqsOnSingleVg reqOnVg = {0}; + reqOnVg.vgInfo = pTbBatch->info; + reqOnVg.pTbs = pTbBatch->req.pArray; + taosArrayPush(req.pVgReqs, &reqOnVg); + } while (true); + + code = buildCmdMsg(pCxt, TDMT_MND_DROP_TB_WITH_TSMA, (FSerializeFunc)tSerializeSMDropTbsReq, &req); + taosArrayDestroy(req.pVgReqs); + } + taosHashCleanup(pVgroupHashmap); + return code; } SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d7ce6becf5..2fc7c07c49 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -692,6 +692,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_TB, "Invalid table to cr TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_INTERVAL, "Invalid tsma interval, 1ms ~ 1h is allowed") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_FUNC_PARAM, "Invalid tsma func param, only one column allowed") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_UNSUPPORTED_FUNC, "Tsma func not supported") +TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_MUST_BE_DROPPED, "Tsma must be dropped first") //rsma TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index b6c92fca72..924555997b 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -566,7 +566,7 @@ class TSMATestSQLGenerator: class TDTestCase: - updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 10, 'ttlPushInterval': 5} + updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 1} def __init__(self): self.vgroups = 4 @@ -751,7 +751,6 @@ class TDTestCase: self.create_tsma('tsma5', 'test', 'norm_tb', [ 'avg(c1)', 'avg(c2)'], '10m') - time.sleep(999999) self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() self.test_recursive_tsma() @@ -1052,8 +1051,7 @@ class TDTestCase: tdSql.execute('use nsdb', queryTimes=1) tdSql.execute( 'create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) - self.create_tsma('tsma1', 'nsdb', 'meters', [ - 'avg(c1)', 'avg(c2)'], '5m') + self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m') # drop column, drop tag tdSql.error('alter table meters drop column c1', -2147482637) tdSql.error('alter table meters drop tag t1', -2147482637) @@ -1066,8 +1064,29 @@ class TDTestCase: tdSql.execute('alter table meters drop tag t3', queryTimes=1) tdSql.execute('drop database nsdb') + # drop norm table + self.create_tsma('tsma_norm_tb', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '5m') + tdSql.error('drop table norm_tb', -2147471088) + + # drop no tsma table + tdSql.execute('drop table t2, t1') + + # test ttl drop table + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + tdSql.execute('alter table t0 ttl 2', queryTimes=1) + tdSql.execute('flush database test') + tdSql.waitedQuery('show tables like "%t0"', 0, 10) + + # test drop multi tables + tdSql.execute('drop table t3, t4') + tdSql.waitedQuery('show tables like "%t3"', 0, 1) + tdSql.waitedQuery('show tables like "%t4"', 0, 1) + # TODO test drop stream + tdSql.execute('drop database test', queryTimes=1) + self.init_data() + def test_create_tsma_on_stable(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------')