Merge pull request #13168 from taosdata/feature/3.0_async

feat: parser adapts asynchronous interface
This commit is contained in:
Xiaoyu Wang 2022-05-28 20:47:29 +08:00 committed by GitHub
commit 900aa793bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1323 additions and 294 deletions

View File

@ -23,6 +23,9 @@ extern "C" {
#include "function.h"
#include "querynodes.h"
#define FUNC_AGGREGATE_UDF_ID 5001
#define FUNC_SCALAR_UDF_ID 5002
typedef enum EFunctionType {
// aggregate function
FUNCTION_TYPE_APERCENTILE = 1,
@ -126,21 +129,12 @@ typedef enum EFunctionType {
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
struct STimeWindow;
struct SCatalog;
typedef struct SFmGetFuncInfoParam {
struct SCatalog* pCtg;
void* pRpc;
const SEpSet* pMgmtEps;
char* pErrBuf;
int32_t errBufLen;
} SFmGetFuncInfoParam;
int32_t fmFuncMgtInit();
void fmFuncMgtDestroy();
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc);
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen);
bool fmIsBuiltinFunc(const char* pFunc);

View File

@ -655,7 +655,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
#define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
//udf
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)

View File

@ -14,7 +14,7 @@ target_include_directories(
target_link_libraries(
function
PRIVATE os util common nodes scalar catalog qcom transport
PRIVATE os util common nodes scalar qcom transport
PUBLIC uv_a
)

View File

@ -44,9 +44,7 @@ extern "C" {
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define FUNC_UDF_ID_START 5000
#define FUNC_AGGREGATE_UDF_ID 5001
#define FUNC_SCALAR_UDF_ID 5002
#define FUNC_UDF_ID_START 5000
extern const int funcMgtUdfNum;

View File

@ -16,7 +16,6 @@
#include "functionMgt.h"
#include "builtins.h"
#include "catalog.h"
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
@ -65,35 +64,19 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
}
static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
SFuncInfo funcInfo = {0};
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &funcInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
pFunc->funcType = FUNCTION_TYPE_UDF;
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = funcInfo.outputType;
pFunc->node.resType.bytes = funcInfo.outputLen;
pFunc->udfBufSize = funcInfo.bufSize;
tFreeSFuncInfo(&funcInfo);
return TSDB_CODE_SUCCESS;
}
int32_t fmFuncMgtInit() {
taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
return initFunctionCode;
}
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
if (NULL != pVal) {
pFunc->funcId = *(int32_t*)pVal;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pParam->pErrBuf, pParam->errBufLen);
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
return getUdfInfo(pParam, pFunc);
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
bool fmIsBuiltinFunc(const char* pFunc) {

View File

@ -2555,7 +2555,7 @@ static const char* jkSessionWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkSessionWindowGap = "Gap";
static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
const SSessionWindowNode * pNode = (const SSessionWindowNode*)pObj;
const SSessionWindowNode* pNode = (const SSessionWindowNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkSessionWindowTsPrimaryKey, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
@ -2567,9 +2567,9 @@ static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
SSessionWindowNode* pNode = (SSessionWindowNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkSessionWindowTsPrimaryKey, (SNode **)&pNode->pCol);
int32_t code = jsonToNodeObject(pJson, jkSessionWindowTsPrimaryKey, (SNode**)&pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSessionWindowGap, (SNode **)&pNode->pGap);
code = jsonToNodeObject(pJson, jkSessionWindowGap, (SNode**)&pNode->pGap);
}
return code;
}
@ -2796,6 +2796,150 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkDatabaseOptionsBuffer = "Buffer";
static const char* jkDatabaseOptionsCachelast = "Cachelast";
static const char* jkDatabaseOptionsCompressionLevel = "CompressionLevel";
static const char* jkDatabaseOptionsDaysPerFileNode = "DaysPerFileNode";
static const char* jkDatabaseOptionsDaysPerFile = "DaysPerFile";
static const char* jkDatabaseOptionsFsyncPeriod = "FsyncPeriod";
static const char* jkDatabaseOptionsMaxRowsPerBlock = "MaxRowsPerBlock";
static const char* jkDatabaseOptionsMinRowsPerBlock = "MinRowsPerBlock";
static const char* jkDatabaseOptionsKeep = "Keep";
static const char* jkDatabaseOptionsPages = "Pages";
static const char* jkDatabaseOptionsPagesize = "Pagesize";
static const char* jkDatabaseOptionsPrecision = "Precision";
static const char* jkDatabaseOptionsReplica = "Replica";
static const char* jkDatabaseOptionsStrict = "Strict";
static const char* jkDatabaseOptionsWalLevel = "WalLevel";
static const char* jkDatabaseOptionsNumOfVgroups = "NumOfVgroups";
static const char* jkDatabaseOptionsSingleStable = "SingleStable";
static const char* jkDatabaseOptionsRetentions = "Retentions";
static const char* jkDatabaseOptionsSchemaless = "Schemaless";
static int32_t databaseOptionsToJson(const void* pObj, SJson* pJson) {
const SDatabaseOptions* pNode = (const SDatabaseOptions*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsBuffer, pNode->buffer);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsCachelast, pNode->cachelast);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsCompressionLevel, pNode->compressionLevel);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDatabaseOptionsDaysPerFileNode, nodeToJson, pNode->pDaysPerFile);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsDaysPerFile, pNode->daysPerFile);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsFsyncPeriod, pNode->fsyncPeriod);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsMaxRowsPerBlock, pNode->maxRowsPerBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsMinRowsPerBlock, pNode->minRowsPerBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkDatabaseOptionsKeep, pNode->pKeep);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsPages, pNode->pages);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsPagesize, pNode->pagesize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkDatabaseOptionsPrecision, pNode->precisionStr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsReplica, pNode->replica);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsStrict, pNode->strict);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsWalLevel, pNode->walLevel);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsNumOfVgroups, pNode->numOfVgroups);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsSingleStable, pNode->singleStable);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkDatabaseOptionsRetentions, pNode->pRetentions);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsSchemaless, pNode->schemaless);
}
return code;
}
static int32_t jsonToDatabaseOptions(const SJson* pJson, void* pObj) {
SDatabaseOptions* pNode = (SDatabaseOptions*)pObj;
int32_t code = tjsonGetIntValue(pJson, jkDatabaseOptionsBuffer, &pNode->buffer);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsCachelast, &pNode->cachelast);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsCompressionLevel, &pNode->compressionLevel);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDatabaseOptionsDaysPerFileNode, (SNode**)&pNode->pDaysPerFile);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsDaysPerFile, &pNode->daysPerFile);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsFsyncPeriod, &pNode->fsyncPeriod);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsMaxRowsPerBlock, &pNode->maxRowsPerBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsMinRowsPerBlock, &pNode->minRowsPerBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkDatabaseOptionsKeep, &pNode->pKeep);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsPages, &pNode->pages);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsPagesize, &pNode->pagesize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkDatabaseOptionsPrecision, pNode->precisionStr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsReplica, &pNode->replica);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsStrict, &pNode->strict);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsWalLevel, &pNode->walLevel);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDatabaseOptionsNumOfVgroups, &pNode->numOfVgroups);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsSingleStable, &pNode->singleStable);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkDatabaseOptionsRetentions, &pNode->pRetentions);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsSchemaless, &pNode->schemaless);
}
return code;
}
static const char* jkDataBlockDescDataBlockId = "DataBlockId";
static const char* jkDataBlockDescSlots = "Slots";
static const char* jkDataBlockTotalRowSize = "TotalRowSize";
@ -2998,6 +3142,130 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkAlterDatabaseStmtDbName = "DbName";
static const char* jkAlterDatabaseStmtOptions = "Options";
static int32_t alterDatabaseStmtToJson(const void* pObj, SJson* pJson) {
const SAlterDatabaseStmt* pNode = (const SAlterDatabaseStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkAlterDatabaseStmtDbName, pNode->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkAlterDatabaseStmtOptions, nodeToJson, pNode->pOptions);
}
return code;
}
static int32_t jsonToAlterDatabaseStmt(const SJson* pJson, void* pObj) {
SAlterDatabaseStmt* pNode = (SAlterDatabaseStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkAlterDatabaseStmtDbName, pNode->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkAlterDatabaseStmtOptions, (SNode**)&pNode->pOptions);
}
return code;
}
static const char* jkAlterTableStmtDbName = "DbName";
static const char* jkAlterTableStmtTableName = "TableName";
static const char* jkAlterTableStmtAlterType = "AlterType";
static const char* jkAlterTableStmtColName = "ColName";
static const char* jkAlterTableStmtNewColName = "NewColName";
static const char* jkAlterTableStmtOptions = "Options";
static const char* jkAlterTableStmtNewDataType = "NewDataType";
static const char* jkAlterTableStmtNewTagVal = "NewTagVal";
static int32_t alterTableStmtToJson(const void* pObj, SJson* pJson) {
const SAlterTableStmt* pNode = (const SAlterTableStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkAlterTableStmtDbName, pNode->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAlterTableStmtTableName, pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkAlterTableStmtAlterType, pNode->alterType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAlterTableStmtColName, pNode->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAlterTableStmtNewColName, pNode->newColName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkAlterTableStmtOptions, nodeToJson, pNode->pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkAlterTableStmtNewDataType, dataTypeToJson, &pNode->dataType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkAlterTableStmtOptions, nodeToJson, pNode->pVal);
}
return code;
}
static int32_t jsonToAlterTableStmt(const SJson* pJson, void* pObj) {
SAlterTableStmt* pNode = (SAlterTableStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkAlterTableStmtDbName, pNode->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAlterTableStmtTableName, pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkAlterTableStmtAlterType, &pNode->alterType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAlterTableStmtColName, pNode->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAlterTableStmtNewColName, pNode->newColName);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkAlterTableStmtOptions, (SNode**)&pNode->pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkAlterTableStmtNewDataType, jsonToDataType, &pNode->dataType);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkAlterTableStmtOptions, (SNode**)&pNode->pVal);
}
return code;
}
static const char* jkAlterDnodeStmtDnodeId = "DnodeId";
static const char* jkAlterDnodeStmtConfig = "Config";
static const char* jkAlterDnodeStmtValue = "Value";
static int32_t alterDnodeStmtToJson(const void* pObj, SJson* pJson) {
const SAlterDnodeStmt* pNode = (const SAlterDnodeStmt*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkAlterDnodeStmtDnodeId, pNode->dnodeId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAlterDnodeStmtConfig, pNode->config);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAlterDnodeStmtValue, pNode->value);
}
return code;
}
static int32_t jsonToAlterDnodeStmt(const SJson* pJson, void* pObj) {
SAlterDnodeStmt* pNode = (SAlterDnodeStmt*)pObj;
int32_t code = tjsonGetIntValue(pJson, jkAlterDnodeStmtDnodeId, &pNode->dnodeId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAlterDnodeStmtConfig, pNode->config);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAlterDnodeStmtValue, pNode->value);
}
return code;
}
static const char* jkCreateTopicStmtTopicName = "TopicName";
static const char* jkCreateTopicStmtSubscribeDbName = "SubscribeDbName";
static const char* jkCreateTopicStmtIgnoreExists = "IgnoreExists";
@ -3082,6 +3350,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break;
case QUERY_NODE_DOWNSTREAM_SOURCE:
return downstreamSourceNodeToJson(pObj, pJson);
case QUERY_NODE_DATABASE_OPTIONS:
return databaseOptionsToJson(pObj, pJson);
case QUERY_NODE_LEFT_VALUE:
return TSDB_CODE_SUCCESS; // SLeftValueNode has no fields to serialize.
case QUERY_NODE_SET_OPERATOR:
@ -3090,8 +3360,17 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return selectStmtToJson(pObj, pJson);
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
break;
case QUERY_NODE_ALTER_DATABASE_STMT:
return alterDatabaseStmtToJson(pObj, pJson);
case QUERY_NODE_CREATE_TABLE_STMT:
break;
case QUERY_NODE_ALTER_TABLE_STMT:
return alterTableStmtToJson(pObj, pJson);
case QUERY_NODE_USE_DATABASE_STMT:
break;
case QUERY_NODE_ALTER_DNODE_STMT:
return alterDnodeStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
break;
@ -3198,12 +3477,20 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToSlotDescNode(pJson, pObj);
case QUERY_NODE_DOWNSTREAM_SOURCE:
return jsonToDownstreamSourceNode(pJson, pObj);
case QUERY_NODE_DATABASE_OPTIONS:
return jsonToDatabaseOptions(pJson, pObj);
case QUERY_NODE_LEFT_VALUE:
return TSDB_CODE_SUCCESS; // SLeftValueNode has no fields to deserialize.
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
return jsonToSelectStmt(pJson, pObj);
case QUERY_NODE_ALTER_DATABASE_STMT:
return jsonToAlterDatabaseStmt(pJson, pObj);
case QUERY_NODE_ALTER_TABLE_STMT:
return jsonToAlterTableStmt(pJson, pObj);
case QUERY_NODE_ALTER_DNODE_STMT:
return jsonToAlterDnodeStmt(pJson, pObj);
case QUERY_NODE_CREATE_TOPIC_STMT:
return jsonToCreateTopicStmt(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SCAN:

View File

@ -27,14 +27,13 @@ extern "C" {
#include "querynodes.h"
typedef struct SAstCreateContext {
SParseContext* pQueryCxt;
SMsgBuf msgBuf;
bool notSupport;
SNode* pRootNode;
int16_t placeholderNo;
SArray* pPlaceholderValues;
int32_t errCode;
SParseMetaCache* pMetaCache;
SParseContext* pQueryCxt;
SMsgBuf msgBuf;
bool notSupport;
SNode* pRootNode;
int16_t placeholderNo;
SArray* pPlaceholderValues;
int32_t errCode;
} SAstCreateContext;
typedef enum EDatabaseOptionType {
@ -75,7 +74,7 @@ typedef struct SAlterOption {
extern SToken nil_token;
int32_t initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt);
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt);
SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode);
SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const SToken* pEnd, SNode* pNode);

View File

@ -26,6 +26,7 @@ extern "C" {
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery);
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);

View File

@ -24,12 +24,12 @@ extern "C" {
#include "os.h"
#include "query.h"
#define parserFatal(param, ...) qFatal("PARSER: " param, __VA_ARGS__)
#define parserError(param, ...) qError("PARSER: " param, __VA_ARGS__)
#define parserWarn(param, ...) qWarn("PARSER: " param, __VA_ARGS__)
#define parserInfo(param, ...) qInfo("PARSER: " param, __VA_ARGS__)
#define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__)
#define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__)
#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
#define parserError(param, ...) qError("PARSER: " param, ##__VA_ARGS__)
#define parserWarn(param, ...) qWarn("PARSER: " param, ##__VA_ARGS__)
#define parserInfo(param, ...) qInfo("PARSER: " param, ##__VA_ARGS__)
#define parserDebug(param, ...) qDebug("PARSER: " param, ##__VA_ARGS__)
#define parserTrace(param, ...) qTrace("PARSER: " param, ##__VA_ARGS__)
#define PK_TS_COL_INTERNAL_NAME "_rowts"
@ -42,7 +42,10 @@ typedef struct SParseMetaCache {
SHashObj* pTableMeta; // key is tbFName, element is STableMeta*
SHashObj* pDbVgroup; // key is dbFName, element is SArray<SVgroupInfo>*
SHashObj* pTableVgroup; // key is tbFName, element is SVgroupInfo*
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
@ -62,12 +65,22 @@ int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache);
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDBVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableHashVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
int32_t getDBVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum);
int32_t getDBCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDb, AUTH_TYPE type,
bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
#ifdef __cplusplus
}

View File

@ -38,7 +38,7 @@
SToken nil_token = {.type = TK_NK_NIL, .n = 0, .z = NULL};
int32_t initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) {
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) {
memset(pCxt, 0, sizeof(SAstCreateContext));
pCxt->pQueryCxt = pParseCxt;
pCxt->msgBuf.buf = pParseCxt->pMsg;
@ -48,13 +48,6 @@ int32_t initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt)
pCxt->placeholderNo = 0;
pCxt->pPlaceholderValues = NULL;
pCxt->errCode = TSDB_CODE_SUCCESS;
if (pParseCxt->async) {
pCxt->pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache));
if (NULL == pCxt->pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static void copyStringFormStringToken(SToken* pToken, char* pBuf, int32_t len) {
@ -472,13 +465,6 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pTa
strncpy(realTable->table.tableAlias, pTableName->z, pTableName->n);
}
strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
if (NULL != pCxt->pMetaCache) {
if (TSDB_CODE_SUCCESS != reserveTableMetaInCache(pCxt->pQueryCxt->acctId, realTable->table.dbName,
realTable->table.tableName, pCxt->pMetaCache)) {
nodesDestroyNode(realTable);
CHECK_OUT_OF_MEM(NULL);
}
}
return (SNode*)realTable;
}

View File

@ -13,11 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgt.h"
#include "os.h"
#include "parInt.h"
#include "parAst.h"
#include "parInt.h"
#include "parToken.h"
#include "systable.h"
typedef void* (*FMalloc)(size_t);
typedef void (*FFree)(void*);
@ -82,8 +83,386 @@ abort_parse:
(*pQuery)->pRoot = cxt.pRootNode;
(*pQuery)->placeholderNum = cxt.placeholderNo;
TSWAP((*pQuery)->pPlaceholderValues, cxt.pPlaceholderValues);
TSWAP((*pQuery)->pMetaCache, cxt.pMetaCache);
}
taosArrayDestroy(cxt.pPlaceholderValues);
return cxt.errCode;
}
typedef struct SCollectMetaKeyCxt {
SParseContext* pParseCxt;
SParseMetaCache* pMetaCache;
} SCollectMetaKeyCxt;
static void destroyCollectMetaKeyCxt(SCollectMetaKeyCxt* pCxt) {
if (NULL != pCxt->pMetaCache) {
// TODO
}
}
typedef struct SCollectMetaKeyFromExprCxt {
SCollectMetaKeyCxt* pComCxt;
int32_t errCode;
} SCollectMetaKeyFromExprCxt;
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt);
static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFunctionNode* pFunc) {
if (fmIsBuiltinFunc(pFunc->functionName)) {
return TSDB_CODE_SUCCESS;
}
return reserveUdfInCache(pFunc->functionName, pCxt->pComCxt->pMetaCache);
}
static EDealRes collectMetaKeyFromRealTable(SCollectMetaKeyFromExprCxt* pCxt, SRealTableNode* pRealTable) {
pCxt->errCode = reserveTableMetaInCache(pCxt->pComCxt->pParseCxt->acctId, pRealTable->table.dbName,
pRealTable->table.tableName, pCxt->pComCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = reserveTableVgroupInCache(pCxt->pComCxt->pParseCxt->acctId, pRealTable->table.dbName,
pRealTable->table.tableName, pCxt->pComCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = reserveUserAuthInCache(pCxt->pComCxt->pParseCxt->acctId, pCxt->pComCxt->pParseCxt->pUser,
pRealTable->table.dbName, AUTH_TYPE_READ, pCxt->pComCxt->pMetaCache);
}
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
}
static EDealRes collectMetaKeyFromTempTable(SCollectMetaKeyFromExprCxt* pCxt, STempTableNode* pTempTable) {
pCxt->errCode = collectMetaKeyFromQuery(pCxt->pComCxt, pTempTable->pSubquery);
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
}
static EDealRes collectMetaKeyFromExprImpl(SNode* pNode, void* pContext) {
SCollectMetaKeyFromExprCxt* pCxt = pContext;
switch (nodeType(pNode)) {
case QUERY_NODE_FUNCTION:
return collectMetaKeyFromFunction(pCxt, (SFunctionNode*)pNode);
case QUERY_NODE_REAL_TABLE:
return collectMetaKeyFromRealTable(pCxt, (SRealTableNode*)pNode);
case QUERY_NODE_TEMP_TABLE:
return collectMetaKeyFromTempTable(pCxt, (STempTableNode*)pNode);
default:
break;
}
return DEAL_RES_CONTINUE;
}
static int32_t collectMetaKeyFromExprs(SCollectMetaKeyCxt* pCxt, SNodeList* pList) {
SCollectMetaKeyFromExprCxt cxt = {.pComCxt = pCxt, .errCode = TSDB_CODE_SUCCESS};
nodesWalkExprs(pList, collectMetaKeyFromExprImpl, &cxt);
return cxt.errCode;
}
static int32_t collectMetaKeyFromSetOperator(SCollectMetaKeyCxt* pCxt, SSetOperator* pStmt) {
int32_t code = collectMetaKeyFromQuery(pCxt, pStmt->pLeft);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromExprs(pCxt, pStmt->pOrderByList);
}
return code;
}
static int32_t collectMetaKeyFromSelect(SCollectMetaKeyCxt* pCxt, SSelectStmt* pStmt) {
SCollectMetaKeyFromExprCxt cxt = {.pComCxt = pCxt, .errCode = TSDB_CODE_SUCCESS};
nodesWalkSelectStmt(pStmt, SQL_CLAUSE_FROM, collectMetaKeyFromExprImpl, &cxt);
return cxt.errCode;
}
static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTableStmt* pStmt) {
if (NULL == pStmt->pTags) {
return reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
} else {
return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
}
static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCreateMultiTableStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pStmt->pSubTables) {
SCreateSubTableClause* pClause = (SCreateSubTableClause*)pNode;
code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pClause->useTableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromUseDatabase(SCollectMetaKeyCxt* pCxt, SUseDatabaseStmt* pStmt) {
return reserveDbVgVersionInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromCreateIndex(SCollectMetaKeyCxt* pCxt, SCreateIndexStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
if (INDEX_TYPE_SMA == pStmt->indexType) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code =
reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
}
}
return code;
}
static int32_t collectMetaKeyFromCreateTopic(SCollectMetaKeyCxt* pCxt, SCreateTopicStmt* pStmt) {
if (NULL != pStmt->pQuery) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromExplain(SCollectMetaKeyCxt* pCxt, SExplainStmt* pStmt) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) {
return collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
static int32_t collectMetaKeyFromShowDnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowMnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MNODES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowModules(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MODULES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowQnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_QNODES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowSnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_SNODES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowBnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowDatabases(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_DATABASES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowFunctions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_FUNCTIONS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowIndexes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_INDEXES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowStables(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_STABLES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowStreams(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_STREAMS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowTables(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_USER_TABLES, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pDbName) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
} else {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, pCxt->pMetaCache);
}
}
return code;
}
static int32_t collectMetaKeyFromShowUsers(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_USERS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowLicence(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_LICENCES,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowVgroups(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TOPICS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowTransactions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR:
return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt);
case QUERY_NODE_SELECT_STMT:
return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_VNODE_MODIF_STMT:
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_DROP_DATABASE_STMT:
case QUERY_NODE_ALTER_DATABASE_STMT:
break;
case QUERY_NODE_CREATE_TABLE_STMT:
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
break;
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt);
case QUERY_NODE_DROP_TABLE_CLAUSE:
case QUERY_NODE_DROP_TABLE_STMT:
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
break;
case QUERY_NODE_ALTER_TABLE_STMT:
return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt);
case QUERY_NODE_CREATE_USER_STMT:
case QUERY_NODE_ALTER_USER_STMT:
case QUERY_NODE_DROP_USER_STMT:
break;
case QUERY_NODE_USE_DATABASE_STMT:
return collectMetaKeyFromUseDatabase(pCxt, (SUseDatabaseStmt*)pStmt);
case QUERY_NODE_CREATE_DNODE_STMT:
case QUERY_NODE_DROP_DNODE_STMT:
case QUERY_NODE_ALTER_DNODE_STMT:
break;
case QUERY_NODE_CREATE_INDEX_STMT:
return collectMetaKeyFromCreateIndex(pCxt, (SCreateIndexStmt*)pStmt);
case QUERY_NODE_DROP_INDEX_STMT:
case QUERY_NODE_CREATE_QNODE_STMT:
case QUERY_NODE_DROP_QNODE_STMT:
case QUERY_NODE_CREATE_BNODE_STMT:
case QUERY_NODE_DROP_BNODE_STMT:
case QUERY_NODE_CREATE_SNODE_STMT:
case QUERY_NODE_DROP_SNODE_STMT:
case QUERY_NODE_CREATE_MNODE_STMT:
case QUERY_NODE_DROP_MNODE_STMT:
break;
case QUERY_NODE_CREATE_TOPIC_STMT:
return collectMetaKeyFromCreateTopic(pCxt, (SCreateTopicStmt*)pStmt);
case QUERY_NODE_DROP_TOPIC_STMT:
case QUERY_NODE_DROP_CGROUP_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT:
break;
case QUERY_NODE_EXPLAIN_STMT:
return collectMetaKeyFromExplain(pCxt, (SExplainStmt*)pStmt);
case QUERY_NODE_DESCRIBE_STMT:
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_COMPACT_STMT:
case QUERY_NODE_CREATE_FUNCTION_STMT:
case QUERY_NODE_DROP_FUNCTION_STMT:
break;
case QUERY_NODE_CREATE_STREAM_STMT:
return collectMetaKeyFromCreateStream(pCxt, (SCreateStreamStmt*)pStmt);
case QUERY_NODE_DROP_STREAM_STMT:
case QUERY_NODE_MERGE_VGROUP_STMT:
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
case QUERY_NODE_SPLIT_VGROUP_STMT:
case QUERY_NODE_SYNCDB_STMT:
case QUERY_NODE_GRANT_STMT:
case QUERY_NODE_REVOKE_STMT:
case QUERY_NODE_SHOW_DNODES_STMT:
return collectMetaKeyFromShowDnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_MNODES_STMT:
return collectMetaKeyFromShowMnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_MODULES_STMT:
return collectMetaKeyFromShowModules(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_QNODES_STMT:
return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_SNODES_STMT:
return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_BNODES_STMT:
return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CLUSTER_STMT:
break;
case QUERY_NODE_SHOW_DATABASES_STMT:
return collectMetaKeyFromShowDatabases(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
return collectMetaKeyFromShowFunctions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_INDEXES_STMT:
return collectMetaKeyFromShowIndexes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_STABLES_STMT:
return collectMetaKeyFromShowStables(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_STREAMS_STMT:
return collectMetaKeyFromShowStreams(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TABLES_STMT:
return collectMetaKeyFromShowTables(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_USERS_STMT:
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_LICENCE_STMT:
return collectMetaKeyFromShowLicence(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_VGROUPS_STMT:
return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TOPICS_STMT:
return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
break;
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_KILL_CONNECTION_STMT:
case QUERY_NODE_KILL_QUERY_STMT:
case QUERY_NODE_KILL_TRANSACTION_STMT:
default:
break;
}
return TSDB_CODE_SUCCESS;
}
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery) {
SCollectMetaKeyCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))};
if (NULL == cxt.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = collectMetaKeyFromQuery(&cxt, pQuery->pRoot);
if (TSDB_CODE_SUCCESS == code) {
TSWAP(pQuery->pMetaCache, cxt.pMetaCache);
}
destroyCollectMetaKeyCxt(&cxt);
return code;
}

View File

@ -18,23 +18,30 @@
#include "parInt.h"
typedef struct SAuthCxt {
SParseContext* pParseCxt;
int32_t errCode;
SParseContext* pParseCxt;
SParseMetaCache* pMetaCache;
int32_t errCode;
} SAuthCxt;
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt);
static int32_t checkAuth(SParseContext* pCxt, const char* pDbName, AUTH_TYPE type) {
if (pCxt->isSuperUser) {
static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, AUTH_TYPE type) {
SParseContext* pParseCxt = pCxt->pParseCxt;
if (pParseCxt->isSuperUser) {
return TSDB_CODE_SUCCESS;
}
SName name;
tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName));
tNameSetDbName(&name, pParseCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
int32_t code = TSDB_CODE_SUCCESS;
bool pass = false;
int32_t code =
catalogChkAuth(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pCxt->pUser, dbFname, type, &pass);
if (NULL != pCxt->pMetaCache) {
code = getUserAuthFromCache(pCxt->pMetaCache, pParseCxt->pUser, dbFname, type, &pass);
} else {
code = catalogChkAuth(pParseCxt->pCatalog, pParseCxt->pTransporter, &pParseCxt->mgmtEpSet, pParseCxt->pUser,
dbFname, type, &pass);
}
return TSDB_CODE_SUCCESS == code ? (pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED) : code;
}
@ -45,7 +52,7 @@ static EDealRes authSubquery(SAuthCxt* pCxt, SNode* pStmt) {
static EDealRes authSelectImpl(SNode* pNode, void* pContext) {
SAuthCxt* pCxt = pContext;
if (QUERY_NODE_REAL_TABLE == nodeType(pNode)) {
pCxt->errCode = checkAuth(pCxt->pParseCxt, ((SRealTableNode*)pNode)->table.dbName, AUTH_TYPE_READ);
pCxt->errCode = checkAuth(pCxt, ((SRealTableNode*)pNode)->table.dbName, AUTH_TYPE_READ);
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
} else if (QUERY_NODE_TEMP_TABLE == nodeType(pNode)) {
return authSubquery(pCxt, ((STempTableNode*)pNode)->pSubquery);
@ -79,87 +86,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return authSetOperator(pCxt, (SSetOperator*)pStmt);
case QUERY_NODE_SELECT_STMT:
return authSelect(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_CREATE_DATABASE_STMT:
case QUERY_NODE_DROP_DATABASE_STMT:
case QUERY_NODE_ALTER_DATABASE_STMT:
case QUERY_NODE_CREATE_TABLE_STMT:
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
case QUERY_NODE_DROP_TABLE_CLAUSE:
case QUERY_NODE_DROP_TABLE_STMT:
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
case QUERY_NODE_ALTER_TABLE_STMT:
case QUERY_NODE_CREATE_USER_STMT:
case QUERY_NODE_ALTER_USER_STMT:
break;
case QUERY_NODE_DROP_USER_STMT: {
case QUERY_NODE_DROP_USER_STMT:
return authDropUser(pCxt, (SDropUserStmt*)pStmt);
}
case QUERY_NODE_USE_DATABASE_STMT:
case QUERY_NODE_CREATE_DNODE_STMT:
case QUERY_NODE_DROP_DNODE_STMT:
case QUERY_NODE_ALTER_DNODE_STMT:
case QUERY_NODE_CREATE_INDEX_STMT:
case QUERY_NODE_DROP_INDEX_STMT:
case QUERY_NODE_CREATE_QNODE_STMT:
case QUERY_NODE_DROP_QNODE_STMT:
case QUERY_NODE_CREATE_BNODE_STMT:
case QUERY_NODE_DROP_BNODE_STMT:
case QUERY_NODE_CREATE_SNODE_STMT:
case QUERY_NODE_DROP_SNODE_STMT:
case QUERY_NODE_CREATE_MNODE_STMT:
case QUERY_NODE_DROP_MNODE_STMT:
case QUERY_NODE_CREATE_TOPIC_STMT:
case QUERY_NODE_DROP_TOPIC_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT:
case QUERY_NODE_EXPLAIN_STMT:
case QUERY_NODE_DESCRIBE_STMT:
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_COMPACT_STMT:
case QUERY_NODE_CREATE_FUNCTION_STMT:
case QUERY_NODE_DROP_FUNCTION_STMT:
case QUERY_NODE_CREATE_STREAM_STMT:
case QUERY_NODE_DROP_STREAM_STMT:
case QUERY_NODE_MERGE_VGROUP_STMT:
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
case QUERY_NODE_SPLIT_VGROUP_STMT:
case QUERY_NODE_SYNCDB_STMT:
case QUERY_NODE_GRANT_STMT:
case QUERY_NODE_REVOKE_STMT:
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
case QUERY_NODE_SHOW_INDEXES_STMT:
case QUERY_NODE_SHOW_STABLES_STMT:
case QUERY_NODE_SHOW_STREAMS_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
case QUERY_NODE_SHOW_USERS_STMT:
case QUERY_NODE_SHOW_LICENCE_STMT:
case QUERY_NODE_SHOW_VGROUPS_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_CONSUMERS_STMT:
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
case QUERY_NODE_SHOW_SMAS_STMT:
case QUERY_NODE_SHOW_CONFIGS_STMT:
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_KILL_CONNECTION_STMT:
case QUERY_NODE_KILL_QUERY_STMT:
case QUERY_NODE_KILL_TRANSACTION_STMT:
default:
break;
}
@ -168,6 +96,6 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
}
int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery) {
SAuthCxt cxt = {.pParseCxt = pParseCxt, .errCode = TSDB_CODE_SUCCESS};
SAuthCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = pQuery->pMetaCache, .errCode = TSDB_CODE_SUCCESS};
return authQuery(&cxt, pQuery->pRoot);
}

View File

@ -152,7 +152,7 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr
tNameGetFullDbName(pName, fullDbName);
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getDBVgInfoFromCache(pCxt->pMetaCache, fullDbName, pVgInfo);
code = getDbVgInfoFromCache(pCxt->pMetaCache, fullDbName, pVgInfo);
} else {
code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
@ -177,7 +177,7 @@ static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pNam
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getTableHashVgroupFromCache(pCxt->pMetaCache, pName, pInfo);
code = getTableVgroupFromCache(pCxt->pMetaCache, pName, pInfo);
} else {
code = collectUseDatabase(pName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
@ -205,7 +205,7 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getDBVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum);
code = getDbVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum);
} else {
code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
@ -226,7 +226,7 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
tNameGetFullDbName(&name, dbFname);
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getDBCfgFromCache(pCxt->pMetaCache, dbFname, pInfo);
code = getDbCfgFromCache(pCxt->pMetaCache, dbFname, pInfo);
} else {
code = collectUseDatabaseImpl(dbFname, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
@ -239,6 +239,27 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
return code;
}
static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
SParseContext* pParCxt = pCxt->pParseCxt;
SFuncInfo funcInfo = {0};
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getUdfInfoFromCache(pCxt->pMetaCache, pFunc->functionName, &funcInfo);
} else {
code = catalogGetUdfInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pFunc->functionName,
&funcInfo);
}
if (TSDB_CODE_SUCCESS == code) {
pFunc->funcType = FUNCTION_TYPE_UDF;
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = funcInfo.outputType;
pFunc->node.resType.bytes = funcInfo.outputLen;
pFunc->udfBufSize = funcInfo.bufSize;
tFreeSFuncInfo(&funcInfo);
}
return code;
}
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS;
@ -873,12 +894,11 @@ static bool hasInvalidFuncNesting(SNodeList* pParameterList) {
}
static int32_t getFuncInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
.pErrBuf = pCxt->msgBuf.buf,
.errBufLen = pCxt->msgBuf.len};
return fmGetFuncInfo(&param, pFunc);
int32_t code = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
if (TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION == code) {
code = getUdfInfo(pCxt, pFunc);
}
return code;
}
static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
@ -1212,7 +1232,6 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
int32_t code = TSDB_CODE_SUCCESS;
SArray* vgroupList = NULL;
if ('\0' != pRealTable->qualDbName[0]) {
// todo release after mnode can be processed
if (0 != strcmp(pRealTable->qualDbName, TSDB_INFORMATION_SCHEMA_DB)) {
code = getDBVgInfo(pCxt, pRealTable->qualDbName, &vgroupList);
}
@ -1220,7 +1239,6 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
}
// todo release after mnode can be processed
if (TSDB_CODE_SUCCESS == code) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
}

View File

@ -15,6 +15,9 @@
#include "parUtil.h"
#include "cJSON.h"
#include "querynodes.h"
#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_DB_FNAME_LEN + 2
static char* getSyntaxErrFormat(int32_t errCode) {
switch (errCode) {
@ -255,17 +258,8 @@ STableComInfo getTableInfo(const STableMeta* pTableMeta) {
return pTableMeta->tableInfo;
}
static uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
int32_t totalCols = 0;
if (pTableMeta->tableInfo.numOfColumns >= 0) {
totalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
}
return sizeof(STableMeta) + totalCols * sizeof(SSchema);
}
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
size_t size = getTableMetaSize(pTableMeta);
size_t size = TABLE_META_SIZE(pTableMeta);
STableMeta* p = taosMemoryMalloc(size);
memcpy(p, pTableMeta, size);
@ -449,6 +443,26 @@ end:
return retCode;
}
static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, char* pStr) {
return sprintf(pStr, "%s*%d.%s*%d", pUser, acctId, pDb, type);
}
static int32_t userAuthToStringExt(const char* pUser, const char* pDbFName, AUTH_TYPE type, char* pStr) {
return sprintf(pStr, "%s*%s*%d", pUser, pDbFName, type);
}
static void stringToUserAuth(const char* pStr, int32_t len, SUserAuthInfo* pUserAuth) {
char* p1 = strchr(pStr, '*');
strncpy(pUserAuth->user, pStr, p1 - pStr);
++p1;
char* p2 = strchr(p1, '*');
strncpy(pUserAuth->dbFName, p1, p2 - p1);
++p2;
char buf[10] = {0};
strncpy(buf, p2, len - (p2 - pStr));
pUserAuth->type = taosStr2Int32(buf, NULL, 10);
}
static int32_t buildTableReq(SHashObj* pTablesHash, SArray** pTables) {
if (NULL != pTablesHash) {
*pTables = taosArrayInit(taosHashGetSize(pTablesHash), sizeof(SName));
@ -503,6 +517,44 @@ static int32_t buildTableVgroupReq(SHashObj* pTableVgroupHash, SArray** pTableVg
static int32_t buildDbCfgReq(SHashObj* pDbCfgHash, SArray** pDbCfg) { return buildDbReq(pDbCfgHash, pDbCfg); }
static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
if (NULL != pUserAuthHash) {
*pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo));
if (NULL == *pUserAuth) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* p = taosHashIterate(pUserAuthHash, NULL);
while (NULL != p) {
size_t len = 0;
char* pKey = taosHashGetKey(p, &len);
SUserAuthInfo userAuth = {0};
stringToUserAuth(pKey, len, &userAuth);
taosArrayPush(*pUserAuth, &userAuth);
p = taosHashIterate(pUserAuthHash, p);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
if (NULL != pUdfHash) {
*pUdf = taosArrayInit(taosHashGetSize(pUdfHash), TSDB_FUNC_NAME_LEN);
if (NULL == *pUdf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* p = taosHashIterate(pUdfHash, NULL);
while (NULL != p) {
size_t len = 0;
char* pFunc = taosHashGetKey(p, &len);
char func[TSDB_FUNC_NAME_LEN] = {0};
strncpy(func, pFunc, len);
taosArrayPush(*pUdf, func);
p = taosHashIterate(pUdfHash, p);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
int32_t code = buildTableMetaReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
@ -512,7 +564,13 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
code = buildTableVgroupReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildDbCfgReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbCfg);
code = buildDbCfgReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildUserAuthReq(pMetaCache->pUserAuth, &pCatalogReq->pUser);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildUdfReq(pMetaCache->pUdf, &pCatalogReq->pUdf);
}
return code;
}
@ -568,6 +626,31 @@ static int32_t putDbCfgToCache(const SArray* pDbCfgReq, const SArray* pDbCfgData
return TSDB_CODE_SUCCESS;
}
static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUserAuthData, SHashObj* pUserAuth) {
int32_t nvgs = taosArrayGetSize(pUserAuthReq);
for (int32_t i = 0; i < nvgs; ++i) {
SUserAuthInfo* pUser = taosArrayGet(pUserAuthReq, i);
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToStringExt(pUser->user, pUser->dbFName, pUser->type, key);
if (TSDB_CODE_SUCCESS != taosHashPut(pUserAuth, key, len, taosArrayGet(pUserAuthData, i), sizeof(bool))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHashObj* pUdf) {
int32_t num = taosArrayGetSize(pUdfReq);
for (int32_t i = 0; i < num; ++i) {
char* pFunc = taosArrayGet(pUdfReq, i);
SFuncInfo* pInfo = taosArrayGet(pUdfData, i);
if (TSDB_CODE_SUCCESS != taosHashPut(pUdf, pFunc, strlen(pFunc), &pInfo, POINTER_BYTES)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
int32_t code = putTableMetaToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, pMetaCache->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
@ -579,54 +662,161 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
if (TSDB_CODE_SUCCESS == code) {
code = putDbCfgToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, pMetaCache->pDbCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = putUserAuthToCache(pCatalogReq->pUser, pMetaData->pUser, pMetaCache->pUserAuth);
}
if (TSDB_CODE_SUCCESS == code) {
code = putUdfToCache(pCatalogReq->pUdf, pMetaData->pUdfList, pMetaCache->pUdf);
}
return code;
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pTableMeta) {
pMetaCache->pTableMeta = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pTableMeta) {
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
if (NULL == *pTables) {
*pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == *pTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
return taosHashPut(pMetaCache->pTableMeta, fullName, len, &len, POINTER_BYTES);
return taosHashPut(*pTables, fullName, len, &pTables, POINTER_BYTES);
}
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableMeta);
}
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
*pMeta = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
return NULL == *pMeta ? TSDB_CODE_PAR_INTERNAL_ERROR : TSDB_CODE_SUCCESS;
STableMeta** pRes = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
if (NULL == pRes || NULL == *pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
*pMeta = tableMetaDup(*pRes);
if (NULL == *pMeta) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
int32_t getDBVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo) {
*pVgInfo = taosHashGet(pMetaCache->pDbVgroup, pDbFName, strlen(pDbFName));
return NULL == *pVgInfo ? TSDB_CODE_PAR_INTERNAL_ERROR : TSDB_CODE_SUCCESS;
static int32_t reserveDbReqInCache(int32_t acctId, const char* pDb, SHashObj** pDbs) {
if (NULL == *pDbs) {
*pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == *pDbs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
return taosHashPut(*pDbs, fullName, len, &pDbs, POINTER_BYTES);
}
int32_t getTableHashVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
return reserveDbReqInCache(acctId, pDb, &pMetaCache->pDbVgroup);
}
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo) {
SArray** pRes = taosHashGet(pMetaCache->pDbVgroup, pDbFName, strlen(pDbFName));
if (NULL == pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
// *pRes is null, which is a legal value, indicating that the user DB has not been created
if (NULL != *pRes) {
*pVgInfo = taosArrayDup(*pRes);
if (NULL == *pVgInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableVgroup);
}
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
SVgroupInfo* pInfo = taosHashGet(pMetaCache->pTableVgroup, fullName, strlen(fullName));
if (NULL == pInfo) {
SVgroupInfo** pRes = taosHashGet(pMetaCache->pTableVgroup, fullName, strlen(fullName));
if (NULL == pRes || NULL == *pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pVgroup, pInfo, sizeof(SVgroupInfo));
memcpy(pVgroup, *pRes, sizeof(SVgroupInfo));
return TSDB_CODE_SUCCESS;
}
int32_t getDBVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
return reserveDbReqInCache(acctId, pDb, &pMetaCache->pDbCfg);
}
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
int32_t getDBCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo) {
SDbCfgInfo* pDbCfg = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
if (NULL == pDbCfg) {
SDbInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
if (NULL == pRes || NULL == *pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pInfo, pDbCfg, sizeof(SDbCfgInfo));
*pVersion = (*pRes)->vgVer;
*pDbId = (*pRes)->dbId;
*pTableNum = (*pRes)->tbNum;
return TSDB_CODE_SUCCESS;
}
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
return reserveDbReqInCache(acctId, pDb, &pMetaCache->pDbCfg);
}
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo) {
SDbCfgInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
if (NULL == pRes || NULL == *pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pInfo, *pRes, sizeof(SDbCfgInfo));
return TSDB_CODE_SUCCESS;
}
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pUserAuth) {
pMetaCache->pUserAuth = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pUserAuth) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToString(acctId, pUser, pDb, type, key);
bool pass = false;
return taosHashPut(pMetaCache->pUserAuth, key, len, &pass, sizeof(pass));
}
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
bool* pPass) {
char key[USER_AUTH_KEY_MAX_LEN] = {0};
int32_t len = userAuthToStringExt(pUser, pDbFName, type, key);
bool* pRes = taosHashGet(pMetaCache->pUserAuth, key, len);
if (NULL == pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
*pPass = *pRes;
return TSDB_CODE_SUCCESS;
}
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
if (NULL == pMetaCache->pUdf) {
pMetaCache->pUdf = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == pMetaCache->pUdf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return taosHashPut(pMetaCache->pUdf, pFunc, strlen(pFunc), &pMetaCache, POINTER_BYTES);
}
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) {
SFuncInfo** pRes = taosHashGet(pMetaCache->pUdf, pFunc, strlen(pFunc));
if (NULL == pRes || NULL == *pRes) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pInfo, *pRes, sizeof(SFuncInfo));
return TSDB_CODE_SUCCESS;
}

View File

@ -59,6 +59,14 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
return code;
}
static int32_t syntaxParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = parse(pCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKey(pCxt, *pQuery);
}
return code;
}
static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
if (pParam->is_null && 1 == *(pParam->is_null)) {
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
@ -188,7 +196,7 @@ int32_t qSyntaxParseSql(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
// todo insert sql
} else {
code = parse(pCxt, pQuery);
code = syntaxParseSql(pCxt, pQuery);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq((*pQuery)->pMetaCache, pCatalogReq);

View File

@ -103,7 +103,7 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
}
{
ITableBuilder& builder = mcs->createTableBuilder("performance_schema", "streams", TSDB_SYSTEM_TABLE, 1)
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
builder.done();
}
}
@ -157,6 +157,12 @@ void generateTestST1(MockCatalogService* mcs) {
mcs->createSubTable("test", "st1", "st1s3", 1);
}
void generateFunctions(MockCatalogService* mcs) {
mcs->createFunction("udf1", TSDB_FUNC_TYPE_SCALAR, TSDB_DATA_TYPE_INT, tDataTypes[TSDB_DATA_TYPE_INT].bytes, 0);
mcs->createFunction("udf2", TSDB_FUNC_TYPE_AGGREGATE, TSDB_DATA_TYPE_DOUBLE, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes,
8);
}
} // namespace
int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; }
@ -196,6 +202,11 @@ int32_t __catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, con
return 0;
}
int32_t __catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* funcName,
SFuncInfo* pInfo) {
return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo);
}
void initMetaDataEnv() {
g_mockCatalogService.reset(new MockCatalogService());
@ -209,6 +220,7 @@ void initMetaDataEnv() {
stub.set(catalogGetDBVgInfo, __catalogGetDBVgInfo);
stub.set(catalogGetDBCfg, __catalogGetDBCfg);
stub.set(catalogChkAuth, __catalogChkAuth);
stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
// {
// AddrAny any("libcatalog.so");
// std::map<std::string,void*> result;
@ -256,6 +268,7 @@ void generateMetaData() {
generatePerformanceSchema(g_mockCatalogService.get());
generateTestT1(g_mockCatalogService.get());
generateTestST1(g_mockCatalogService.get());
generateFunctions(g_mockCatalogService.get());
g_mockCatalogService->showTables();
}

View File

@ -120,11 +120,35 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
}
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
auto it = udf_.find(funcName);
if (udf_.end() == it) {
return TSDB_CODE_FAILED;
}
memcpy(pInfo, it->second.get(), sizeof(SFuncInfo));
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = getAllTableVgroup(pCatalogReq->pTableHash, &pMetaData->pTableHash);
}
if (TSDB_CODE_SUCCESS == code) {
code = getAllDbVgroup(pCatalogReq->pDbVgroup, &pMetaData->pDbVgroup);
}
if (TSDB_CODE_SUCCESS == code) {
code = getAllDbCfg(pCatalogReq->pDbCfg, &pMetaData->pDbCfg);
}
if (TSDB_CODE_SUCCESS == code) {
code = getAllDbInfo(pCatalogReq->pDbInfo, &pMetaData->pDbInfo);
}
if (TSDB_CODE_SUCCESS == code) {
code = getAllUserAuth(pCatalogReq->pUser, &pMetaData->pUser);
}
if (TSDB_CODE_SUCCESS == code) {
code = getAllUdf(pCatalogReq->pUdf, &pMetaData->pUdfList);
}
return code;
}
@ -211,21 +235,21 @@ class MockCatalogServiceImpl {
}
}
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const {
DbMetaCache::const_iterator it = meta_.find(db);
if (meta_.end() == it) {
return std::shared_ptr<MockTableMeta>();
}
TableMetaCache::const_iterator tit = it->second.find(tbname);
if (it->second.end() == tit) {
return std::shared_ptr<MockTableMeta>();
}
return tit->second;
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize) {
std::shared_ptr<SFuncInfo> info(new SFuncInfo);
strcpy(info->name, func.c_str());
info->funcType = funcType;
info->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
info->outputType = outputType;
info->outputLen = outputLen;
info->bufSize = bufSize;
udf_.insert(std::make_pair(func, info));
}
private:
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache;
typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
std::string toDbname(const std::string& dbFullName) const {
std::string::size_type n = dbFullName.find(".");
@ -308,6 +332,18 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const {
DbMetaCache::const_iterator it = meta_.find(db);
if (meta_.end() == it) {
return std::shared_ptr<MockTableMeta>();
}
TableMetaCache::const_iterator tit = it->second.find(tbname);
if (it->second.end() == tit) {
return std::shared_ptr<MockTableMeta>();
}
return tit->second;
}
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pTableMetaReq) {
@ -330,12 +366,82 @@ class MockCatalogServiceImpl {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pTableVgroupReq) {
int32_t ntables = taosArrayGetSize(pTableVgroupReq);
*pTableVgroupData = taosArrayInit(ntables, POINTER_BYTES);
*pTableVgroupData = taosArrayInit(ntables, sizeof(SVgroupInfo));
for (int32_t i = 0; i < ntables; ++i) {
SVgroupInfo* pVgInfo = (SVgroupInfo*)taosMemoryCalloc(1, sizeof(SVgroupInfo));
code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), pVgInfo);
SVgroupInfo vgInfo = {0};
code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), &vgInfo);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(*pTableVgroupData, &pVgInfo);
taosArrayPush(*pTableVgroupData, &vgInfo);
} else {
break;
}
}
}
return code;
}
int32_t getAllDbVgroup(SArray* pDbVgroupReq, SArray** pDbVgroupData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pDbVgroupReq) {
int32_t ndbs = taosArrayGetSize(pDbVgroupReq);
*pDbVgroupData = taosArrayInit(ndbs, POINTER_BYTES);
for (int32_t i = 0; i < ndbs; ++i) {
int64_t zeroVg = 0;
taosArrayPush(*pDbVgroupData, &zeroVg);
}
}
return code;
}
int32_t getAllDbCfg(SArray* pDbCfgReq, SArray** pDbCfgData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pDbCfgReq) {
int32_t ndbs = taosArrayGetSize(pDbCfgReq);
*pDbCfgData = taosArrayInit(ndbs, sizeof(SDbCfgInfo));
for (int32_t i = 0; i < ndbs; ++i) {
SDbCfgInfo dbCfg = {0};
taosArrayPush(*pDbCfgData, &dbCfg);
}
}
return code;
}
int32_t getAllDbInfo(SArray* pDbInfoReq, SArray** pDbInfoData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pDbInfoReq) {
int32_t ndbs = taosArrayGetSize(pDbInfoReq);
*pDbInfoData = taosArrayInit(ndbs, sizeof(SDbCfgInfo));
for (int32_t i = 0; i < ndbs; ++i) {
SDbInfo dbInfo = {0};
taosArrayPush(*pDbInfoData, &dbInfo);
}
}
return code;
}
int32_t getAllUserAuth(SArray* pUserAuthReq, SArray** pUserAuthData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pUserAuthReq) {
int32_t num = taosArrayGetSize(pUserAuthReq);
*pUserAuthData = taosArrayInit(num, sizeof(bool));
for (int32_t i = 0; i < num; ++i) {
bool pass = true;
taosArrayPush(*pUserAuthData, &pass);
}
}
return code;
}
int32_t getAllUdf(SArray* pUdfReq, SArray** pUdfData) const {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pUdfReq) {
int32_t num = taosArrayGetSize(pUdfReq);
*pUdfData = taosArrayInit(num, sizeof(SFuncInfo));
for (int32_t i = 0; i < num; ++i) {
SFuncInfo info = {0};
code = catalogGetUdfInfo((char*)taosArrayGet(pUdfReq, i), &info);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(*pUdfData, &info);
} else {
break;
}
@ -347,6 +453,7 @@ class MockCatalogServiceImpl {
uint64_t id_;
std::unique_ptr<TableBuilder> builder_;
DbMetaCache meta_;
UdfMetaCache udf_;
};
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
@ -365,9 +472,9 @@ void MockCatalogService::createSubTable(const std::string& db, const std::string
void MockCatalogService::showTables() const { impl_->showTables(); }
std::shared_ptr<MockTableMeta> MockCatalogService::getTableMeta(const std::string& db,
const std::string& tbname) const {
return impl_->getTableMeta(db, tbname);
void MockCatalogService::createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen,
int32_t bufSize) {
impl_->createFunction(func, funcType, outputType, outputLen, bufSize);
}
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
@ -382,6 +489,10 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
}
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
return impl_->catalogGetUdfInfo(funcName, pInfo);
}
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
}

View File

@ -56,11 +56,12 @@ class MockCatalogService {
int32_t numOfColumns, int32_t numOfTags = 0);
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
void showTables() const;
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const;
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
private:

View File

@ -228,7 +228,44 @@ TEST_F(ParserInitialCTest, createDnode) {
run("CREATE DNODE 1.1.1.1 PORT 9000");
}
// todo CREATE FUNCTION
// CREATE [AGGREGATE] FUNCTION [IF NOT EXISTS] func_name AS library_path OUTPUTTYPE type_name [BUFSIZE value]
TEST_F(ParserInitialCTest, createFunction) {
useDb("root", "test");
SCreateFuncReq expect = {0};
auto setCreateFuncReqFunc = [&](const char* pUdfName, int8_t outputType, int32_t outputBytes = 0,
int8_t funcType = TSDB_FUNC_TYPE_SCALAR, int8_t igExists = 0, int32_t bufSize = 0) {
memset(&expect, 0, sizeof(SCreateFuncReq));
strcpy(expect.name, pUdfName);
expect.igExists = igExists;
expect.funcType = funcType;
expect.scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
expect.outputType = outputType;
expect.outputLen = outputBytes > 0 ? outputBytes : tDataTypes[outputType].bytes;
expect.bufSize = bufSize;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_FUNCTION_STMT);
SCreateFuncReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSCreateFuncReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.igExists, expect.igExists);
ASSERT_EQ(req.funcType, expect.funcType);
ASSERT_EQ(req.scriptType, expect.scriptType);
ASSERT_EQ(req.outputType, expect.outputType);
ASSERT_EQ(req.outputLen, expect.outputLen);
ASSERT_EQ(req.bufSize, expect.bufSize);
});
setCreateFuncReqFunc("udf1", TSDB_DATA_TYPE_INT);
// run("CREATE FUNCTION udf1 AS './build/lib/libudf1.so' OUTPUTTYPE INT");
setCreateFuncReqFunc("udf2", TSDB_DATA_TYPE_DOUBLE, 0, TSDB_FUNC_TYPE_AGGREGATE, 1, 8);
// run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS './build/lib/libudf2.so' OUTPUTTYPE DOUBLE BUFSIZE 8");
}
TEST_F(ParserInitialCTest, createIndexSma) {
useDb("root", "test");

View File

@ -103,6 +103,7 @@ TEST_F(ParserInitialDTest, dropTopic) {
}
TEST_F(ParserInitialDTest, dropUser) {
login("root");
useDb("root", "test");
run("drop user wxy");

View File

@ -141,6 +141,14 @@ TEST_F(ParserSelectTest, IndefiniteRowsFuncSemanticCheck) {
// run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
}
TEST_F(ParserSelectTest, useDefinedFunc) {
useDb("root", "test");
run("SELECT udf1(c1) FROM t1");
run("SELECT udf2(c1) FROM t1 GROUP BY c2");
}
TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test");

View File

@ -37,6 +37,7 @@ class ParserEnv : public testing::Environment {
virtual void SetUp() {
initMetaDataEnv();
generateMetaData();
initLog(TD_TMP_DIR_PATH "td");
}
virtual void TearDown() {
@ -47,20 +48,55 @@ class ParserEnv : public testing::Environment {
ParserEnv() {}
virtual ~ParserEnv() {}
private:
void initLog(const char* path) {
int32_t logLevel = getLogLevel();
dDebugFlag = logLevel;
vDebugFlag = logLevel;
mDebugFlag = logLevel;
cDebugFlag = logLevel;
jniDebugFlag = logLevel;
tmrDebugFlag = logLevel;
uDebugFlag = logLevel;
rpcDebugFlag = logLevel;
qDebugFlag = logLevel;
wDebugFlag = logLevel;
sDebugFlag = logLevel;
tsdbDebugFlag = logLevel;
tsLogEmbedded = 1;
tsAsyncLog = 0;
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
if (taosInitLog("taoslog", 1) != 0) {
std::cout << "failed to init log file" << std::endl;
}
}
};
static void parseArg(int argc, char* argv[]) {
int opt = 0;
const char* optstring = "";
int opt = 0;
const char* optstring = "";
// clang-format off
static struct option long_options[] = {
{"dump", no_argument, NULL, 'd'}, {"async", no_argument, NULL, 'a'}, {0, 0, 0, 0}};
{"dump", no_argument, NULL, 'd'},
{"async", required_argument, NULL, 'a'},
{"skipSql", required_argument, NULL, 's'},
{0, 0, 0, 0}
};
// clang-format on
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
switch (opt) {
case 'd':
g_dump = true;
break;
case 'a':
g_testAsyncApis = true;
setAsyncFlag(optarg);
break;
case 's':
setSkipSqlNum(optarg);
break;
default:
break;

View File

@ -44,23 +44,40 @@ namespace ParserTest {
} \
} while (0);
bool g_dump = false;
bool g_testAsyncApis = false;
bool g_dump = false;
bool g_testAsyncApis = true;
int32_t g_logLevel = 131;
int32_t g_skipSql = 0;
void setAsyncFlag(const char* pFlag) { g_testAsyncApis = stoi(pFlag) > 0 ? true : false; }
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(optarg); }
struct TerminateFlag : public exception {
const char* what() const throw() { return "success and terminate"; }
};
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); }
int32_t getLogLevel() { return g_logLevel; }
class ParserTestBaseImpl {
public:
ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase) {}
void login(const std::string& user) { caseEnv_.user_ = user; }
void useDb(const string& acctId, const string& db) {
caseEnv_.acctId_ = acctId;
caseEnv_.db_ = db;
caseEnv_.nsql_ = g_skipSql;
}
void run(const string& sql, int32_t expect, ParserStage checkStage) {
if (caseEnv_.nsql_ > 0) {
--(caseEnv_.nsql_);
return;
}
reset(expect, checkStage);
try {
SParseContext cxt = {0};
@ -69,6 +86,8 @@ class ParserTestBaseImpl {
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
doAuthenticate(&cxt, pQuery);
doTranslate(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
@ -89,59 +108,14 @@ class ParserTestBaseImpl {
}
}
void runAsync(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage);
try {
SParseContext cxt = {0};
setParseContext(sql, &cxt, true);
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
SCatalogReq catalogReq = {0};
doBuildCatalogReq(pQuery->pMetaCache, &catalogReq);
string err;
thread t1([&]() {
try {
SMetaData metaData = {0};
doGetAllMeta(&catalogReq, &metaData);
doPutMetaDataToCache(&catalogReq, &metaData, pQuery->pMetaCache);
doTranslate(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
}
});
t1.join();
if (!err.empty()) {
throw runtime_error(err);
}
if (g_dump) {
dump();
}
} catch (const TerminateFlag& e) {
// success and terminate
return;
} catch (...) {
dump();
throw;
}
}
private:
struct caseEnv {
string acctId_;
string db_;
string acctId_;
string user_;
string db_;
int32_t nsql_;
caseEnv() : user_("wangxiaoyu"), nsql_(0) {}
};
struct stmtEnv {
@ -207,6 +181,8 @@ class ParserTestBaseImpl {
pCxt->acctId = atoi(caseEnv_.acctId_.c_str());
pCxt->db = caseEnv_.db_.c_str();
pCxt->pUser = caseEnv_.user_.c_str();
pCxt->isSuperUser = caseEnv_.user_ == "root";
pCxt->pSql = stmtEnv_.sql_.c_str();
pCxt->sqlLen = stmtEnv_.sql_.length();
pCxt->pMsg = stmtEnv_.msgBuf_.data();
@ -220,6 +196,11 @@ class ParserTestBaseImpl {
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
void doCollectMetaKey(SParseContext* pCxt, SQuery* pQuery) {
DO_WITH_THROW(collectMetaKey, pCxt, pQuery);
ASSERT_NE(pQuery->pMetaCache, nullptr);
}
void doBuildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
DO_WITH_THROW(buildCatalogReq, pMetaCache, pCatalogReq);
}
@ -232,6 +213,8 @@ class ParserTestBaseImpl {
DO_WITH_THROW(putMetaDataToCache, pCatalogReq, pMetaData, pMetaCache);
}
void doAuthenticate(SParseContext* pCxt, SQuery* pQuery) { DO_WITH_THROW(authenticate, pCxt, pQuery); }
void doTranslate(SParseContext* pCxt, SQuery* pQuery) {
DO_WITH_THROW(translate, pCxt, pQuery);
checkQuery(pQuery, PARSER_STAGE_TRANSLATE);
@ -254,6 +237,59 @@ class ParserTestBaseImpl {
void checkQuery(const SQuery* pQuery, ParserStage stage) { pBase_->checkDdl(pQuery, stage); }
void runAsync(const string& sql, int32_t expect, ParserStage checkStage) {
reset(expect, checkStage);
try {
SParseContext cxt = {0};
setParseContext(sql, &cxt, true);
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
doCollectMetaKey(&cxt, pQuery);
SCatalogReq catalogReq = {0};
doBuildCatalogReq(pQuery->pMetaCache, &catalogReq);
string err;
thread t1([&]() {
try {
SMetaData metaData = {0};
doGetAllMeta(&catalogReq, &metaData);
doPutMetaDataToCache(&catalogReq, &metaData, pQuery->pMetaCache);
doAuthenticate(&cxt, pQuery);
doTranslate(&cxt, pQuery);
doCalculateConstant(&cxt, pQuery);
} catch (const TerminateFlag& e) {
// success and terminate
} catch (const runtime_error& e) {
err = e.what();
} catch (...) {
err = "unknown error";
}
});
t1.join();
if (!err.empty()) {
throw runtime_error(err);
}
if (g_dump) {
dump();
}
} catch (const TerminateFlag& e) {
// success and terminate
return;
} catch (...) {
dump();
throw;
}
}
caseEnv caseEnv_;
stmtEnv stmtEnv_;
stmtRes res_;
@ -264,16 +300,14 @@ ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {}
ParserTestBase::~ParserTestBase() {}
void ParserTestBase::login(const std::string& user) { return impl_->login(user); }
void ParserTestBase::useDb(const std::string& acctId, const std::string& db) { impl_->useDb(acctId, db); }
void ParserTestBase::run(const std::string& sql, int32_t expect, ParserStage checkStage) {
return impl_->run(sql, expect, checkStage);
}
void ParserTestBase::runAsync(const std::string& sql, int32_t expect, ParserStage checkStage) {
return impl_->runAsync(sql, expect, checkStage);
}
void ParserTestBase::checkDdl(const SQuery* pQuery, ParserStage stage) { return; }
} // namespace ParserTest

View File

@ -34,9 +34,9 @@ class ParserTestBase : public testing::Test {
ParserTestBase();
virtual ~ParserTestBase();
void login(const std::string& user);
void useDb(const std::string& acctId, const std::string& db);
void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL);
void runAsync(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL);
virtual void checkDdl(const SQuery* pQuery, ParserStage stage);
@ -65,7 +65,11 @@ class ParserDdlTest : public ParserTestBase {
};
extern bool g_dump;
extern bool g_testAsyncApis;
extern void setAsyncFlag(const char* pFlag);
extern void setLogLevel(const char* pLogLevel);
extern int32_t getLogLevel();
extern void setSkipSqlNum(const char* pNum);
} // namespace ParserTest