From 1ef153de6381d43cb900526d1f99ed7a02ad558a Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 13 Mar 2024 11:35:16 +0000 Subject: [PATCH] support drop ctb for tsma --- include/common/tmsg.h | 19 ++++++++++ source/dnode/mnode/impl/src/mndStb.c | 49 ++++++++++++++++++++++++++ source/libs/catalog/src/ctgAsync.c | 4 +-- source/libs/parser/src/parTranslater.c | 26 +++++++------- 4 files changed, 84 insertions(+), 14 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a88d6287ed..538cd66940 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4272,6 +4272,25 @@ typedef struct SStreamProgressRsp { int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp); int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp); +typedef struct SDropCtbWithTsmaSingleTbReq { + SVDropTbReq req; + bool isTsmaResTb; + int64_t tsmaUid; + int64_t stbUid; // stable uid +} SMDropCtbWithTsmaSingleTbReq; + +typedef struct SDropCtbWithTsmaSingleVgReq { + SVgroupInfo vgInfo; + SArray* pTbs; +} SMDropCtbWithTsmaSingleVgReq; + +typedef struct SDropCtbWithTsmaReq { + SArray* pVgReqs; +} SMDropTbWithTsmaReq; + +int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq); +int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq); + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3482806cf8..7eaeb40c53 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3708,5 +3708,54 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) { } static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) { + int32_t code = -1; + SMnode *pMnode = pReq->info.node; + SDbObj *pDb = NULL; + SStbObj *pStb = NULL; + SMDropTbWithTsmaReq dropReq = {0}; + bool locked = false; + if (tDeserializeDropCtbWithTsmaReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + SHashObj* pTsmaHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + SHashObj* pSourceTbHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) { + const SMDropCtbWithTsmaSingleVgReq* pVgReq = taosArrayGet(dropReq.pVgReqs, i); + for (int32_t j = 0; j < pVgReq->pTbs->size; ++j) { + const SMDropCtbWithTsmaSingleTbReq* pTbReq = taosArrayGet(pVgReq->pTbs, j); + if (pTbReq->isTsmaResTb) { + taosHashPut(pTsmaHashSet, &pTbReq->tsmaUid, sizeof(pTbReq->tsmaUid), NULL, 0); + taosHashPut(pSourceTbHashSet, &pTbReq->req.suid, sizeof(pTbReq->req.suid), NULL, 0); + } else { + taosHashPut(pSourceTbHashSet, &pTbReq->stbUid, sizeof(pTbReq->stbUid), NULL, 0); + } + } + } + sdbReadLock(pMnode->pSdb, SDB_SMA); + locked = true; + void *pIter = NULL; + SSmaObj *pSma = NULL; + + while(1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); + if (!pIter) break; + if (taosHashGet(pSourceTbHashSet, &pSma->stbUid, sizeof(pSma->stbUid))) { + if (!taosHashGet(pTsmaHashSet, &pSma->uid, sizeof(pSma->uid))) { + // TODO should retry + terrno = TSDB_CODE_TDB_STB_NOT_EXIST; + } + } + sdbRelease(pMnode->pSdb, pSma); + } + // start transaction + + + code = 0; +_OVER: + if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA); + taosHashCleanup(pTsmaHashSet); + taosHashCleanup(pSourceTbHashSet); + return code; } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 6444286ab1..809c9c0f8c 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2939,10 +2939,10 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ctgRemoveTbMetaFromCache(pCtg, pTbName, false); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } - + // TODO add tb meta to cache if (META_TYPE_BOTH_TABLE == pOut->metaType) { // rewrite tsma fetch table with it's super table name - snprintf(pFetch->tsmaSourceTbName.tname, TMIN(TSDB_TABLE_NAME_LEN, strlen(pOut->tbName) + 1), "%s", pOut->tbName); + sprintf(pFetch->tsmaSourceTbName.tname, "%s", pOut->tbName); } CTG_ERR_JRET(ctgGetTbTSMAFromMnode(pCtg, pConn, &pFetch->tsmaSourceTbName, NULL, tReq, TDMT_MND_GET_TABLE_TSMA)); } break; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index aad37c58e7..72798ec20f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12271,19 +12271,17 @@ typedef struct SVgroupDropTableBatch { char dbName[TSDB_DB_NAME_LEN]; } SVgroupDropTableBatch; -static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo, - uint64_t suid) { - SVDropTbReq req = {.name = pClause->tableName, .suid = suid, .igNotExists = pClause->ignoreNotExists}; +static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SVgroupInfo* pVgInfo, SVDropTbReq* pReq) { SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); if (NULL == pTableBatch) { SVgroupDropTableBatch tBatch = {0}; tBatch.info = *pVgInfo; tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq)); - taosArrayPush(tBatch.req.pArray, &req); + taosArrayPush(tBatch.req.pArray, pReq); taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch)); } else { // add to the correct vgroup - taosArrayPush(pTableBatch->req.pArray, &req); + taosArrayPush(pTableBatch->req.pArray, pReq); } } @@ -12314,7 +12312,8 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info, pTableMeta->suid); + SVDropTbReq req = {.name = pClause->tableName, .suid = pTableMeta->suid, .igNotExists = pClause->ignoreNotExists}; + addDropTbReqIntoVgroup(pVgroupHashmap, &info, &req); } over: @@ -12388,7 +12387,7 @@ static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap, FOREACH(pNode, pStmt->pTables) { SDropTableClause* pClause = (SDropTableClause*)pNode; if (pClause->pTsmas) { - for (int32_t i = 0; pClause->pTsmas->size; ++i) { + for (int32_t i = 0; i < pClause->pTsmas->size; ++i) { const STableTSMAInfo* pTsma = taosArrayGetP(pClause->pTsmas, i); int32_t len = sprintf(tsmaResTbName, "%s.%s", pTsma->dbFName, pTsma->name); @@ -12396,19 +12395,22 @@ static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap, sprintf(tsmaResTbName + len, "_%s", pClause->tableName); toName(pCxt->pParseCxt->acctId, pClause->dbName, tsmaResTbName, &tbName); - code = getTargetMeta(pCxt, &tbName, &pTableMeta, false); + /*code = getTargetMeta(pCxt, &tbName, &pTableMeta, false); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { code = TSDB_CODE_SUCCESS; continue; } - if (code) break; + if (code) break; */ collectUseTable(&tbName, pCxt->pTargetTables); SVgroupInfo info = {0}; + bool exists = false; if (TSDB_CODE_SUCCESS == code) { - code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info); + //code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info); + code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tbName, &info, &exists); } - if (TSDB_CODE_SUCCESS == code) { - addDropTbReqIntoVgroup(pVgMap, pClause, &info, pTableMeta->suid); + if (TSDB_CODE_SUCCESS == code && exists) { + SVDropTbReq req = {.name = tsmaResTbName, .suid = pTsma->destTbUid, .igNotExists = true}; + addDropTbReqIntoVgroup(pVgMap, &info, &req); } } }