fix drop ctb
This commit is contained in:
parent
34dbdfd4dc
commit
94f2f6ae22
|
@ -213,11 +213,13 @@ typedef struct SDropTableClause {
|
|||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
bool ignoreNotExists;
|
||||
SArray* pTsmas;
|
||||
} SDropTableClause;
|
||||
|
||||
typedef struct SDropTableStmt {
|
||||
ENodeType type;
|
||||
SNodeList* pTables;
|
||||
bool withTsma;
|
||||
} SDropTableStmt;
|
||||
|
||||
typedef struct SDropSuperTableStmt {
|
||||
|
|
|
@ -2536,11 +2536,6 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
|||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pStream->smaId != 0) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pStream->targetStbUid == suid) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pStream);
|
||||
|
|
|
@ -791,7 +791,9 @@ typedef struct SCtgCacheItemInfo {
|
|||
(CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || \
|
||||
(CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
|
||||
|
||||
#define CTG_IS_BATCH_TASK(_taskType) ((CTG_TASK_GET_TB_META_BATCH == (_taskType)) || (CTG_TASK_GET_TB_HASH_BATCH == (_taskType)) || (CTG_TASK_GET_VIEW == (_taskType)))
|
||||
#define CTG_IS_BATCH_TASK(_taskType) \
|
||||
((CTG_TASK_GET_TB_META_BATCH == (_taskType)) || (CTG_TASK_GET_TB_HASH_BATCH == (_taskType)) || \
|
||||
(CTG_TASK_GET_VIEW == (_taskType)) || (CTG_TASK_GET_TB_TSMA == (_taskType)))
|
||||
|
||||
#define CTG_GET_TASK_MSGCTX(_task, _id) \
|
||||
(CTG_IS_BATCH_TASK((_task)->type) ? taosArrayGet((_task)->msgCtxs, (_id)) : &(_task)->msgCtx)
|
||||
|
|
|
@ -2807,7 +2807,7 @@ static int32_t ctgTsmaFetchStreamProgress(SCtgTaskReq* tReq, SHashObj* pVgHash,
|
|||
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
|
||||
const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
|
||||
const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
|
||||
SVgroupInfo* pVgInfo = NULL;
|
||||
|
||||
pFetch->vgNum = taosHashGetSize(pVgHash);
|
||||
|
@ -2845,11 +2845,11 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
STableTSMAInfo* pTsma = NULL;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
|
||||
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
|
||||
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
|
||||
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||
|
||||
switch (reqType) {
|
||||
switch (reqType) {
|
||||
case TDMT_MND_GET_TABLE_TSMA: {
|
||||
STableTSMAInfoRsp* pOut = pMsgCtx->out;
|
||||
pFetch->fetchType = FETCH_TSMA_STREAM_PROGRESS;
|
||||
|
|
|
@ -1535,7 +1535,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, (char*)tbFName));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
|
||||
|
@ -1593,7 +1593,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
|
|||
if (!pOut) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, (char*)tbFName));
|
||||
|
||||
SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
|
||||
.requestId = pConn->requestId,
|
||||
|
|
|
@ -7944,9 +7944,25 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doTranslateDropCtbsWithTsma(STranslateContext* pCxt, SDropTableStmt* pStmt) {
|
||||
SNode* pNode;
|
||||
// note that there could have normal tables
|
||||
FOREACH(pNode, pStmt->pTables) {
|
||||
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
||||
if (pClause->pTsmas) {
|
||||
// generate tsma res ctb names and get it's vgInfo
|
||||
}
|
||||
}
|
||||
// assemble all tbs into one req, then send to mnode
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) {
|
||||
SDropTableClause* pClause = (SDropTableClause*)nodesListGetNode(pStmt->pTables, 0);
|
||||
SName tableName;
|
||||
if (pStmt->withTsma) {
|
||||
return doTranslateDropCtbsWithTsma(pCxt, pStmt);
|
||||
}
|
||||
return doTranslateDropSuperTable(
|
||||
pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), pClause->ignoreNotExists);
|
||||
}
|
||||
|
@ -12362,6 +12378,9 @@ SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap) {
|
|||
|
||||
static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot;
|
||||
bool isSuperTable = false;
|
||||
SNode* pNode;
|
||||
SArray* pTsmas = NULL;
|
||||
|
||||
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pVgroupHashmap) {
|
||||
|
@ -12369,8 +12388,6 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
}
|
||||
|
||||
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
|
||||
bool isSuperTable = false;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pStmt->pTables) {
|
||||
int32_t code = buildDropTableVgroupHashmap(pCxt, (SDropTableClause*)pNode, &isSuperTable, pVgroupHashmap);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
@ -12387,6 +12404,21 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
FOREACH(pNode, pStmt->pTables) {
|
||||
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
||||
SName name;
|
||||
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
|
||||
getTableTsmasFromCache(pCxt->pMetaCache, &name, &pTsmas);
|
||||
if (pTsmas && pTsmas->size > 0) {
|
||||
pClause->pTsmas= pTsmas;
|
||||
pStmt->withTsma = true;
|
||||
}
|
||||
}
|
||||
if (pStmt->withTsma) {
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
if (NULL == pBufArray) {
|
||||
|
|
|
@ -6012,6 +6012,8 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
|
|||
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx);
|
||||
const STableTSMAInfo* pTsma = pUsefulTsma->pTsma;
|
||||
|
||||
if (pScanRange->ekey <= pScanRange->skey) return;
|
||||
|
||||
if (!pInterval) {
|
||||
tsmaOptInitIntervalFromTsma(&interval, pTsma, pTsmaOptCtx->precision);
|
||||
pInterval = &interval;
|
||||
|
|
Loading…
Reference in New Issue