feat: parser adapts asynchronous interface
This commit is contained in:
parent
768beb6482
commit
4432fa930f
|
@ -23,6 +23,9 @@ extern "C" {
|
|||
#include "function.h"
|
||||
#include "querynodes.h"
|
||||
|
||||
#define FUNC_AGGREGATE_UDF_ID 5001
|
||||
#define FUNC_SCALAR_UDF_ID 5002
|
||||
|
||||
typedef enum EFunctionType {
|
||||
// aggregate function
|
||||
FUNCTION_TYPE_APERCENTILE = 1,
|
||||
|
@ -126,21 +129,12 @@ typedef enum EFunctionType {
|
|||
struct SqlFunctionCtx;
|
||||
struct SResultRowEntryInfo;
|
||||
struct STimeWindow;
|
||||
struct SCatalog;
|
||||
|
||||
typedef struct SFmGetFuncInfoParam {
|
||||
struct SCatalog* pCtg;
|
||||
void* pRpc;
|
||||
const SEpSet* pMgmtEps;
|
||||
char* pErrBuf;
|
||||
int32_t errBufLen;
|
||||
} SFmGetFuncInfoParam;
|
||||
|
||||
int32_t fmFuncMgtInit();
|
||||
|
||||
void fmFuncMgtDestroy();
|
||||
|
||||
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc);
|
||||
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen);
|
||||
|
||||
bool fmIsBuiltinFunc(const char* pFunc);
|
||||
|
||||
|
|
|
@ -655,7 +655,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
|
||||
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
|
||||
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
|
||||
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
|
||||
#define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
|
||||
|
||||
//udf
|
||||
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
|
||||
|
|
|
@ -14,7 +14,7 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
function
|
||||
PRIVATE os util common nodes scalar catalog qcom transport
|
||||
PRIVATE os util common nodes scalar qcom transport
|
||||
PUBLIC uv_a
|
||||
)
|
||||
|
||||
|
|
|
@ -45,8 +45,6 @@ extern "C" {
|
|||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
#define FUNC_UDF_ID_START 5000
|
||||
#define FUNC_AGGREGATE_UDF_ID 5001
|
||||
#define FUNC_SCALAR_UDF_ID 5002
|
||||
|
||||
extern const int funcMgtUdfNum;
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
#include "functionMgt.h"
|
||||
|
||||
#include "builtins.h"
|
||||
#include "catalog.h"
|
||||
#include "functionMgtInt.h"
|
||||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
|
@ -65,35 +64,19 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
|||
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
|
||||
}
|
||||
|
||||
static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
|
||||
SFuncInfo funcInfo = {0};
|
||||
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &funcInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pFunc->funcType = FUNCTION_TYPE_UDF;
|
||||
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
|
||||
pFunc->node.resType.type = funcInfo.outputType;
|
||||
pFunc->node.resType.bytes = funcInfo.outputLen;
|
||||
pFunc->udfBufSize = funcInfo.bufSize;
|
||||
tFreeSFuncInfo(&funcInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t fmFuncMgtInit() {
|
||||
taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
|
||||
return initFunctionCode;
|
||||
}
|
||||
|
||||
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
|
||||
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
|
||||
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
|
||||
if (NULL != pVal) {
|
||||
pFunc->funcId = *(int32_t*)pVal;
|
||||
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
|
||||
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pParam->pErrBuf, pParam->errBufLen);
|
||||
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
|
||||
}
|
||||
return getUdfInfo(pParam, pFunc);
|
||||
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
|
||||
}
|
||||
|
||||
bool fmIsBuiltinFunc(const char* pFunc) {
|
||||
|
|
|
@ -45,6 +45,7 @@ typedef struct SParseMetaCache {
|
|||
SHashObj* pDbCfg; // key is tbFName, element is SDbCfgInfo*
|
||||
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
|
||||
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
|
||||
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
|
||||
} SParseMetaCache;
|
||||
|
||||
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
|
||||
|
@ -70,6 +71,7 @@ int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCac
|
|||
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
|
||||
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type,
|
||||
SParseMetaCache* pMetaCache);
|
||||
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
|
||||
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
|
||||
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
|
||||
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
|
||||
|
@ -78,6 +80,7 @@ int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFNam
|
|||
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
|
||||
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDb, AUTH_TYPE type,
|
||||
bool* pPass);
|
||||
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "parAst.h"
|
||||
#include "parInt.h"
|
||||
|
@ -105,6 +106,13 @@ typedef struct SCollectMetaKeyFromExprCxt {
|
|||
|
||||
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt);
|
||||
|
||||
static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFunctionNode* pFunc) {
|
||||
if (fmIsBuiltinFunc(pFunc->functionName)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return reserveUdfInCache(pFunc->functionName, pCxt->pComCxt->pMetaCache);
|
||||
}
|
||||
|
||||
static EDealRes collectMetaKeyFromRealTable(SCollectMetaKeyFromExprCxt* pCxt, SRealTableNode* pRealTable) {
|
||||
pCxt->errCode = reserveTableMetaInCache(pCxt->pComCxt->pParseCxt->acctId, pRealTable->table.dbName,
|
||||
pRealTable->table.tableName, pCxt->pComCxt->pMetaCache);
|
||||
|
@ -128,7 +136,7 @@ static EDealRes collectMetaKeyFromExprImpl(SNode* pNode, void* pContext) {
|
|||
SCollectMetaKeyFromExprCxt* pCxt = pContext;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_FUNCTION:
|
||||
break;
|
||||
return collectMetaKeyFromFunction(pCxt, (SFunctionNode*)pNode);
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return collectMetaKeyFromRealTable(pCxt, (SRealTableNode*)pNode);
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
|
|
|
@ -239,6 +239,27 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
SFuncInfo funcInfo = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParCxt->async) {
|
||||
code = getUdfInfoFromCache(pCxt->pMetaCache, pFunc->functionName, &funcInfo);
|
||||
} else {
|
||||
code = catalogGetUdfInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pFunc->functionName,
|
||||
&funcInfo);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pFunc->funcType = FUNCTION_TYPE_UDF;
|
||||
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == funcInfo.funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
|
||||
pFunc->node.resType.type = funcInfo.outputType;
|
||||
pFunc->node.resType.bytes = funcInfo.outputLen;
|
||||
pFunc->udfBufSize = funcInfo.bufSize;
|
||||
tFreeSFuncInfo(&funcInfo);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
|
||||
pCxt->pParseCxt = pParseCxt;
|
||||
pCxt->errCode = TSDB_CODE_SUCCESS;
|
||||
|
@ -873,12 +894,11 @@ static bool hasInvalidFuncNesting(SNodeList* pParameterList) {
|
|||
}
|
||||
|
||||
static int32_t getFuncInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
|
||||
.pRpc = pCxt->pParseCxt->pTransporter,
|
||||
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
|
||||
.pErrBuf = pCxt->msgBuf.buf,
|
||||
.errBufLen = pCxt->msgBuf.len};
|
||||
return fmGetFuncInfo(¶m, pFunc);
|
||||
int32_t code = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
|
||||
if (TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION == code) {
|
||||
code = getUdfInfo(pCxt, pFunc);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
#include "cJSON.h"
|
||||
#include "querynodes.h"
|
||||
|
||||
#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_DB_FNAME_LEN + 2
|
||||
|
||||
static char* getSyntaxErrFormat(int32_t errCode) {
|
||||
switch (errCode) {
|
||||
case TSDB_CODE_PAR_SYNTAX_ERROR:
|
||||
|
@ -441,8 +443,6 @@ end:
|
|||
return retCode;
|
||||
}
|
||||
|
||||
#define USER_AUTH_KEY_MAX_LEN TSDB_USER_LEN + TSDB_DB_FNAME_LEN + 2
|
||||
|
||||
static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, char* pStr) {
|
||||
return sprintf(pStr, "%s*%d.%s*%d", pUser, acctId, pDb, type);
|
||||
}
|
||||
|
@ -536,6 +536,25 @@ static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
|
||||
if (NULL != pUdfHash) {
|
||||
*pUdf = taosArrayInit(taosHashGetSize(pUdfHash), TSDB_FUNC_NAME_LEN);
|
||||
if (NULL == *pUdf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
void* p = taosHashIterate(pUdfHash, NULL);
|
||||
while (NULL != p) {
|
||||
size_t len = 0;
|
||||
char* pFunc = taosHashGetKey(p, &len);
|
||||
char func[TSDB_FUNC_NAME_LEN] = {0};
|
||||
strncpy(func, pFunc, len);
|
||||
taosArrayPush(*pUdf, func);
|
||||
p = taosHashIterate(pUdfHash, p);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
int32_t code = buildTableMetaReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -550,6 +569,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildUserAuthReq(pMetaCache->pUserAuth, &pCatalogReq->pUser);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildUdfReq(pMetaCache->pUdf, &pCatalogReq->pUdf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -617,6 +639,18 @@ static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUse
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHashObj* pUdf) {
|
||||
int32_t num = taosArrayGetSize(pUdfReq);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char* pFunc = taosArrayGet(pUdfReq, i);
|
||||
SFuncInfo* pInfo = taosArrayGet(pUdfData, i);
|
||||
if (TSDB_CODE_SUCCESS != taosHashPut(pUdf, pFunc, strlen(pFunc), &pInfo, POINTER_BYTES)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
int32_t code = putTableMetaToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, pMetaCache->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -631,6 +665,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putUserAuthToCache(pCatalogReq->pUser, pMetaData->pUser, pMetaCache->pUserAuth);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putUdfToCache(pCatalogReq->pUdf, pMetaData->pUdfList, pMetaCache->pUdf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -643,7 +680,7 @@ static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const cha
|
|||
}
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
|
||||
return taosHashPut(*pTables, fullName, len, &len, POINTER_BYTES);
|
||||
return taosHashPut(*pTables, fullName, len, &pTables, POINTER_BYTES);
|
||||
}
|
||||
|
||||
int32_t reserveTableMetaInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
||||
|
@ -673,7 +710,7 @@ static int32_t reserveDbReqInCache(int32_t acctId, const char* pDb, SHashObj** p
|
|||
}
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s", acctId, pDb);
|
||||
return taosHashPut(*pDbs, fullName, len, &len, POINTER_BYTES);
|
||||
return taosHashPut(*pDbs, fullName, len, &pDbs, POINTER_BYTES);
|
||||
}
|
||||
|
||||
int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
|
||||
|
@ -764,3 +801,22 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
|
|||
*pPass = *pRes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
|
||||
if (NULL == pMetaCache->pUdf) {
|
||||
pMetaCache->pUdf = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (NULL == pMetaCache->pUdf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return taosHashPut(pMetaCache->pUdf, pFunc, strlen(pFunc), &pMetaCache, POINTER_BYTES);
|
||||
}
|
||||
|
||||
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) {
|
||||
SFuncInfo** pRes = taosHashGet(pMetaCache->pUdf, pFunc, strlen(pFunc));
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
memcpy(pInfo, *pRes, sizeof(SFuncInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -157,6 +157,12 @@ void generateTestST1(MockCatalogService* mcs) {
|
|||
mcs->createSubTable("test", "st1", "st1s3", 1);
|
||||
}
|
||||
|
||||
void generateFunctions(MockCatalogService* mcs) {
|
||||
mcs->createFunction("udf1", TSDB_FUNC_TYPE_SCALAR, TSDB_DATA_TYPE_INT, tDataTypes[TSDB_DATA_TYPE_INT].bytes, 0);
|
||||
mcs->createFunction("udf2", TSDB_FUNC_TYPE_AGGREGATE, TSDB_DATA_TYPE_DOUBLE, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes,
|
||||
8);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; }
|
||||
|
@ -196,6 +202,11 @@ int32_t __catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, con
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t __catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* funcName,
|
||||
SFuncInfo* pInfo) {
|
||||
return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo);
|
||||
}
|
||||
|
||||
void initMetaDataEnv() {
|
||||
g_mockCatalogService.reset(new MockCatalogService());
|
||||
|
||||
|
@ -209,6 +220,7 @@ void initMetaDataEnv() {
|
|||
stub.set(catalogGetDBVgInfo, __catalogGetDBVgInfo);
|
||||
stub.set(catalogGetDBCfg, __catalogGetDBCfg);
|
||||
stub.set(catalogChkAuth, __catalogChkAuth);
|
||||
stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
|
||||
// {
|
||||
// AddrAny any("libcatalog.so");
|
||||
// std::map<std::string,void*> result;
|
||||
|
@ -256,6 +268,7 @@ void generateMetaData() {
|
|||
generatePerformanceSchema(g_mockCatalogService.get());
|
||||
generateTestT1(g_mockCatalogService.get());
|
||||
generateTestST1(g_mockCatalogService.get());
|
||||
generateFunctions(g_mockCatalogService.get());
|
||||
g_mockCatalogService->showTables();
|
||||
}
|
||||
|
||||
|
|
|
@ -120,6 +120,15 @@ class MockCatalogServiceImpl {
|
|||
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
|
||||
}
|
||||
|
||||
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
|
||||
auto it = udf_.find(funcName);
|
||||
if (udf_.end() == it) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
memcpy(pInfo, it->second.get(), sizeof(SFuncInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
|
||||
int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -137,6 +146,9 @@ class MockCatalogServiceImpl {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getAllUserAuth(pCatalogReq->pUser, &pMetaData->pUser);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getAllUdf(pCatalogReq->pUdf, &pMetaData->pUdfList);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -223,21 +235,21 @@ class MockCatalogServiceImpl {
|
|||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const {
|
||||
DbMetaCache::const_iterator it = meta_.find(db);
|
||||
if (meta_.end() == it) {
|
||||
return std::shared_ptr<MockTableMeta>();
|
||||
}
|
||||
TableMetaCache::const_iterator tit = it->second.find(tbname);
|
||||
if (it->second.end() == tit) {
|
||||
return std::shared_ptr<MockTableMeta>();
|
||||
}
|
||||
return tit->second;
|
||||
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize) {
|
||||
std::shared_ptr<SFuncInfo> info(new SFuncInfo);
|
||||
strcpy(info->name, func.c_str());
|
||||
info->funcType = funcType;
|
||||
info->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
|
||||
info->outputType = outputType;
|
||||
info->outputLen = outputLen;
|
||||
info->bufSize = bufSize;
|
||||
udf_.insert(std::make_pair(func, info));
|
||||
}
|
||||
|
||||
private:
|
||||
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
|
||||
typedef std::map<std::string, TableMetaCache> DbMetaCache;
|
||||
typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
|
||||
|
||||
std::string toDbname(const std::string& dbFullName) const {
|
||||
std::string::size_type n = dbFullName.find(".");
|
||||
|
@ -320,6 +332,18 @@ class MockCatalogServiceImpl {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const {
|
||||
DbMetaCache::const_iterator it = meta_.find(db);
|
||||
if (meta_.end() == it) {
|
||||
return std::shared_ptr<MockTableMeta>();
|
||||
}
|
||||
TableMetaCache::const_iterator tit = it->second.find(tbname);
|
||||
if (it->second.end() == tit) {
|
||||
return std::shared_ptr<MockTableMeta>();
|
||||
}
|
||||
return tit->second;
|
||||
}
|
||||
|
||||
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pTableMetaReq) {
|
||||
|
@ -408,9 +432,28 @@ class MockCatalogServiceImpl {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getAllUdf(SArray* pUdfReq, SArray** pUdfData) const {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pUdfReq) {
|
||||
int32_t num = taosArrayGetSize(pUdfReq);
|
||||
*pUdfData = taosArrayInit(num, sizeof(SFuncInfo));
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SFuncInfo info = {0};
|
||||
code = catalogGetUdfInfo((char*)taosArrayGet(pUdfReq, i), &info);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArrayPush(*pUdfData, &info);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
uint64_t id_;
|
||||
std::unique_ptr<TableBuilder> builder_;
|
||||
DbMetaCache meta_;
|
||||
UdfMetaCache udf_;
|
||||
};
|
||||
|
||||
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
|
||||
|
@ -429,9 +472,9 @@ void MockCatalogService::createSubTable(const std::string& db, const std::string
|
|||
|
||||
void MockCatalogService::showTables() const { impl_->showTables(); }
|
||||
|
||||
std::shared_ptr<MockTableMeta> MockCatalogService::getTableMeta(const std::string& db,
|
||||
const std::string& tbname) const {
|
||||
return impl_->getTableMeta(db, tbname);
|
||||
void MockCatalogService::createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen,
|
||||
int32_t bufSize) {
|
||||
impl_->createFunction(func, funcType, outputType, outputLen, bufSize);
|
||||
}
|
||||
|
||||
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
|
||||
|
@ -446,6 +489,10 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
|
|||
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
|
||||
}
|
||||
|
||||
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
|
||||
return impl_->catalogGetUdfInfo(funcName, pInfo);
|
||||
}
|
||||
|
||||
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
|
||||
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
|
||||
}
|
||||
|
|
|
@ -56,11 +56,12 @@ class MockCatalogService {
|
|||
int32_t numOfColumns, int32_t numOfTags = 0);
|
||||
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
|
||||
void showTables() const;
|
||||
std::shared_ptr<MockTableMeta> getTableMeta(const std::string& db, const std::string& tbname) const;
|
||||
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
|
||||
|
||||
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
|
||||
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
|
||||
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
|
||||
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
|
||||
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
|
||||
|
||||
private:
|
||||
|
|
|
@ -228,7 +228,44 @@ TEST_F(ParserInitialCTest, createDnode) {
|
|||
run("CREATE DNODE 1.1.1.1 PORT 9000");
|
||||
}
|
||||
|
||||
// todo CREATE FUNCTION
|
||||
// CREATE [AGGREGATE] FUNCTION [IF NOT EXISTS] func_name AS library_path OUTPUTTYPE type_name [BUFSIZE value]
|
||||
TEST_F(ParserInitialCTest, createFunction) {
|
||||
useDb("root", "test");
|
||||
|
||||
SCreateFuncReq expect = {0};
|
||||
|
||||
auto setCreateFuncReqFunc = [&](const char* pUdfName, int8_t outputType, int32_t outputBytes = 0,
|
||||
int8_t funcType = TSDB_FUNC_TYPE_SCALAR, int8_t igExists = 0, int32_t bufSize = 0) {
|
||||
memset(&expect, 0, sizeof(SCreateFuncReq));
|
||||
strcpy(expect.name, pUdfName);
|
||||
expect.igExists = igExists;
|
||||
expect.funcType = funcType;
|
||||
expect.scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
|
||||
expect.outputType = outputType;
|
||||
expect.outputLen = outputBytes > 0 ? outputBytes : tDataTypes[outputType].bytes;
|
||||
expect.bufSize = bufSize;
|
||||
};
|
||||
|
||||
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_FUNCTION_STMT);
|
||||
SCreateFuncReq req = {0};
|
||||
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSCreateFuncReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
||||
|
||||
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
||||
ASSERT_EQ(req.igExists, expect.igExists);
|
||||
ASSERT_EQ(req.funcType, expect.funcType);
|
||||
ASSERT_EQ(req.scriptType, expect.scriptType);
|
||||
ASSERT_EQ(req.outputType, expect.outputType);
|
||||
ASSERT_EQ(req.outputLen, expect.outputLen);
|
||||
ASSERT_EQ(req.bufSize, expect.bufSize);
|
||||
});
|
||||
|
||||
setCreateFuncReqFunc("udf1", TSDB_DATA_TYPE_INT);
|
||||
run("CREATE FUNCTION udf1 AS './build/lib/libudf1.so' OUTPUTTYPE INT");
|
||||
|
||||
setCreateFuncReqFunc("udf2", TSDB_DATA_TYPE_DOUBLE, 0, TSDB_FUNC_TYPE_AGGREGATE, 1, 8);
|
||||
run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS './build/lib/libudf2.so' OUTPUTTYPE DOUBLE BUFSIZE 8");
|
||||
}
|
||||
|
||||
TEST_F(ParserInitialCTest, createIndexSma) {
|
||||
useDb("root", "test");
|
||||
|
|
|
@ -141,6 +141,14 @@ TEST_F(ParserSelectTest, IndefiniteRowsFuncSemanticCheck) {
|
|||
// run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, useDefinedFunc) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT udf1(c1) FROM t1");
|
||||
|
||||
run("SELECT udf2(c1) FROM t1 GROUP BY c2");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, groupBy) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
Loading…
Reference in New Issue