enh: support drop table with uid
This commit is contained in:
parent
bb3479d952
commit
31bb993e0d
|
@ -96,6 +96,7 @@ typedef struct SCatalogReq {
|
|||
SArray* pView; // element is STablesReq
|
||||
SArray* pTableTSMAs; // element is STablesReq
|
||||
SArray* pTSMAs; // element is STablesReq
|
||||
SArray* pTableUid; // element is STablesReq
|
||||
bool qNodeRequired; // valid qnode
|
||||
bool dNodeRequired; // valid dnode
|
||||
bool svrVerRequired;
|
||||
|
|
|
@ -3708,6 +3708,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
|||
|
||||
int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
@ -3718,6 +3719,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
|||
|
||||
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
|
||||
taosMemoryFree(msg);
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ typedef enum {
|
|||
CTG_TASK_GET_VIEW,
|
||||
CTG_TASK_GET_TB_TSMA,
|
||||
CTG_TASK_GET_TSMA,
|
||||
CTG_TASK_GET_TB_UID,
|
||||
} CTG_TASK_TYPE;
|
||||
|
||||
typedef enum {
|
||||
|
@ -205,6 +206,13 @@ typedef struct SCtgTbMetasCtx {
|
|||
SArray* pFetchs;
|
||||
} SCtgTbMetasCtx;
|
||||
|
||||
typedef struct SCtgTbUidsCtx {
|
||||
int32_t fetchNum;
|
||||
SArray* pNames;
|
||||
SArray* pResList;
|
||||
SArray* pFetchs;
|
||||
} SCtgTbUidsCtx;
|
||||
|
||||
typedef struct SCtgTbIndexCtx {
|
||||
SName* pName;
|
||||
} SCtgTbIndexCtx;
|
||||
|
@ -421,6 +429,7 @@ typedef struct SCtgJob {
|
|||
int32_t viewNum;
|
||||
int32_t tbTsmaNum;
|
||||
int32_t tsmaNum; // currently, only 1 is possible
|
||||
int32_t tbUidNum;
|
||||
} SCtgJob;
|
||||
|
||||
typedef struct SCtgMsgCtx {
|
||||
|
@ -996,6 +1005,8 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq
|
|||
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
||||
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t ctgGetTbUidsFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbUidsCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t ctgCloneDbCfgInfo(void* pSrc, SDbCfgInfo** ppDst);
|
||||
|
||||
int32_t ctgOpUpdateVgroup(SCtgCacheOperation* action);
|
||||
|
|
|
@ -588,6 +588,34 @@ int32_t ctgInitGetTSMATask(SCtgJob* pJob, int32_t taskId, void* param) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t ctgInitGetTbUidTask(SCtgJob* pJob, int32_t taskId, void* param) {
|
||||
SCtgTask task = {0};
|
||||
task.type = CTG_TASK_GET_TB_UID;
|
||||
task.taskId = taskId;
|
||||
task.pJob = pJob;
|
||||
|
||||
SCtgTbUidsCtx* pTaskCtx = taosMemoryCalloc(1, sizeof(SCtgTbTSMACtx));
|
||||
if (NULL == pTaskCtx) {
|
||||
CTG_ERR_RET(terrno);
|
||||
}
|
||||
task.taskCtx = pTaskCtx;
|
||||
pTaskCtx->pNames = param;
|
||||
pTaskCtx->pResList = taosArrayInit(pJob->tsmaNum, sizeof(SMetaRes));
|
||||
if (NULL == pTaskCtx->pResList) {
|
||||
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbUidNum,
|
||||
(int32_t)sizeof(SMetaRes));
|
||||
ctgFreeTask(&task, true);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(pJob->pTasks, &task)) {
|
||||
ctgFreeTask(&task, true);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgHandleForceUpdateView(SCatalog* pCtg, const SCatalogReq* pReq) {
|
||||
int32_t viewNum = taosArrayGetSize(pReq->pView);
|
||||
for (int32_t i = 0; i < viewNum; ++i) {
|
||||
|
@ -822,9 +850,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
|
||||
int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs);
|
||||
int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs);
|
||||
int32_t tbUidNum = (int32_t)taosArrayGetSize(pReq->pTableUid);
|
||||
|
||||
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
|
||||
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum;
|
||||
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum + tbUidNum;
|
||||
|
||||
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
||||
if (NULL == *job) {
|
||||
|
@ -859,6 +888,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
pJob->viewNum = viewNum;
|
||||
pJob->tbTsmaNum = tbTsmaNum;
|
||||
pJob->tsmaNum = tsmaNum;
|
||||
pJob->tbUidNum = tbUidNum;
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
pJob->pBatchs =
|
||||
|
@ -991,6 +1021,9 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
if (tsmaNum > 0) {
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TSMA, pReq->pTSMAs, NULL));
|
||||
}
|
||||
if(tbUidNum > 0) {
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_UID, pReq->pTableUid, NULL));
|
||||
}
|
||||
if (qnodeNum) {
|
||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
|
||||
}
|
||||
|
@ -1059,6 +1092,18 @@ int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpTbUidsRes(SCtgTask* pTask) {
|
||||
if (pTask->subTask) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
|
||||
pJob->jobRes.pTableMeta = pTask->res;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
|
||||
if (pTask->subTask) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1858,6 +1903,223 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
static int32_t ctgHandleGetTbUidsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgDBCache* dbCache = NULL;
|
||||
SCtgTask* pTask = tReq->pTask;
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
|
||||
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
||||
bool taskDone = false;
|
||||
|
||||
if (NULL == pMsgCtx) {
|
||||
ctgError("fail to get task msgCtx, taskType:%d", pTask->type);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
|
||||
if (NULL == pFetch) {
|
||||
ctgError("fail to get the %dth fetch, fetchNum:%d", tReq->msgIdx, (int32_t)taosArrayGetSize(ctx->pFetchs));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
int32_t flag = pFetch->flag;
|
||||
int32_t* vgId = &pFetch->vgId;
|
||||
SName* pName = NULL;
|
||||
CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, pFetch, &pName));
|
||||
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||
|
||||
switch (reqType) {
|
||||
case TDMT_MND_USE_DB: {
|
||||
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
|
||||
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, pOut->dbVgroup, pName, &vgInfo));
|
||||
|
||||
ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
case TDMT_MND_TABLE_META: {
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
if (CTG_FLAG_IS_STB(flag)) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
(void)tNameGetFullDbName(pName, dbFName);
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
||||
if (NULL != dbCache) {
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, dbCache->vgCache.vgInfo, pName, &vgInfo));
|
||||
|
||||
ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
|
||||
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
dbCache = NULL;
|
||||
} else {
|
||||
SBuildUseDBInput input = {0};
|
||||
|
||||
tstrncpy(input.db, dbFName, tListLen(input.db));
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
(void)ctgRemoveTbMetaFromCache(pCtg, pName, false); // cache update not fatal error
|
||||
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (pMsgCtx->lastOut) {
|
||||
TSWAP(pMsgCtx->out, pMsgCtx->lastOut);
|
||||
STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
TSWAP(pLastOut->tbMeta, pOut->tbMeta);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_META: {
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
(void)ctgRemoveTbMetaFromCache(pCtg, pName, false); // cache update not fatal error
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (CTG_FLAG_IS_STB(flag)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
|
||||
ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
|
||||
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
|
||||
} else if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
int32_t exist = 0;
|
||||
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
|
||||
SName stbName = *pName;
|
||||
TAOS_STRCPY(stbName.tname, pOut->tbName);
|
||||
SCtgTbMetaCtx stbCtx = {0};
|
||||
stbCtx.flag = flag;
|
||||
stbCtx.pName = &stbName;
|
||||
|
||||
STableMeta* stbMeta = NULL;
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta));
|
||||
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
|
||||
ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
exist = 1;
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
} else {
|
||||
ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == exist) {
|
||||
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
|
||||
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ctgTaskError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
(void)ctgUpdateTbMetaToCache(pCtg, pOut, false); // cache update not fatal error
|
||||
|
||||
if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
TAOS_MEMCPY(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||
}
|
||||
|
||||
/*
|
||||
else if (CTG_IS_META_CTABLE(pOut->metaType)) {
|
||||
SName stbName = *pName;
|
||||
TAOS_STRCPY(stbName.tname, pOut->tbName);
|
||||
SCtgTbMetaCtx stbCtx = {0};
|
||||
stbCtx.flag = flag;
|
||||
stbCtx.pName = &stbName;
|
||||
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
|
||||
if (NULL == pOut->tbMeta) {
|
||||
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
|
||||
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
TAOS_MEMCPY(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||
}
|
||||
*/
|
||||
|
||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
if (NULL == pRes) {
|
||||
ctgTaskError("fail to get the %dth res in pResList, resNum:%d", pFetch->resIdx,
|
||||
(int32_t)taosArrayGetSize(ctx->pResList));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
pRes->code = 0;
|
||||
pRes->pRes = pOut->tbMeta;
|
||||
pOut->tbMeta = NULL;
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
TSWAP(pTask->res, ctx->pResList);
|
||||
taskDone = true;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
if (NULL == pRes) {
|
||||
ctgTaskError("fail to get the %dth res in pResList, resNum:%d", pFetch->resIdx,
|
||||
(int32_t)taosArrayGetSize(ctx->pResList));
|
||||
} else {
|
||||
pRes->code = code;
|
||||
pRes->pRes = NULL;
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
||||
tstrerror(code));
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
TSWAP(pTask->res, ctx->pResList);
|
||||
taskDone = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pTask->res && taskDone) {
|
||||
int32_t newCode = ctgHandleTaskEnd(pTask, code);
|
||||
if (newCode && TSDB_CODE_SUCCESS == code) {
|
||||
code = newCode;
|
||||
}
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgTask* pTask = tReq->pTask;
|
||||
|
@ -3726,6 +3988,70 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgLaunchGetTbUidTask(SCtgTask* pTask) {
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgTbUidsCtx* pCtx = (SCtgTbUidsCtx*)pTask->taskCtx;
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
SName* pName = NULL;
|
||||
|
||||
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
||||
int32_t fetchIdx = 0;
|
||||
int32_t baseResIdx = 0;
|
||||
for (int32_t i = 0; i < dbNum; ++i) {
|
||||
STablesReq* pReq = TARRAY_GET_ELEM(pCtx->pNames, i);
|
||||
if (NULL == pReq) {
|
||||
ctgError("fail to get the %dth STablesReq, num:%d", i, dbNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
ctgDebug("start to check tbuid metas in db %s, tbNum %ld", pReq->dbFName, taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbUidsFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
baseResIdx += taosArrayGetSize(pReq->pTables);
|
||||
}
|
||||
|
||||
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
|
||||
if (pCtx->fetchNum <= 0) {
|
||||
TSWAP(pTask->res, pCtx->pResList);
|
||||
|
||||
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pTask->msgCtxs = taosArrayInit_s(sizeof(SCtgMsgCtx), pCtx->fetchNum);
|
||||
if (NULL == pTask->msgCtxs) {
|
||||
ctgError("taosArrayInit_s %d SCtgMsgCtx %d failed", pCtx->fetchNum, (int32_t)sizeof(SCtgMsgCtx));
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
|
||||
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
|
||||
if (NULL == pFetch) {
|
||||
ctgError("fail to get the %dth fetch in pCtx->pFetchs, fetchNum:%d", i, pCtx->fetchNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgGetFetchName(pCtx->pNames, pFetch, &pName));
|
||||
|
||||
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
|
||||
if (NULL == pMsgCtx) {
|
||||
ctgError("fail to get the %dth pMsgCtx", i);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if (NULL == pMsgCtx->pBatchs) {
|
||||
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||
}
|
||||
|
||||
SCtgTaskReq tReq;
|
||||
tReq.pTask = pTask;
|
||||
tReq.msgIdx = pFetch->fetchIdx;
|
||||
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpTbTSMARes(SCtgTask* pTask) {
|
||||
if (pTask->subTask) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -3872,6 +4198,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
|
|||
{ctgInitGetViewsTask, ctgLaunchGetViewsTask, ctgHandleGetViewsRsp, ctgDumpViewsRes, NULL, NULL},
|
||||
{ctgInitGetTbTSMATask, ctgLaunchGetTbTSMATask, ctgHandleGetTbTSMARsp, ctgDumpTbTSMARes, NULL, NULL},
|
||||
{ctgInitGetTSMATask, ctgLaunchGetTSMATask, ctgHandleGetTSMARsp, ctgDumpTSMARes, NULL, NULL},
|
||||
{ctgInitGetTbUidTask, ctgLaunchGetTbUidTask, ctgHandleGetTbMetasRsp, ctgDumpTbUidsRes, NULL, NULL},
|
||||
};
|
||||
|
||||
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
|
||||
|
|
|
@ -3691,6 +3691,39 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbUidsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbUidsCtx *ctx, int32_t dbIdx,
|
||||
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
|
||||
int32_t tbNum = taosArrayGetSize(pList);
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
int32_t flag = CTG_FLAG_UNKNOWN_STB;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
uint64_t lastSuid = 0;
|
||||
STableMeta *lastTableMeta = NULL;
|
||||
SName *pName = taosArrayGet(pList, 0);
|
||||
if (NULL == pName) {
|
||||
ctgError("fail to get the 0th SName from tableList, tableNum:%d", (int32_t)taosArrayGetSize(pList));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (IS_SYS_DBNAME(pName->dbname)) {
|
||||
CTG_FLAG_SET_SYS_DB(flag);
|
||||
TAOS_STRCPY(dbFName, pName->dbname);
|
||||
} else {
|
||||
(void)tNameGetFullDbName(pName, dbFName);
|
||||
}
|
||||
|
||||
ctgDebug("db %s not in cache", dbFName);
|
||||
for (int32_t i = 0; i < tbNum; ++i) {
|
||||
CTG_ERR_JRET(ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag));
|
||||
if (NULL == taosArrayPush(ctx->pResList, &(SMetaData){0})) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
|
||||
if (IS_SYS_DBNAME(pTableName->dbname)) {
|
||||
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
|
||||
|
|
|
@ -1406,6 +1406,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pScanBaseInfo->cond.twindows.skey = oldSkey;
|
||||
} else {
|
||||
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
@ -1424,6 +1425,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -184,6 +184,7 @@ int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char
|
|||
|
||||
if (pOperator == NULL) {
|
||||
qError("invalid operator, failed to find tableScanOperator %s", id);
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -112,8 +112,10 @@ typedef struct SParseMetaCache {
|
|||
SHashObj* pViews; // key is viewFName, element is SViewMeta*
|
||||
SHashObj* pTableTSMAs; // key is tbFName, elements are SArray<STableTSMAInfo*>
|
||||
SHashObj* pTSMAs; // key is tsmaFName, elemetns are STableTSMAInfo*
|
||||
SHashObj* pTableUid; // key is tbUid, elemetn is STableMeta*
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
bool qnodeRequired;
|
||||
} SParseMetaCache;
|
||||
|
||||
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
|
||||
|
@ -141,6 +143,7 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveTableUidInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveViewMetaInCache(int32_t acctId, const char* pDb, const char* pView, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveViewMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
|
||||
|
|
|
@ -380,15 +380,20 @@ static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableS
|
|||
// char tableName[50] = "aa\u00bf\u200bstb0";
|
||||
FOREACH(pNode, pStmt->pTables) {
|
||||
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
||||
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
|
||||
if (pStmt->withOpt) {
|
||||
code = reserveTableUidInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
|
||||
} else {
|
||||
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code =
|
||||
reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pClause->dbName,
|
||||
pClause->tableName, AUTH_TYPE_WRITE, pCxt->pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pClause->dbName,
|
||||
pClause->tableName, AUTH_TYPE_WRITE, pCxt->pMetaCache);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
|
@ -398,6 +403,7 @@ static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableS
|
|||
}
|
||||
|
||||
static int32_t collectMetaKeyFromDropStable(SCollectMetaKeyCxt* pCxt, SDropSuperTableStmt* pStmt) {
|
||||
if (pStmt->withOpt) return TSDB_CODE_SUCCESS;
|
||||
return reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->tableName,
|
||||
AUTH_TYPE_WRITE, pCxt->pMetaCache);
|
||||
}
|
||||
|
|
|
@ -200,6 +200,7 @@ static int32_t findAndReplaceNode(SCalcConstContext* pCxt, SNode** pRoot, SNode*
|
|||
nodesRewriteExprPostOrder(pRoot, doFindAndReplaceNode, pCxt);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->code && strict && !pCxt->replaceCxt.replaced) {
|
||||
parserError("target replace node not found, %p", pTarget);
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
return pCxt->code;
|
||||
|
|
|
@ -175,6 +175,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
|
|||
} else {
|
||||
pSchema = pTableMeta->schema;
|
||||
if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) {
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2771,6 +2771,7 @@ static int32_t rewriteSystemInfoFunc(STranslateContext* pCxt, SNode** pNode) {
|
|||
default:
|
||||
break;
|
||||
}
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
@ -2977,6 +2978,7 @@ static int32_t rewriteClientPseudoColumnFunc(STranslateContext* pCxt, SNode** pN
|
|||
default:
|
||||
break;
|
||||
}
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -804,6 +804,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildTableReqFromDb(pMetaCache->pTSMAs, &pCatalogReq->pTSMAs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildTableReqFromDb(pMetaCache->pTableUid, &pCatalogReq->pTableUid);
|
||||
}
|
||||
#ifdef TD_ENTERPRISE
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildTableReqFromDb(pMetaCache->pTableMeta, &pCatalogReq->pView);
|
||||
|
@ -847,6 +850,7 @@ static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pD
|
|||
int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput) {
|
||||
SMetaRes** pRes = taosHashGet(pHash, pKey, len);
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == (*pRes)->code) {
|
||||
|
@ -1019,6 +1023,10 @@ int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCac
|
|||
return reserveTableReqInDbCache(pName->acctId, pName->dbname, pName->tname, &pMetaCache->pTableMeta);
|
||||
}
|
||||
|
||||
int32_t reserveTableUidInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
||||
return reserveTableReqInDbCache(acctId, pDb, pTable, &pMetaCache->pTableUid);
|
||||
}
|
||||
|
||||
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t code = tNameExtractFullName(pName, fullName);
|
||||
|
@ -1283,6 +1291,7 @@ int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, ST
|
|||
code = getMetaDataFromHash(tsmaFName, strlen(tsmaFName), pMetaCache->pTSMAs, (void**)&pTsmaRsp);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (!pTsmaRsp || pTsmaRsp->pTsmas->size != 1) {
|
||||
assert(0);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
*pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0);
|
||||
|
@ -1375,11 +1384,13 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
|
|||
destoryParseTablesMetaReqHash(pMetaCache->pTableVgroup);
|
||||
destoryParseTablesMetaReqHash(pMetaCache->pViews);
|
||||
destoryParseTablesMetaReqHash(pMetaCache->pTSMAs);
|
||||
destoryParseTablesMetaReqHash(pMetaCache->pTableUid);
|
||||
} else {
|
||||
taosHashCleanup(pMetaCache->pTableMeta);
|
||||
taosHashCleanup(pMetaCache->pTableVgroup);
|
||||
taosHashCleanup(pMetaCache->pViews);
|
||||
taosHashCleanup(pMetaCache->pTSMAs);
|
||||
taosHashCleanup(pMetaCache->pTableUid);
|
||||
}
|
||||
taosHashCleanup(pMetaCache->pDbVgroup);
|
||||
taosHashCleanup(pMetaCache->pDbCfg);
|
||||
|
|
Loading…
Reference in New Issue