feat: support describe view command

This commit is contained in:
dapan1121 2023-10-11 18:49:47 +08:00
parent 478de7ae56
commit f5768494b4
11 changed files with 123 additions and 28 deletions

View File

@ -3939,7 +3939,11 @@ typedef struct {
uint64_t dbId; uint64_t dbId;
uint64_t viewId; uint64_t viewId;
char* querySql; char* querySql;
int8_t precision;
int8_t type;
int32_t version; int32_t version;
int32_t numOfCols;
SSchema* pSchema;
} SViewMetaRsp; } SViewMetaRsp;
int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp); int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp);
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp); int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp);

View File

@ -120,9 +120,13 @@ typedef struct STableMeta {
#pragma pack(pop) #pragma pack(pop)
typedef struct SViewMeta { typedef struct SViewMeta {
int32_t version;
uint64_t viewId; uint64_t viewId;
char* querySql; char* querySql;
int8_t precision;
int8_t type;
int32_t version;
int32_t numOfCols;
SSchema* pSchema;
} SViewMeta; } SViewMeta;
typedef struct SDBVgInfo { typedef struct SDBVgInfo {

View File

@ -8631,7 +8631,14 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR
if (tEncodeU64(&encoder, pRsp->dbId) < 0) return -1; if (tEncodeU64(&encoder, pRsp->dbId) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->viewId) < 0) return -1; if (tEncodeU64(&encoder, pRsp->viewId) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->querySql) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->querySql) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->type) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1; if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfCols) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfCols; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
if (tEncodeSSchema(&encoder, pSchema) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
@ -8650,7 +8657,22 @@ int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp)
if (tDecodeU64(&decoder, &pRsp->dbId) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->dbId) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->viewId) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->viewId) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pRsp->querySql) < 0) return -1; if (tDecodeCStrAlloc(&decoder, &pRsp->querySql) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->type) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfCols) < 0) return -1;
if (pRsp->numOfCols > 0) {
pRsp->pSchema = taosMemoryCalloc(pRsp->numOfCols, sizeof(SSchema));
if (pRsp->pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pRsp->numOfCols; ++i) {
SSchema* pSchema = pRsp->pSchema + i;
if (tDecodeSSchema(&decoder, pSchema) < 0) return -1;
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
@ -8664,6 +8686,7 @@ void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) {
} }
taosMemoryFree(pRsp->querySql); taosMemoryFree(pRsp->querySql);
taosMemoryFree(pRsp->pSchema);
} }

View File

@ -993,6 +993,7 @@ int32_t ctgLaunchSubTask(SCtgTask** ppTask, CTG_TASK_TYPE type, ctgSubTaskCbFp f
int32_t ctgGetTbCfgCb(SCtgTask* pTask); int32_t ctgGetTbCfgCb(SCtgTask* pTask);
void ctgFreeHandle(SCatalog* pCatalog); void ctgFreeHandle(SCatalog* pCatalog);
void ctgFreeSViewMeta(SViewMeta* pMeta);
void ctgFreeMsgSendParam(void* param); void ctgFreeMsgSendParam(void* param);
void ctgFreeBatch(SCtgBatch* pBatch); void ctgFreeBatch(SCtgBatch* pBatch);
void ctgFreeBatchs(SHashObj* pBatchs); void ctgFreeBatchs(SHashObj* pBatchs);
@ -1048,6 +1049,7 @@ void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
void ctgGetGlobalCacheStat(SCtgCacheStat* pStat); void ctgGetGlobalCacheStat(SCtgCacheStat* pStat);
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res); int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx); int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx);
int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta);
void ctgGetGlobalCacheSize(uint64_t *pSize); void ctgGetGlobalCacheSize(uint64_t *pSize);
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex); uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex);
uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta); uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta);

View File

@ -1860,14 +1860,13 @@ int32_t ctgHandleGetViewsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
if (NULL == pViewMeta) { if (NULL == pViewMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
pViewMeta->querySql = strdup(pRsp->querySql); dupViewMetaFromRsp(pRsp, pViewMeta);
if (NULL == pViewMeta->querySql) { if (TSDB_CODE_SUCCESS != (code = dupViewMetaFromRsp(pRsp, pViewMeta))) {
ctgFreeSViewMeta(pViewMeta);
taosMemoryFree(pViewMeta); taosMemoryFree(pViewMeta);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(code);
} }
pViewMeta->version = pRsp->version;
pViewMeta->viewId = pRsp->viewId;
ctgUpdateViewMetaToCache(pCtg, pRsp, false); ctgUpdateViewMetaToCache(pCtg, pRsp, false);

View File

@ -2325,33 +2325,31 @@ int32_t ctgOpUpdateViewMeta(SCtgCacheOperation *operation) {
SViewMetaRsp *pRsp = msg->pRsp; SViewMetaRsp *pRsp = msg->pRsp;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
taosMemoryFreeClear(msg);
if (pCtg->stopUpdate) { if (pCtg->stopUpdate) {
goto _return; return TSDB_CODE_SUCCESS;
} }
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache)); CTG_ERR_RET(ctgGetAddDBCache(pCtg, pRsp->dbFName, pRsp->dbId, &dbCache));
if (NULL == dbCache) { if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId); ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pRsp->dbFName, pRsp->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
SViewMeta *pMeta = taosMemoryCalloc(1, sizeof(SViewMeta)); SViewMeta *pMeta = taosMemoryCalloc(1, sizeof(SViewMeta));
if (NULL == pMeta) { if (NULL == pMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
pMeta->querySql = strdup(pRsp->querySql);
if (NULL == pMeta->querySql) {
taosMemoryFree(pMeta);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pMeta->viewId = pRsp->viewId;
pMeta->version = pRsp->version;
CTG_ERR_JRET(ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta)); CTG_ERR_JRET(dupViewMetaFromRsp(pRsp, pMeta));
CTG_RET(ctgWriteViewMetaToCache(pCtg, dbCache, pRsp->dbFName, pRsp->name, pMeta));
_return: _return:
taosMemoryFreeClear(msg); ctgFreeSViewMeta(pMeta);
taosMemoryFree(pMeta);
CTG_RET(code); CTG_RET(code);
} }
@ -3057,14 +3055,22 @@ int32_t ctgGetViewsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgViewsC
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(pViewMeta, pCache->pMeta, sizeof(*pViewMeta));
pViewMeta->querySql = strdup(pCache->pMeta->querySql); pViewMeta->querySql = strdup(pCache->pMeta->querySql);
if (NULL == pViewMeta->querySql) { if (NULL == pViewMeta->querySql) {
ctgReleaseViewMetaToCache(pCtg, dbCache, pCache); ctgReleaseViewMetaToCache(pCtg, dbCache, pCache);
pViewMeta->pSchema = NULL;
taosMemoryFree(pViewMeta); taosMemoryFree(pViewMeta);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
pViewMeta->version = pCache->pMeta->version; pViewMeta->pSchema = taosMemoryMalloc(pViewMeta->numOfCols * sizeof(SSchema));
pViewMeta->viewId = pCache->pMeta->viewId; if (pViewMeta->pSchema == NULL) {
ctgReleaseViewMetaToCache(pCtg, dbCache, pCache);
ctgFreeSViewMeta(pViewMeta);
taosMemoryFree(pViewMeta);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pViewMeta->pSchema, pCache->pMeta->pSchema, pViewMeta->numOfCols * sizeof(SSchema));
CTG_UNLOCK(CTG_READ, &pCache->viewLock); CTG_UNLOCK(CTG_READ, &pCache->viewLock);
taosHashRelease(dbCache->viewCache, pCache); taosHashRelease(dbCache->viewCache, pCache);

View File

@ -19,6 +19,15 @@
#include "tname.h" #include "tname.h"
#include "trpc.h" #include "trpc.h"
void ctgFreeSViewMeta(SViewMeta* pMeta) {
if (NULL == pMeta) {
return;
}
taosMemoryFree(pMeta->querySql);
taosMemoryFree(pMeta->pSchema);
}
void ctgFreeMsgSendParam(void* param) { void ctgFreeMsgSendParam(void* param) {
if (NULL == param) { if (NULL == param) {
return; return;
@ -554,7 +563,7 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
if (NULL != pCtx->out) { if (NULL != pCtx->out) {
SViewMetaRsp* pOut = *(SViewMetaRsp**)pCtx->out; SViewMetaRsp* pOut = *(SViewMetaRsp**)pCtx->out;
if (NULL != pOut) { if (NULL != pOut) {
taosMemoryFree(pOut->querySql); tFreeSViewMetaRsp(pOut);
} }
taosMemoryFreeClear(pCtx->out); taosMemoryFreeClear(pCtx->out);
} }
@ -626,7 +635,7 @@ void ctgFreeViewMetaRes(void* res) {
SMetaRes* pRes = (SMetaRes*)res; SMetaRes* pRes = (SMetaRes*)res;
if (NULL != pRes->pRes) { if (NULL != pRes->pRes) {
SViewMeta* pMeta = (SViewMeta*)pRes->pRes; SViewMeta* pMeta = (SViewMeta*)pRes->pRes;
taosMemoryFreeClear(pMeta->querySql); ctgFreeSViewMeta(pMeta);
taosMemoryFreeClear(pRes->pRes); taosMemoryFreeClear(pRes->pRes);
} }
} }
@ -1867,7 +1876,7 @@ uint64_t ctgGetViewMetaCacheSize(SViewMeta *pMeta) {
return 0; return 0;
} }
return sizeof(*pMeta) + strlen(pMeta->querySql) + 1; return sizeof(*pMeta) + strlen(pMeta->querySql) + 1 + pMeta->numOfCols * sizeof(SSchema);
} }
@ -2089,4 +2098,24 @@ int32_t ctgBuildViewNullRes(SCtgTask* pTask, SCtgViewsCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) {
pViewMeta->querySql = strdup(pRsp->querySql);
if (NULL == pViewMeta->querySql) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pViewMeta->version = pRsp->version;
pViewMeta->viewId = pRsp->viewId;
pViewMeta->precision = pRsp->precision;
pViewMeta->type = pRsp->type;
pViewMeta->numOfCols = pRsp->numOfCols;
pViewMeta->pSchema = taosMemoryMalloc(pViewMeta->numOfCols * sizeof(SSchema));
if (pViewMeta->pSchema == NULL) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pViewMeta->pSchema, pRsp->pSchema, pViewMeta->numOfCols * sizeof(SSchema));
return TSDB_CODE_SUCCESS;
}

View File

@ -112,7 +112,11 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
colDataSetVal(pCol2, pBlock->info.rows, buf, false); colDataSetVal(pCol2, pBlock->info.rows, buf, false);
int32_t bytes = getSchemaBytes(pMeta->schema + i); int32_t bytes = getSchemaBytes(pMeta->schema + i);
colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false); colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false);
STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : ""); if (TSDB_VIEW_TABLE != pMeta->tableType) {
STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : "");
} else {
STR_TO_VARSTR(buf, "VIEW");
}
colDataSetVal(pCol4, pBlock->info.rows, buf, false); colDataSetVal(pCol4, pBlock->info.rows, buf, false);
++(pBlock->info.rows); ++(pBlock->info.rows);
} }

View File

@ -381,6 +381,11 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
strcpy(name.dbname, pStmt->dbName); strcpy(name.dbname, pStmt->dbName);
strcpy(name.tname, pStmt->tableName); strcpy(name.tname, pStmt->tableName);
int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name);
#ifdef TD_ENTERPRISE
if (TSDB_CODE_SUCCESS == code) {
code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, &name);
}
#endif
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
} }

View File

@ -349,7 +349,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
if (pParCxt->async) { if (pParCxt->async) {
code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta); code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
int32_t origCode = code; int32_t origCode = code;
code = getViewMetaFromCache(pCxt->pMetaCache, pName, pMeta); code = getViewMetaFromCache(pCxt->pMetaCache, pName, pMeta);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -417,6 +417,15 @@ static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName,
code = catalogRefreshGetTableMeta(pParCxt->pCatalog, &conn, &name, pMeta, false); code = catalogRefreshGetTableMeta(pParCxt->pCatalog, &conn, &name, pMeta, false);
} }
#ifdef TD_ENTERPRISE
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
int32_t origCode = code;
code = getViewMetaFromCache(pCxt->pMetaCache, &name, pMeta);
if (TSDB_CODE_SUCCESS != code) {
code = origCode;
}
}
#endif
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("0x%" PRIx64 " catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", parserError("0x%" PRIx64 " catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s",
pCxt->pParseCxt->requestId, tstrerror(code), pDbName, pTableName); pCxt->pParseCxt->requestId, tstrerror(code), pDbName, pTableName);

View File

@ -910,12 +910,22 @@ int32_t getViewMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, ST
SViewMeta* pViewMeta = NULL; SViewMeta* pViewMeta = NULL;
int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pViews, (void**)&pViewMeta); int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pViews, (void**)&pViewMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pMeta = taosMemoryCalloc(1, sizeof(STableMeta)); *pMeta = taosMemoryCalloc(1, sizeof(STableMeta) + pViewMeta->numOfCols * sizeof(SSchema));
if (NULL == *pMeta) { if (NULL == *pMeta) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
(*pMeta)->uid = pViewMeta->viewId; (*pMeta)->uid = pViewMeta->viewId;
(*pMeta)->vgId = MNODE_HANDLE;
(*pMeta)->tableType = TSDB_VIEW_TABLE; (*pMeta)->tableType = TSDB_VIEW_TABLE;
(*pMeta)->sversion = pViewMeta->version;
(*pMeta)->tversion = pViewMeta->version;
(*pMeta)->tableInfo.precision = pViewMeta->precision;
(*pMeta)->tableInfo.numOfColumns = pViewMeta->numOfCols;
memcpy((*pMeta)->schema, pViewMeta->pSchema, sizeof(SSchema) * pViewMeta->numOfCols);
for (int32_t i = 0; i < pViewMeta->numOfCols; ++i) {
(*pMeta)->tableInfo.rowSize += (*pMeta)->schema[i].bytes;
}
} }
return code; return code;
} }