enh: support drop table with uid
This commit is contained in:
parent
28edf40c2a
commit
ba10e95723
|
@ -123,11 +123,7 @@ typedef struct STableMeta {
|
|||
// the schema content.
|
||||
SSchema schema[];
|
||||
} STableMeta;
|
||||
typedef struct STableMetaEx {
|
||||
STableMeta* pMeta;
|
||||
// END: KEEP THIS PART SAME WITH STableMeta
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} STableMetaEx;
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
typedef struct SViewMeta {
|
||||
|
@ -335,6 +331,7 @@ int32_t getAsofJoinReverseOp(EOperatorType op);
|
|||
|
||||
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta);
|
||||
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
|
||||
int32_t queryCreateTableMetaExFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
|
||||
char* jobTaskStatusStr(int32_t status);
|
||||
|
||||
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
|
||||
|
|
|
@ -1997,7 +1997,6 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
|
|||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
(void)ctgRemoveTbMetaFromCache(pCtg, pName, false); // cache update not fatal error
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
|
@ -2005,39 +2004,9 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
|
|||
break;
|
||||
}
|
||||
|
||||
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
|
||||
ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
|
||||
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
|
||||
} else if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
int32_t exist = 0;
|
||||
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
|
||||
SName stbName = *pName;
|
||||
TAOS_STRCPY(stbName.tname, pOut->tbName);
|
||||
SCtgTbMetaCtx stbCtx = {0};
|
||||
stbCtx.flag = flag;
|
||||
stbCtx.pName = &stbName;
|
||||
|
||||
STableMeta* stbMeta = NULL;
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta));
|
||||
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
|
||||
ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
exist = 1;
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
} else {
|
||||
ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == exist) {
|
||||
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
|
||||
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
|
||||
}
|
||||
}
|
||||
// if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
// TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
|
||||
// }
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -2046,33 +2015,10 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
|
|||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
(void)ctgUpdateTbMetaToCache(pCtg, pOut, false); // cache update not fatal error
|
||||
|
||||
if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
TAOS_MEMCPY(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||
}
|
||||
|
||||
/*
|
||||
else if (CTG_IS_META_CTABLE(pOut->metaType)) {
|
||||
SName stbName = *pName;
|
||||
TAOS_STRCPY(stbName.tname, pOut->tbName);
|
||||
SCtgTbMetaCtx stbCtx = {0};
|
||||
stbCtx.flag = flag;
|
||||
stbCtx.pName = &stbName;
|
||||
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
|
||||
if (NULL == pOut->tbMeta) {
|
||||
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
|
||||
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
TAOS_MEMCPY(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||
}
|
||||
*/
|
||||
|
||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
if (NULL == pRes) {
|
||||
ctgTaskError("fail to get the %dth res in pResList, resNum:%d", pFetch->resIdx,
|
||||
|
@ -2080,17 +2026,9 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
|
|||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
STableMetaEx* pMetaEx = taosMemoryMalloc(sizeof(STableMetaEx));
|
||||
if (NULL == pMetaEx) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pMetaEx->pMeta = pOut->tbMeta;
|
||||
pOut->tbMeta = NULL;
|
||||
tstrncpy(pMetaEx->tbName, pOut->tbName, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
pRes->code = 0;
|
||||
pRes->pRes = pMetaEx;
|
||||
pRes->pRes = pOut->tbMeta;
|
||||
pOut->tbMeta = NULL;
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
TSWAP(pTask->res, ctx->pResList);
|
||||
taskDone = true;
|
||||
|
|
|
@ -112,7 +112,7 @@ typedef struct SParseMetaCache {
|
|||
SHashObj* pViews; // key is viewFName, element is SViewMeta*
|
||||
SHashObj* pTableTSMAs; // key is tbFName, elements are SArray<STableTSMAInfo*>
|
||||
SHashObj* pTSMAs; // key is tsmaFName, elements are STableTSMAInfo*
|
||||
SHashObj* pTableName; // key is tbFUid, elements is STableMetaEx*
|
||||
SHashObj* pTableName; // key is tbFUid, elements is STableMeta*(append with tbName)
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
bool qnodeRequired;
|
||||
|
|
|
@ -533,9 +533,9 @@ static int32_t rewriteDropMetaCache(STranslateContext* pCxt) {
|
|||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
char* pKey = taosHashGetKey(ppMetaRes, NULL);
|
||||
STableMetaEx* pMetaEx = (STableMetaEx*)(*ppMetaRes)->pRes;
|
||||
if (!pMetaEx || !pMetaEx->pMeta) {
|
||||
char* pKey = taosHashGetKey(ppMetaRes, NULL);
|
||||
STableMeta* pMeta = (STableMeta*)(*ppMetaRes)->pRes;
|
||||
if (!pMeta) {
|
||||
taosHashCancelIterate(pMetaCache->pTableName, ppMetaRes);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
@ -546,8 +546,15 @@ static int32_t rewriteDropMetaCache(STranslateContext* pCxt) {
|
|||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
tstrncpy(dbName, pDbStart + 1, pDbEnd - pDbStart);
|
||||
|
||||
int32_t metaSize =
|
||||
sizeof(STableMeta) + sizeof(SSchema) * (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags);
|
||||
int32_t schemaExtSize =
|
||||
(useCompress(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0;
|
||||
const char* pTbName = (const char*)pMeta + metaSize + schemaExtSize;
|
||||
|
||||
SName name = {0};
|
||||
toName(pParCxt->acctId, dbName, pMetaEx->tbName, &name);
|
||||
toName(pParCxt->acctId, dbName, pTbName, &name);
|
||||
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
code = tNameExtractFullName(&name, fullName);
|
||||
|
@ -561,7 +568,7 @@ static int32_t rewriteDropMetaCache(STranslateContext* pCxt) {
|
|||
return code;
|
||||
}
|
||||
|
||||
SMetaRes **qqMetaRes = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
|
||||
SMetaRes** qqMetaRes = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
|
||||
if (!qqMetaRes) {
|
||||
taosHashCancelIterate(pMetaCache->pTableName, ppMetaRes);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -1053,12 +1053,17 @@ int32_t getTableNameFromCache(SParseMetaCache* pMetaCache, const SName* pName, c
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
const char* pTableName = NULL;
|
||||
code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableName,
|
||||
(void**)&pTableName);
|
||||
const STableMeta* pMeta = NULL;
|
||||
code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableName, (void**)&pMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t metaSize =
|
||||
sizeof(STableMeta) + sizeof(SSchema) * (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags);
|
||||
int32_t schemaExtSize =
|
||||
(useCompress(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0;
|
||||
const char* pTableName = (const char*)pMeta + metaSize + schemaExtSize;
|
||||
tstrncpy(pTbName, pTableName, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -554,6 +554,64 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) {
|
||||
int32_t total = msg->numOfColumns + msg->numOfTags;
|
||||
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
|
||||
int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
|
||||
int32_t tbNameSize = strlen(msg->tbName) + 1;
|
||||
|
||||
STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + tbNameSize);
|
||||
if (NULL == pTableMeta) {
|
||||
qError("calloc size[%d] failed", metaSize);
|
||||
return terrno;
|
||||
}
|
||||
SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize);
|
||||
|
||||
pTableMeta->vgId = isStb ? 0 : msg->vgId;
|
||||
pTableMeta->tableType = isStb ? TSDB_SUPER_TABLE : msg->tableType;
|
||||
pTableMeta->uid = isStb ? msg->suid : msg->tuid;
|
||||
pTableMeta->suid = msg->suid;
|
||||
pTableMeta->sversion = msg->sversion;
|
||||
pTableMeta->tversion = msg->tversion;
|
||||
|
||||
pTableMeta->tableInfo.numOfTags = msg->numOfTags;
|
||||
pTableMeta->tableInfo.precision = msg->precision;
|
||||
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
|
||||
|
||||
memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
|
||||
if (useCompress(msg->tableType) && msg->pSchemaExt) {
|
||||
pTableMeta->schemaExt = pSchemaExt;
|
||||
memcpy(pSchemaExt, msg->pSchemaExt, schemaExtSize);
|
||||
} else {
|
||||
pTableMeta->schemaExt = NULL;
|
||||
}
|
||||
|
||||
bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY);
|
||||
for (int32_t i = 0; i < msg->numOfColumns; ++i) {
|
||||
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
|
||||
if (hasPK && (i > 0)) {
|
||||
if ((pTableMeta->schema[i].flags & COL_IS_KEY)) {
|
||||
++pTableMeta->tableInfo.numOfPKs;
|
||||
} else {
|
||||
hasPK = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char *pTbName = (char *)pTableMeta + metaSize + schemaExtSize;
|
||||
tstrncpy(pTbName, msg->tbName, tbNameSize);
|
||||
|
||||
qDebug("table %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s stb %s suid %" PRIx64
|
||||
" sver %d tver %d"
|
||||
" tagNum %d colNum %d precision %d rowSize %d",
|
||||
msg->tbName, pTableMeta->uid, pTableMeta->tableType, pTableMeta->vgId, msg->dbFName, msg->stbName,
|
||||
pTableMeta->suid, pTableMeta->sversion, pTableMeta->tversion, pTableMeta->tableInfo.numOfTags,
|
||||
pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.precision, pTableMeta->tableInfo.rowSize);
|
||||
|
||||
*pMeta = pTableMeta;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
STableMetaRsp metaRsp = {0};
|
||||
|
@ -650,11 +708,11 @@ static int32_t queryProcessTableNameRsp(void *output, char *msg, int32_t msgSize
|
|||
pOut->ctbMeta.uid = metaRsp.tuid;
|
||||
pOut->ctbMeta.suid = metaRsp.suid;
|
||||
|
||||
code = queryCreateTableMetaFromMsg(&metaRsp, true, &pOut->tbMeta);
|
||||
code = queryCreateTableMetaExFromMsg(&metaRsp, true, &pOut->tbMeta);
|
||||
} else {
|
||||
SET_META_TYPE_TABLE(pOut->metaType);
|
||||
strcpy(pOut->tbName, metaRsp.tbName);
|
||||
code = queryCreateTableMetaFromMsg(&metaRsp, (metaRsp.tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||
code = queryCreateTableMetaExFromMsg(&metaRsp, (metaRsp.tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||
}
|
||||
|
||||
PROCESS_NAME_OVER:
|
||||
|
|
Loading…
Reference in New Issue