feat: support query from view
This commit is contained in:
parent
deb9a6875b
commit
5b639a172d
|
@ -42,7 +42,8 @@ typedef enum {
|
|||
TSDB_TEMP_TABLE = 4, // temp table created by nest query
|
||||
TSDB_SYSTEM_TABLE = 5,
|
||||
TSDB_TSMA_TABLE = 6, // time-range-wise sma
|
||||
TSDB_TABLE_MAX = 7
|
||||
TSDB_VIEW_TABLE = 7,
|
||||
TSDB_TABLE_MAX = 8
|
||||
} ETableType;
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -99,6 +99,7 @@ typedef struct SMetaRes {
|
|||
} SMetaRes;
|
||||
|
||||
typedef struct SMetaData {
|
||||
bool ctgFree; // need to freed by catalog module
|
||||
SArray* pDbVgroup; // pRes = SArray<SVgroupInfo>*
|
||||
SArray* pDbCfg; // pRes = SDbCfgInfo*
|
||||
SArray* pDbInfo; // pRes = SDbInfo*
|
||||
|
|
|
@ -22,9 +22,7 @@ extern "C" {
|
|||
|
||||
#include "query.h"
|
||||
#include "querynodes.h"
|
||||
|
||||
struct SCatalogReq;
|
||||
struct SMetaData;
|
||||
#include "catalog.h"
|
||||
|
||||
typedef struct SStmtCallback {
|
||||
TAOS_STMT* pStmt;
|
||||
|
@ -33,7 +31,32 @@ typedef struct SStmtCallback {
|
|||
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
|
||||
} SStmtCallback;
|
||||
|
||||
typedef int32_t (*validateSqlFn)(void* param, const char* sql, SCMCreateViewReq* pRes);
|
||||
typedef enum {
|
||||
PARSE_SQL_RES_QUERY = 1,
|
||||
PARSE_SQL_RES_SCHEMA,
|
||||
} SParseResType;
|
||||
|
||||
typedef struct SParseSchemaRes {
|
||||
int8_t precision;
|
||||
int32_t numOfCols;
|
||||
SSchema* pSchema;
|
||||
} SParseSchemaRes;
|
||||
|
||||
typedef struct SParseQueryRes {
|
||||
SNode* pQuery;
|
||||
SCatalogReq* pCatalogReq;
|
||||
SMetaData meta;
|
||||
} SParseQueryRes;
|
||||
|
||||
typedef struct SParseSqlRes {
|
||||
SParseResType resType;
|
||||
union {
|
||||
SParseSchemaRes schemaRes;
|
||||
SParseQueryRes queryRes;
|
||||
};
|
||||
} SParseSqlRes;
|
||||
|
||||
typedef int32_t (*parseSqlFn)(void*, const char*, bool, SParseSqlRes*);
|
||||
|
||||
typedef struct SParseCsvCxt {
|
||||
TdFilePtr fp; // last parsed file
|
||||
|
@ -57,6 +80,7 @@ typedef struct SParseContext {
|
|||
struct SCatalog* pCatalog;
|
||||
SStmtCallback* pStmtCb;
|
||||
const char* pUser;
|
||||
bool parseOnly;
|
||||
bool isSuperUser;
|
||||
bool enableSysInfo;
|
||||
bool async;
|
||||
|
@ -66,8 +90,8 @@ typedef struct SParseContext {
|
|||
SArray* pTableMetaPos; // sql table pos => catalog data pos
|
||||
SArray* pTableVgroupPos; // sql table pos => catalog data pos
|
||||
int64_t allocatorId;
|
||||
validateSqlFn validateSqlFp;
|
||||
void* validateSqlParam;
|
||||
parseSqlFn parseSqlFp;
|
||||
void* parseSqlParam;
|
||||
} SParseContext;
|
||||
|
||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
||||
|
|
|
@ -264,6 +264,7 @@ typedef struct SRequestObj {
|
|||
bool syncQuery; // todo refactor: async query object
|
||||
bool stableQuery; // todo refactor
|
||||
bool validateOnly; // todo refactor
|
||||
bool parseOnly;
|
||||
bool killed;
|
||||
bool inRetry;
|
||||
bool isSubReq;
|
||||
|
@ -274,6 +275,7 @@ typedef struct SRequestObj {
|
|||
void* pPostPlan;
|
||||
SReqRelInfo relation;
|
||||
void* pWrapper;
|
||||
SMetaData parseMeta;
|
||||
} SRequestObj;
|
||||
|
||||
typedef struct SSyncQueryParam {
|
||||
|
@ -300,7 +302,7 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
|
|||
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
|
||||
int64_t reqid);
|
||||
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
|
||||
int32_t clientValidateSql(void* param, const char* sql, SCMCreateViewReq* pReq);
|
||||
int32_t clientParseSql(void* param, const char* sql, bool parseOnly, SParseSqlRes* pRes);
|
||||
void syncQueryFn(void* param, void* res, int32_t code);
|
||||
|
||||
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
|
||||
|
@ -415,7 +417,7 @@ void stopAllQueries(SRequestObj *pRequest);
|
|||
void freeQueryParam(SSyncQueryParam* param);
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
int32_t clientValidateSqlImpl(void* param, const char* sql, SCMCreateViewReq* pReq);
|
||||
int32_t clientParseSqlImpl(void* param, const char* sql, bool parseOnly, SParseSqlRes* pRes);
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -1111,31 +1111,34 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
|
||||
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
|
||||
SSqlCallbackWrapper* pWrapper) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pRequest->type = pQuery->msgType;
|
||||
|
||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
|
||||
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||
.acctId = pRequest->pTscObj->acctId,
|
||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||
.pAstRoot = pQuery->pRoot,
|
||||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pUser = pRequest->pTscObj->user,
|
||||
.sysInfo = pRequest->pTscObj->sysInfo,
|
||||
.allocatorId = pRequest->allocatorRefId};
|
||||
|
||||
SQueryPlan* pDag = NULL;
|
||||
|
||||
SArray* pMnodeList = NULL;
|
||||
SQueryPlan* pDag = NULL;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
|
||||
if (code) {
|
||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
} else {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
|
||||
|
||||
if (!pRequest->parseOnly) {
|
||||
pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
|
||||
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||
.acctId = pRequest->pTscObj->acctId,
|
||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||
.pAstRoot = pQuery->pRoot,
|
||||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pUser = pRequest->pTscObj->user,
|
||||
.sysInfo = pRequest->pTscObj->sysInfo,
|
||||
.allocatorId = pRequest->allocatorRefId};
|
||||
|
||||
code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
|
||||
if (code) {
|
||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
} else {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
|
||||
}
|
||||
}
|
||||
|
||||
pRequest->metric.execStart = taosGetTimestampUs();
|
||||
|
@ -1188,6 +1191,11 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pRequest->parseOnly) {
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
pRequest->body.execMode = pQuery->execMode;
|
||||
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
|
@ -1221,6 +1229,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
|
|||
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||
break;
|
||||
default:
|
||||
tscError("0x%" PRIx64 " invalid execMode %d", pRequest->self, pQuery->execMode);
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, -1);
|
||||
break;
|
||||
}
|
||||
|
@ -2592,11 +2601,11 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
|
|||
schedulerFetchRows(pRequest->body.queryJob, &req);
|
||||
}
|
||||
|
||||
int32_t clientValidateSql(void* param, const char* sql, SCMCreateViewReq* pReq) {
|
||||
int32_t clientParseSql(void* param, const char* sql, bool parseOnly, SParseSqlRes* pRes) {
|
||||
#ifndef TD_ENTERPRISE
|
||||
return TSDB_CODE_SUCCESS;
|
||||
#else
|
||||
return clientValidateSqlImpl(param, sql, pReq);
|
||||
return clientParseSqlImpl(param, sql, parseOnly, pRes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
|
@ -926,6 +926,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
|||
|
||||
int64_t analyseStart = taosGetTimestampUs();
|
||||
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
|
||||
pWrapper->pParseCtx->parseOnly = pRequest->parseOnly;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
|
||||
|
@ -933,6 +934,11 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
|||
|
||||
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
|
||||
|
||||
if (pRequest->parseOnly) {
|
||||
memcpy(&pRequest->parseMeta, pResultMeta, sizeof(*pResultMeta));
|
||||
memset(pResultMeta, 0, sizeof(*pResultMeta));
|
||||
}
|
||||
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
}
|
||||
|
||||
|
@ -1166,8 +1172,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SS
|
|||
.svrVer = pTscObj->sVer,
|
||||
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
|
||||
.allocatorId = pRequest->allocatorRefId,
|
||||
.validateSqlFp = clientValidateSql,
|
||||
.validateSqlParam = pWrapper};
|
||||
.parseSqlFp = clientParseSql,
|
||||
.parseSqlParam = pWrapper};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
int32_t mndInitView(SMnode *pMnode) {
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_VIEW, mndProcessCreateViewReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_VIEW, mndProcessDropViewReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessGetViewMetaReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_VIEW_META, mndProcessGetViewMetaReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndRetrieveView);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VIEWS, mndCancelGetNextView);
|
||||
|
|
|
@ -1657,6 +1657,10 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
|
|||
CTG_API_LEAVE(ctgUpdateUserEnqueue(pCtg, pAuth, false));
|
||||
}
|
||||
|
||||
void catalogFreeMetaData(SMetaData * pData) {
|
||||
|
||||
}
|
||||
|
||||
int32_t catalogClearCache(void) {
|
||||
CTG_API_ENTER_NOLOCK();
|
||||
|
||||
|
|
|
@ -606,6 +606,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
|
||||
SCtgJob* pJob = *job;
|
||||
|
||||
pJob->jobRes.ctgFree = true;
|
||||
pJob->subTaskNum = taskNum;
|
||||
pJob->queryId = pConn->requestId;
|
||||
pJob->userFp = fp;
|
||||
|
|
|
@ -522,9 +522,13 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TDMT_MND_VIEW_META: {
|
||||
SViewMetaRsp* pOut = (SViewMetaRsp*)pCtx->out;
|
||||
taosMemoryFree(pOut->querySql);
|
||||
taosMemoryFreeClear(pCtx->out);
|
||||
if (NULL != pCtx->out) {
|
||||
SViewMetaRsp* pOut = *(SViewMetaRsp**)pCtx->out;
|
||||
if (NULL != pOut) {
|
||||
taosMemoryFree(pOut->querySql);
|
||||
}
|
||||
taosMemoryFreeClear(pCtx->out);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -585,6 +589,19 @@ void ctgFreeBatchHash(void* hash) {
|
|||
taosMemoryFreeClear(pRes->pRes);
|
||||
}
|
||||
|
||||
void ctgFreeViewMetaRes(void* res) {
|
||||
if (NULL == res) {
|
||||
return;
|
||||
}
|
||||
|
||||
SMetaRes* pRes = (SMetaRes*)res;
|
||||
if (NULL != pRes->pRes) {
|
||||
SViewMeta* pMeta = (SViewMeta*)pRes->pRes;
|
||||
taosMemoryFreeClear(pMeta->querySql);
|
||||
taosMemoryFreeClear(pRes->pRes);
|
||||
}
|
||||
}
|
||||
|
||||
void ctgFreeJsonTagVal(void* val) {
|
||||
if (NULL == val) {
|
||||
return;
|
||||
|
@ -671,6 +688,15 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
|
|||
*pRes = NULL; // no need to free it
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_VIEW: {
|
||||
SArray* pArray = (SArray*)*pRes;
|
||||
int32_t num = taosArrayGetSize(pArray);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
ctgFreeViewMetaRes(taosArrayGet(pArray, i));
|
||||
}
|
||||
*pRes = NULL; // no need to free it
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("invalid task type %d", type);
|
||||
break;
|
||||
|
@ -823,22 +849,35 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
|
|||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_VIEW: {
|
||||
SCtgViewsCtx* taskCtx = (SCtgViewsCtx*)pTask->taskCtx;
|
||||
taosArrayDestroyEx(taskCtx->pResList, ctgFreeViewMetaRes);
|
||||
taosArrayDestroy(taskCtx->pFetchs);
|
||||
// NO NEED TO FREE pNames
|
||||
|
||||
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
|
||||
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("invalid task type %d", pTask->type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ctgFreeTask(SCtgTask* pTask) {
|
||||
void ctgFreeTask(SCtgTask* pTask, bool freeRes) {
|
||||
ctgFreeMsgCtx(&pTask->msgCtx);
|
||||
ctgFreeTaskRes(pTask->type, &pTask->res);
|
||||
if (freeRes) {
|
||||
ctgFreeTaskRes(pTask->type, &pTask->res);
|
||||
}
|
||||
ctgFreeTaskCtx(pTask);
|
||||
|
||||
taosArrayDestroy(pTask->pParents);
|
||||
ctgClearSubTaskRes(&pTask->subRes);
|
||||
}
|
||||
|
||||
void ctgFreeTasks(SArray* pArray) {
|
||||
void ctgFreeTasks(SArray* pArray, bool freeRes) {
|
||||
if (NULL == pArray) {
|
||||
return;
|
||||
}
|
||||
|
@ -846,7 +885,7 @@ void ctgFreeTasks(SArray* pArray) {
|
|||
int32_t num = taosArrayGetSize(pArray);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SCtgTask* pTask = taosArrayGet(pArray, i);
|
||||
ctgFreeTask(pTask);
|
||||
ctgFreeTask(pTask, freeRes);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pArray);
|
||||
|
@ -862,7 +901,7 @@ void ctgFreeJob(void* job) {
|
|||
int64_t rid = pJob->refId;
|
||||
uint64_t qid = pJob->queryId;
|
||||
|
||||
ctgFreeTasks(pJob->pTasks);
|
||||
ctgFreeTasks(pJob->pTasks, pJob->jobRes.ctgFree);
|
||||
ctgFreeBatchs(pJob->pBatchs);
|
||||
|
||||
ctgFreeSMetaData(&pJob->jobRes);
|
||||
|
@ -1763,8 +1802,9 @@ SMetaData* catalogCloneMetaData(SMetaData* pData) {
|
|||
|
||||
return pRes;
|
||||
}
|
||||
#endif
|
||||
|
||||
void catalogFreeMetaData(SMetaData* pData) {
|
||||
void ctgFreeMetaData(SMetaData* pData) {
|
||||
if (NULL == pData) {
|
||||
return;
|
||||
}
|
||||
|
@ -1784,7 +1824,6 @@ void catalogFreeMetaData(SMetaData* pData) {
|
|||
taosMemoryFreeClear(pData->pSvrVer);
|
||||
taosMemoryFree(pData);
|
||||
}
|
||||
#endif
|
||||
|
||||
uint64_t ctgGetTbIndexCacheSize(STableIndex *pIndex) {
|
||||
if (NULL == pIndex) {
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
aux_source_directory(src PARSER_SRC)
|
||||
|
||||
IF (TD_ENTERPRISE)
|
||||
LIST(APPEND PARSER_SRC ${TD_ENTERPRISE_DIR}/src/plugins/view/src/parserView.c)
|
||||
ENDIF ()
|
||||
|
||||
add_library(parser STATIC ${PARSER_SRC})
|
||||
target_include_directories(
|
||||
parser
|
||||
|
|
|
@ -27,6 +27,29 @@ extern "C" {
|
|||
#define QUERY_SMA_OPTIMIZE_DISABLE 0
|
||||
#define QUERY_SMA_OPTIMIZE_ENABLE 1
|
||||
|
||||
typedef struct STranslateContext {
|
||||
SParseContext* pParseCxt;
|
||||
int32_t errCode;
|
||||
SMsgBuf msgBuf;
|
||||
SArray* pNsLevel; // element is SArray*, the element of this subarray is STableNode*
|
||||
int32_t currLevel;
|
||||
int32_t levelNo;
|
||||
ESqlClause currClause;
|
||||
SNode* pCurrStmt;
|
||||
SCmdMsgInfo* pCmdMsg;
|
||||
SHashObj* pDbs;
|
||||
SHashObj* pTables;
|
||||
SHashObj* pTargetTables;
|
||||
SExplainOptions* pExplainOpt;
|
||||
SParseMetaCache* pMetaCache;
|
||||
bool createStream;
|
||||
bool stableQuery;
|
||||
bool showRewrite;
|
||||
SNode* pPrevRoot;
|
||||
SNode* pPostRoot;
|
||||
} STranslateContext;
|
||||
|
||||
|
||||
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData);
|
||||
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
|
||||
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
|
||||
|
@ -37,7 +60,12 @@ int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
|
|||
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||
int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray* pPlaceholderValues);
|
||||
int32_t translateTable(STranslateContext* pCxt, SNode** pTable);
|
||||
int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput);
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
int32_t translateView(STranslateContext* pCxt, SNode** pTable, SName* pName);
|
||||
#endif
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -69,7 +69,7 @@ typedef struct SParseMetaCache {
|
|||
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
|
||||
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
|
||||
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
|
||||
SHashObj* pViews; // key is viewFName, element is SViewInfo*
|
||||
SHashObj* pViews; // key is viewFName, element is SViewMeta*
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
} SParseMetaCache;
|
||||
|
@ -110,6 +110,7 @@ int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pT
|
|||
int32_t reserveTableCfgInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache);
|
||||
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
|
||||
int32_t getViewMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
|
||||
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
|
||||
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
|
||||
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
|
||||
|
|
|
@ -35,28 +35,6 @@ typedef struct SRewriteTbNameContext {
|
|||
char* pTbName;
|
||||
} SRewriteTbNameContext;
|
||||
|
||||
typedef struct STranslateContext {
|
||||
SParseContext* pParseCxt;
|
||||
int32_t errCode;
|
||||
SMsgBuf msgBuf;
|
||||
SArray* pNsLevel; // element is SArray*, the element of this subarray is STableNode*
|
||||
int32_t currLevel;
|
||||
int32_t levelNo;
|
||||
ESqlClause currClause;
|
||||
SNode* pCurrStmt;
|
||||
SCmdMsgInfo* pCmdMsg;
|
||||
SHashObj* pDbs;
|
||||
SHashObj* pTables;
|
||||
SHashObj* pTargetTables;
|
||||
SExplainOptions* pExplainOpt;
|
||||
SParseMetaCache* pMetaCache;
|
||||
bool createStream;
|
||||
bool stableQuery;
|
||||
bool showRewrite;
|
||||
SNode* pPrevRoot;
|
||||
SNode* pPostRoot;
|
||||
} STranslateContext;
|
||||
|
||||
typedef struct SBuildTopicContext {
|
||||
bool colExists;
|
||||
bool colNotFound;
|
||||
|
@ -354,7 +332,7 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
|
|||
return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
|
||||
}
|
||||
|
||||
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
|
||||
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta, bool couldBeView) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -363,6 +341,11 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pParCxt->async) {
|
||||
code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
|
||||
#ifdef TD_ENTERPRISE
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
code = getViewMetaFromCache(pCxt->pMetaCache, pName, pMeta);
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
|
||||
.requestId = pParCxt->requestId,
|
||||
|
@ -380,7 +363,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
|
|||
|
||||
static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) {
|
||||
SName name;
|
||||
return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta);
|
||||
return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta, false);
|
||||
}
|
||||
|
||||
static int32_t getTableCfg(STranslateContext* pCxt, const SName* pName, STableCfg** pCfg) {
|
||||
|
@ -2775,21 +2758,26 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
||||
int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pTable)) {
|
||||
switch (nodeType(*pTable)) {
|
||||
case QUERY_NODE_REAL_TABLE: {
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)*pTable;
|
||||
pRealTable->ratio = (NULL != pCxt->pExplainOpt ? pCxt->pExplainOpt->ratio : 1.0);
|
||||
// The SRealTableNode created through ROLLUP already has STableMeta.
|
||||
if (NULL == pRealTable->pMeta) {
|
||||
SName name;
|
||||
code = getTableMetaImpl(
|
||||
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
|
||||
&(pRealTable->pMeta));
|
||||
&(pRealTable->pMeta), true);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code));
|
||||
}
|
||||
#ifdef TD_ENTERPRISE
|
||||
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType) {
|
||||
return translateView(pCxt, pTable, &name);
|
||||
}
|
||||
#endif
|
||||
code = setTableVgroupList(pCxt, &name, pRealTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setTableIndex(pCxt, &name, pRealTable);
|
||||
|
@ -2814,7 +2802,7 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_TEMP_TABLE: {
|
||||
STempTableNode* pTempTable = (STempTableNode*)pTable;
|
||||
STempTableNode* pTempTable = (STempTableNode*)*pTable;
|
||||
code = translateSubquery(pCxt, pTempTable->pSubquery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pTempTable->pSubquery) &&
|
||||
|
@ -2829,10 +2817,10 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_JOIN_TABLE: {
|
||||
SJoinTableNode* pJoinTable = (SJoinTableNode*)pTable;
|
||||
code = translateTable(pCxt, pJoinTable->pLeft);
|
||||
SJoinTableNode* pJoinTable = (SJoinTableNode*)*pTable;
|
||||
code = translateTable(pCxt, &pJoinTable->pLeft);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateTable(pCxt, pJoinTable->pRight);
|
||||
code = translateTable(pCxt, &pJoinTable->pRight);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkJoinTable(pCxt, pJoinTable);
|
||||
|
@ -3789,7 +3777,7 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateFrom(STranslateContext* pCxt, SNode* pTable) {
|
||||
static int32_t translateFrom(STranslateContext* pCxt, SNode** pTable) {
|
||||
pCxt->currClause = SQL_CLAUSE_FROM;
|
||||
return translateTable(pCxt, pTable);
|
||||
}
|
||||
|
@ -3918,7 +3906,7 @@ static int32_t translateSelectWithoutFrom(STranslateContext* pCxt, SSelectStmt*
|
|||
|
||||
static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
pCxt->pCurrStmt = (SNode*)pSelect;
|
||||
int32_t code = translateFrom(pCxt, pSelect->pFromTable);
|
||||
int32_t code = translateFrom(pCxt, &pSelect->pFromTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pSelect->precision = ((STableNode*)pSelect->pFromTable)->precision;
|
||||
code = translateWhere(pCxt, pSelect);
|
||||
|
@ -4141,7 +4129,7 @@ static int32_t translateDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelet
|
|||
|
||||
static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
|
||||
pCxt->pCurrStmt = (SNode*)pDelete;
|
||||
int32_t code = translateFrom(pCxt, pDelete->pFromTable);
|
||||
int32_t code = translateFrom(pCxt, &pDelete->pFromTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDelete->precision = ((STableNode*)pDelete->pFromTable)->precision;
|
||||
code = translateDeleteWhere(pCxt, pDelete);
|
||||
|
@ -4234,10 +4222,10 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
|
|||
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
|
||||
}
|
||||
|
||||
static int32_t translateInsertTable(STranslateContext* pCxt, SNode* pTable) {
|
||||
static int32_t translateInsertTable(STranslateContext* pCxt, SNode** pTable) {
|
||||
int32_t code = translateFrom(pCxt, pTable);
|
||||
if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType) {
|
||||
if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)*pTable)->pMeta->tableType &&
|
||||
TSDB_NORMAL_TABLE != ((SRealTableNode*)*pTable)->pMeta->tableType) {
|
||||
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
|
||||
"insert data into super table is not supported");
|
||||
}
|
||||
|
@ -4246,7 +4234,7 @@ static int32_t translateInsertTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
|
||||
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||
pCxt->pCurrStmt = (SNode*)pInsert;
|
||||
int32_t code = translateInsertTable(pCxt, pInsert->pTable);
|
||||
int32_t code = translateInsertTable(pCxt, &pInsert->pTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateInsertCols(pCxt, pInsert);
|
||||
}
|
||||
|
@ -6247,7 +6235,7 @@ static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt
|
|||
.mgmtEps = pParCxt->mgmtEpSet};
|
||||
SName name;
|
||||
STableMeta* pMeta = NULL;
|
||||
int32_t code = getTableMetaImpl(pCxt, toName(pParCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name), &pMeta);
|
||||
int32_t code = getTableMetaImpl(pCxt, toName(pParCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name), &pMeta, false);
|
||||
if (code) {
|
||||
taosMemoryFree(pMeta);
|
||||
return code;
|
||||
|
@ -7278,11 +7266,15 @@ static int32_t validateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStm
|
|||
}
|
||||
|
||||
static int32_t translateCreateView(STranslateContext* pCxt, SCreateViewStmt* pStmt) {
|
||||
SParseSqlRes res = {.resType = PARSE_SQL_RES_SCHEMA};
|
||||
int32_t code = validateCreateView(pCxt, pStmt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = (*pCxt->pParseCxt->validateSqlFp)(pCxt->pParseCxt->validateSqlParam, pStmt->pQuerySql, &pStmt->createReq);
|
||||
code = (*pCxt->pParseCxt->parseSqlFp)(pCxt->pParseCxt->parseSqlParam, pStmt->pQuerySql, false, &res);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pStmt->createReq.precision = res.schemaRes.precision;
|
||||
pStmt->createReq.numOfCols = res.schemaRes.numOfCols;
|
||||
pStmt->createReq.pSchema = res.schemaRes.pSchema;
|
||||
strncpy(pStmt->createReq.name, pStmt->viewName, sizeof(pStmt->createReq.name) - 1);
|
||||
snprintf(pStmt->createReq.dbFName, sizeof(pStmt->createReq.dbFName) - 1, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName);
|
||||
snprintf(pStmt->createReq.fullname, sizeof(pStmt->createReq.fullname) - 1, "%s.%s", pStmt->createReq.dbFName, pStmt->viewName);
|
||||
|
@ -7418,7 +7410,7 @@ static int32_t translateGrantTagCond(STranslateContext* pCxt, SGrantStmt* pStmt,
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SName name;
|
||||
code = getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pTable->table.dbName, pTable->table.tableName, &name),
|
||||
&(pTable->pMeta));
|
||||
&(pTable->pMeta), false);
|
||||
if (code) {
|
||||
nodesDestroyNode((SNode*)pTable);
|
||||
return code;
|
||||
|
@ -8810,7 +8802,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
|
|||
SName name;
|
||||
toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &name);
|
||||
STableMeta* pTableMeta = NULL;
|
||||
int32_t code = getTableMetaImpl(pCxt, &name, &pTableMeta);
|
||||
int32_t code = getTableMetaImpl(pCxt, &name, &pTableMeta, false);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(&name, pCxt->pTargetTables);
|
||||
}
|
||||
|
|
|
@ -719,7 +719,7 @@ SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode*
|
|||
|
||||
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj** pHash) {
|
||||
if (NULL == *pHash) {
|
||||
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
*pHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (NULL == *pHash) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -728,7 +728,7 @@ static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pD
|
|||
return taosHashPut(*pHash, pKey, len, &pRes, POINTER_BYTES);
|
||||
}
|
||||
|
||||
static int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput) {
|
||||
int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput) {
|
||||
SMetaRes** pRes = taosHashGet(pHash, pKey, len);
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
|
@ -902,6 +902,23 @@ int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, S
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getViewMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pName, fullName);
|
||||
SViewMeta* pViewMeta = NULL;
|
||||
int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pViews, (void**)&pViewMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pMeta = taosMemoryCalloc(1, sizeof(STableMeta));
|
||||
if (NULL == *pMeta) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(*pMeta)->uid = pViewMeta->viewId;
|
||||
(*pMeta)->tableType = TSDB_VIEW_TABLE;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t reserveDbReqInCache(int32_t acctId, const char* pDb, SHashObj** pDbs) {
|
||||
if (NULL == *pDbs) {
|
||||
*pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
|
|
@ -51,6 +51,10 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
|
|||
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
|
||||
int32_t code = authenticate(pCxt, pQuery, pMetaCache);
|
||||
|
||||
if (pCxt->parseOnly) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && pQuery->placeholderNum > 0) {
|
||||
TSWAP(pQuery->pPrepareRoot, pQuery->pRoot);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -764,7 +764,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
|
||||
SCH_ERR_RET(TSDB_CODE_MND_INVALID_SCHEMA_VER);
|
||||
SCH_ERR_RET(TSDB_CODE_PLAN_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
|
||||
|
|
Loading…
Reference in New Issue