support drop ctb for tsma
This commit is contained in:
parent
1a06dd684e
commit
1ef153de63
|
@ -4272,6 +4272,25 @@ typedef struct SStreamProgressRsp {
|
|||
int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp);
|
||||
int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp);
|
||||
|
||||
typedef struct SDropCtbWithTsmaSingleTbReq {
|
||||
SVDropTbReq req;
|
||||
bool isTsmaResTb;
|
||||
int64_t tsmaUid;
|
||||
int64_t stbUid; // stable uid
|
||||
} SMDropCtbWithTsmaSingleTbReq;
|
||||
|
||||
typedef struct SDropCtbWithTsmaSingleVgReq {
|
||||
SVgroupInfo vgInfo;
|
||||
SArray* pTbs;
|
||||
} SMDropCtbWithTsmaSingleVgReq;
|
||||
|
||||
typedef struct SDropCtbWithTsmaReq {
|
||||
SArray* pVgReqs;
|
||||
} SMDropTbWithTsmaReq;
|
||||
|
||||
int32_t tSerializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, const SMDropTbWithTsmaReq* pReq);
|
||||
int32_t tDeserializeDropCtbWithTsmaReq(void* buf, int32_t bufLen, SMDropTbWithTsmaReq* pReq);
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -3708,5 +3708,54 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
|
||||
int32_t code = -1;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SDbObj *pDb = NULL;
|
||||
SStbObj *pStb = NULL;
|
||||
SMDropTbWithTsmaReq dropReq = {0};
|
||||
bool locked = false;
|
||||
if (tDeserializeDropCtbWithTsmaReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
}
|
||||
SHashObj* pTsmaHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
SHashObj* pSourceTbHashSet = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
|
||||
const SMDropCtbWithTsmaSingleVgReq* pVgReq = taosArrayGet(dropReq.pVgReqs, i);
|
||||
for (int32_t j = 0; j < pVgReq->pTbs->size; ++j) {
|
||||
const SMDropCtbWithTsmaSingleTbReq* pTbReq = taosArrayGet(pVgReq->pTbs, j);
|
||||
if (pTbReq->isTsmaResTb) {
|
||||
taosHashPut(pTsmaHashSet, &pTbReq->tsmaUid, sizeof(pTbReq->tsmaUid), NULL, 0);
|
||||
taosHashPut(pSourceTbHashSet, &pTbReq->req.suid, sizeof(pTbReq->req.suid), NULL, 0);
|
||||
} else {
|
||||
taosHashPut(pSourceTbHashSet, &pTbReq->stbUid, sizeof(pTbReq->stbUid), NULL, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
sdbReadLock(pMnode->pSdb, SDB_SMA);
|
||||
locked = true;
|
||||
|
||||
void *pIter = NULL;
|
||||
SSmaObj *pSma = NULL;
|
||||
|
||||
while(1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
if (!pIter) break;
|
||||
if (taosHashGet(pSourceTbHashSet, &pSma->stbUid, sizeof(pSma->stbUid))) {
|
||||
if (!taosHashGet(pTsmaHashSet, &pSma->uid, sizeof(pSma->uid))) {
|
||||
// TODO should retry
|
||||
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
|
||||
}
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
}
|
||||
// start transaction
|
||||
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
if (locked) sdbUnLock(pMnode->pSdb, SDB_SMA);
|
||||
taosHashCleanup(pTsmaHashSet);
|
||||
taosHashCleanup(pSourceTbHashSet);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2939,10 +2939,10 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
ctgRemoveTbMetaFromCache(pCtg, pTbName, false);
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
// TODO add tb meta to cache
|
||||
if (META_TYPE_BOTH_TABLE == pOut->metaType) {
|
||||
// rewrite tsma fetch table with it's super table name
|
||||
snprintf(pFetch->tsmaSourceTbName.tname, TMIN(TSDB_TABLE_NAME_LEN, strlen(pOut->tbName) + 1), "%s", pOut->tbName);
|
||||
sprintf(pFetch->tsmaSourceTbName.tname, "%s", pOut->tbName);
|
||||
}
|
||||
CTG_ERR_JRET(ctgGetTbTSMAFromMnode(pCtg, pConn, &pFetch->tsmaSourceTbName, NULL, tReq, TDMT_MND_GET_TABLE_TSMA));
|
||||
} break;
|
||||
|
|
|
@ -12271,19 +12271,17 @@ typedef struct SVgroupDropTableBatch {
|
|||
char dbName[TSDB_DB_NAME_LEN];
|
||||
} SVgroupDropTableBatch;
|
||||
|
||||
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo,
|
||||
uint64_t suid) {
|
||||
SVDropTbReq req = {.name = pClause->tableName, .suid = suid, .igNotExists = pClause->ignoreNotExists};
|
||||
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SVgroupInfo* pVgInfo, SVDropTbReq* pReq) {
|
||||
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
if (NULL == pTableBatch) {
|
||||
SVgroupDropTableBatch tBatch = {0};
|
||||
tBatch.info = *pVgInfo;
|
||||
tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
|
||||
taosArrayPush(tBatch.req.pArray, &req);
|
||||
taosArrayPush(tBatch.req.pArray, pReq);
|
||||
|
||||
taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch));
|
||||
} else { // add to the correct vgroup
|
||||
taosArrayPush(pTableBatch->req.pArray, &req);
|
||||
taosArrayPush(pTableBatch->req.pArray, pReq);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12314,7 +12312,8 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
|
|||
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info, pTableMeta->suid);
|
||||
SVDropTbReq req = {.name = pClause->tableName, .suid = pTableMeta->suid, .igNotExists = pClause->ignoreNotExists};
|
||||
addDropTbReqIntoVgroup(pVgroupHashmap, &info, &req);
|
||||
}
|
||||
|
||||
over:
|
||||
|
@ -12388,7 +12387,7 @@ static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap,
|
|||
FOREACH(pNode, pStmt->pTables) {
|
||||
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
||||
if (pClause->pTsmas) {
|
||||
for (int32_t i = 0; pClause->pTsmas->size; ++i) {
|
||||
for (int32_t i = 0; i < pClause->pTsmas->size; ++i) {
|
||||
const STableTSMAInfo* pTsma = taosArrayGetP(pClause->pTsmas, i);
|
||||
|
||||
int32_t len = sprintf(tsmaResTbName, "%s.%s", pTsma->dbFName, pTsma->name);
|
||||
|
@ -12396,19 +12395,22 @@ static int32_t dropTableAddTsmaResTb(STranslateContext* pCxt, SHashObj* pVgMap,
|
|||
sprintf(tsmaResTbName + len, "_%s", pClause->tableName);
|
||||
|
||||
toName(pCxt->pParseCxt->acctId, pClause->dbName, tsmaResTbName, &tbName);
|
||||
code = getTargetMeta(pCxt, &tbName, &pTableMeta, false);
|
||||
/*code = getTargetMeta(pCxt, &tbName, &pTableMeta, false);
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
continue;
|
||||
}
|
||||
if (code) break;
|
||||
if (code) break; */
|
||||
collectUseTable(&tbName, pCxt->pTargetTables);
|
||||
SVgroupInfo info = {0};
|
||||
bool exists = false;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info);
|
||||
//code = getTableHashVgroup(pCxt, pClause->dbName, tsmaResTbName, &info);
|
||||
code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &tbName, &info, &exists);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
addDropTbReqIntoVgroup(pVgMap, pClause, &info, pTableMeta->suid);
|
||||
if (TSDB_CODE_SUCCESS == code && exists) {
|
||||
SVDropTbReq req = {.name = tsmaResTbName, .suid = pTsma->destTbUid, .igNotExists = true};
|
||||
addDropTbReqIntoVgroup(pVgMap, &info, &req);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue