enh: support drop table with uid
This commit is contained in:
parent
302c08331f
commit
d7fcf21f8b
|
@ -309,6 +309,7 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_ARB_HEARTBEAT, "vnode-arb-hb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL)
|
||||
TD_CLOSE_MSG_SEG(TDMT_VND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8
|
||||
|
|
|
@ -382,6 +382,8 @@ TDMT_VND_ARB_CHECK_SYNC = 613
|
|||
TDMT_VND_ARB_CHECK_SYNC_RSP = 614
|
||||
TDMT_VND_FETCH_TTL_EXPIRED_TBS = 615
|
||||
TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP = 616
|
||||
TDMT_VND_TABLE_NAME = 617
|
||||
TDMT_VND_TABLE_NAME_RSP = 618
|
||||
TDMT_SCH_QUERY = 769
|
||||
TDMT_SCH_QUERY_RSP = 770
|
||||
TDMT_SCH_MERGE_QUERY = 771
|
||||
|
|
|
@ -992,6 +992,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -418,6 +418,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TABLE_NAME:
|
||||
// error code has been set into reqMsg, no need to handle it here.
|
||||
if (TSDB_CODE_SUCCESS != vnodeGetTableMeta(pVnode, &reqMsg, false)) {
|
||||
qWarn("vnodeGetBatchName failed, msgType:%d", req->msgType);
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TABLE_CFG:
|
||||
// error code has been set into reqMsg, no need to handle it here.
|
||||
if (TSDB_CODE_SUCCESS != vnodeGetTableCfg(pVnode, &reqMsg, false)) {
|
||||
|
|
|
@ -786,7 +786,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TABLE_NAME) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
|
@ -807,6 +807,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
case TDMT_SCH_QUERY_HEARTBEAT:
|
||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_VND_TABLE_META:
|
||||
case TDMT_VND_TABLE_NAME:
|
||||
return vnodeGetTableMeta(pVnode, pMsg, true);
|
||||
case TDMT_VND_TABLE_CFG:
|
||||
return vnodeGetTableCfg(pVnode, pMsg, true);
|
||||
|
|
|
@ -1006,8 +1006,8 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq
|
|||
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
||||
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t ctgGetTbUidsFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbNamesCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t ctgGetTbNamesFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbNamesCtx* ctx, int32_t dbIdx,
|
||||
int32_t* fetchIdx, int32_t baseResIdx, SArray* pList);
|
||||
int32_t ctgCloneDbCfgInfo(void* pSrc, SDbCfgInfo** ppDst);
|
||||
|
||||
int32_t ctgOpUpdateVgroup(SCtgCacheOperation* action);
|
||||
|
|
|
@ -1929,6 +1929,7 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
|
|||
SName* pName = NULL;
|
||||
CTG_ERR_JRET(ctgGetFetchName(ctx->pNames, pFetch, &pName));
|
||||
|
||||
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||
|
||||
switch (reqType) {
|
||||
|
@ -1991,7 +1992,7 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
|
|||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_META: {
|
||||
case TDMT_VND_TABLE_NAME: {
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
|
@ -4005,8 +4006,8 @@ static int32_t ctgLaunchGetTbNamesTask(SCtgTask* pTask) {
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
ctgDebug("start to check tbuid metas in db %s, tbNum %ld", pReq->dbFName, taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbUidsFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
ctgDebug("start to check tbname metas in db %s, tbNum %ld", pReq->dbFName, taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbNamesFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
baseResIdx += taosArrayGetSize(pReq->pTables);
|
||||
}
|
||||
|
||||
|
|
|
@ -3691,8 +3691,8 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbUidsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbNamesCtx *ctx, int32_t dbIdx,
|
||||
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
|
||||
int32_t ctgGetTbNamesFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbNamesCtx *ctx, int32_t dbIdx,
|
||||
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
|
||||
int32_t tbNum = taosArrayGetSize(pList);
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
int32_t flag = CTG_FLAG_UNKNOWN_STB;
|
||||
|
|
|
@ -297,6 +297,27 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
|
|||
qDebug("Got table meta from vnode, tbFName:%s", target);
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_NAME: {
|
||||
if (TSDB_CODE_SUCCESS != rspCode) {
|
||||
if (CTG_TABLE_NOT_EXIST(rspCode)) {
|
||||
SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
|
||||
qDebug("tablemeta not exist in vnode, tbFName:%s", target);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
|
||||
CTG_ERR_RET(rspCode);
|
||||
}
|
||||
|
||||
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
|
||||
if (code) {
|
||||
qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
qDebug("Got table meta from vnode, tbFName:%s", target);
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_CFG: {
|
||||
if (TSDB_CODE_SUCCESS != rspCode) {
|
||||
qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
|
||||
|
@ -600,7 +621,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
|
|||
if (TDMT_VND_TABLE_CFG == msgType) {
|
||||
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
pName = ctx->pName;
|
||||
} else if (TDMT_VND_TABLE_META == msgType) {
|
||||
} else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) {
|
||||
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
|
||||
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
||||
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
|
||||
|
@ -648,7 +669,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
|
|||
(void)tNameGetFullDbName(pName, newBatch.dbFName);
|
||||
}
|
||||
|
||||
newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
|
||||
newBatch.msgType = (vgId > 1) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
|
||||
newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);
|
||||
|
||||
if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
|
||||
|
@ -1341,7 +1362,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
|||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
(void)tNameGetFullDbName(pTableName, dbFName);
|
||||
int32_t reqType = TDMT_VND_TABLE_META;
|
||||
int32_t reqType = (pTask && pTask->type == CTG_TASK_GET_TB_NAME ? TDMT_VND_TABLE_NAME : TDMT_VND_TABLE_META);
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
|
@ -1351,7 +1372,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
|||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId,
|
||||
.option = pTask && pTask->type == CTG_TASK_GET_TB_NAME ? 0x01 : 0x00,
|
||||
.option = reqType == TDMT_VND_TABLE_NAME ? 0x01 : 0x00,
|
||||
.dbFName = dbFName,
|
||||
.tbName = (char*)tNameGetTableName(pTableName)};
|
||||
char* msg = NULL;
|
||||
|
|
|
@ -586,7 +586,8 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_META:
|
||||
case TDMT_MND_TABLE_META: {
|
||||
case TDMT_MND_TABLE_META:
|
||||
case TDMT_VND_TABLE_NAME: {
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pCtx->out;
|
||||
taosMemoryFree(pOut->tbMeta);
|
||||
taosMemoryFreeClear(pCtx->out);
|
||||
|
@ -1701,7 +1702,7 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
|
|||
}
|
||||
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo* pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) {
|
||||
if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META) {
|
||||
if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META || msgType == TDMT_VND_TABLE_NAME) {
|
||||
pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
|
||||
pMsgSendInfo->target.vgId = vgId;
|
||||
pMsgSendInfo->target.dbFName = taosStrdup(dbFName);
|
||||
|
|
|
@ -14462,20 +14462,18 @@ int32_t serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap, SArray** pOut)
|
|||
static int32_t rewriteDropTablewithOpt(STranslateContext* pCxt, SDropTableStmt* pStmt) {
|
||||
if (!pStmt->withOpt) return TSDB_CODE_SUCCESS;
|
||||
|
||||
SNode* pNode = NULL;
|
||||
SNode* pNode = NULL;
|
||||
char pTableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
FOREACH(pNode, pStmt->pTables) {
|
||||
SDropTableClause* pClause = (SDropTableClause*)pNode;
|
||||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SName name = {0};
|
||||
SName name = {0};
|
||||
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
|
||||
int32_t code = getTargetMeta(pCxt, &name, &pTableMeta, false);
|
||||
int32_t code = getTargetName(pCxt, &name, pTableName);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "Table uid does not exist: '%s'",
|
||||
pClause->tableName);
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "Table uid does not exist: '%s'", pClause->tableName);
|
||||
}
|
||||
// tstrncpy(pClause->tableName, pTableMeta->, TSDB_TABLE_NAME_LEN); // rewrite table uid to table name
|
||||
taosMemoryFree(pTableMeta);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -610,6 +610,62 @@ PROCESS_META_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t queryProcessTableNameRsp(void *output, char *msg, int32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
STableMetaRsp metaRsp = {0};
|
||||
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto PROCESS_NAME_OVER;
|
||||
}
|
||||
|
||||
if (tDeserializeSTableMetaRsp(msg, msgSize, &metaRsp) != 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto PROCESS_NAME_OVER;
|
||||
}
|
||||
|
||||
code = queryConvertTableMetaMsg(&metaRsp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto PROCESS_NAME_OVER;
|
||||
}
|
||||
|
||||
if (!IS_SYS_DBNAME(metaRsp.dbFName) &&
|
||||
!tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) {
|
||||
code = TSDB_CODE_TSC_INVALID_VALUE;
|
||||
goto PROCESS_NAME_OVER;
|
||||
}
|
||||
|
||||
STableMetaOutput *pOut = output;
|
||||
strcpy(pOut->dbFName, metaRsp.dbFName);
|
||||
pOut->dbId = metaRsp.dbId;
|
||||
|
||||
if (metaRsp.tableType == TSDB_CHILD_TABLE) {
|
||||
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
|
||||
|
||||
strcpy(pOut->ctbName, metaRsp.tbName);
|
||||
strcpy(pOut->tbName, metaRsp.stbName);
|
||||
|
||||
pOut->ctbMeta.vgId = metaRsp.vgId;
|
||||
pOut->ctbMeta.tableType = metaRsp.tableType;
|
||||
pOut->ctbMeta.uid = metaRsp.tuid;
|
||||
pOut->ctbMeta.suid = metaRsp.suid;
|
||||
|
||||
code = queryCreateTableMetaFromMsg(&metaRsp, true, &pOut->tbMeta);
|
||||
} else {
|
||||
SET_META_TYPE_TABLE(pOut->metaType);
|
||||
strcpy(pOut->tbName, metaRsp.tbName);
|
||||
code = queryCreateTableMetaFromMsg(&metaRsp, (metaRsp.tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||
}
|
||||
|
||||
PROCESS_NAME_OVER:
|
||||
if (code != 0) {
|
||||
qError("failed to process table name rsp since %s", tstrerror(code));
|
||||
}
|
||||
|
||||
tFreeSTableMetaRsp(&metaRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SQnodeListRsp out = {0};
|
||||
int32_t code = 0;
|
||||
|
@ -828,6 +884,7 @@ int32_t queryProcessStreamProgressRsp(void* output, char* msg, int32_t msgSize)
|
|||
|
||||
void initQueryModuleMsgHandle() {
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_NAME)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||
|
@ -846,6 +903,7 @@ void initQueryModuleMsgHandle() {
|
|||
queryBuildMsg[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryBuildGetStreamProgressMsg;
|
||||
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_NAME)] = queryProcessTableNameRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||
|
|
Loading…
Reference in New Issue