Merge pull request #11713 from taosdata/feature/qnode
feat(query): qnode for query only
This commit is contained in:
commit
44d78ab900
|
@ -146,6 +146,7 @@ DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
|
|||
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
|
||||
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
|
||||
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
||||
|
||||
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
||||
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
|
||||
|
|
|
@ -682,6 +682,7 @@ int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
|
|||
|
||||
typedef struct {
|
||||
int32_t numOfFuncs;
|
||||
bool ignoreCodeComment;
|
||||
SArray* pFuncNames;
|
||||
} SRetrieveFuncReq;
|
||||
|
||||
|
@ -710,6 +711,7 @@ typedef struct {
|
|||
|
||||
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||||
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||||
void tFreeSFuncInfo(SFuncInfo *pInfo);
|
||||
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -224,6 +224,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
|||
|
||||
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
|
||||
|
||||
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo);
|
||||
|
||||
|
||||
/**
|
||||
* Destroy catalog and relase all resources
|
||||
|
|
|
@ -119,12 +119,19 @@ typedef enum EFunctionType {
|
|||
struct SqlFunctionCtx;
|
||||
struct SResultRowEntryInfo;
|
||||
struct STimeWindow;
|
||||
struct SCatalog;
|
||||
|
||||
typedef struct SFmGetFuncInfoParam {
|
||||
struct SCatalog* pCtg;
|
||||
void *pRpc;
|
||||
const SEpSet* pMgmtEps;
|
||||
} SFmGetFuncInfoParam;
|
||||
|
||||
int32_t fmFuncMgtInit();
|
||||
|
||||
void fmFuncMgtDestroy();
|
||||
|
||||
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
|
||||
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
|
||||
|
||||
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
|
||||
|
||||
|
|
|
@ -60,6 +60,7 @@ typedef struct SStmtBindInfo {
|
|||
} SStmtBindInfo;
|
||||
|
||||
typedef struct SStmtExecInfo {
|
||||
int32_t affectedRows;
|
||||
SRequestObj* pRequest;
|
||||
SHashObj* pVgHash;
|
||||
SHashObj* pBlockHash;
|
||||
|
|
|
@ -622,6 +622,10 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
|
|||
return stmtSetTbName(stmt, name);
|
||||
}
|
||||
|
||||
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) {
|
||||
return taos_stmt_set_tbname(stmt, name);
|
||||
}
|
||||
|
||||
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) {
|
||||
if (stmt == NULL || bind == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
|
|
|
@ -486,7 +486,8 @@ int stmtExec(TAOS_STMT *stmt) {
|
|||
|
||||
STMT_ERR_JRET(pStmt->exec.pRequest->code);
|
||||
|
||||
pStmt->affectedRows += taos_affected_rows(pStmt->exec.pRequest);
|
||||
pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
|
||||
pStmt->affectedRows += pStmt->exec.affectedRows;
|
||||
|
||||
_return:
|
||||
|
||||
|
|
|
@ -1624,6 +1624,7 @@ int32_t tSerializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq *
|
|||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->ignoreCodeComment) < 0) return -1;
|
||||
|
||||
if (pReq->numOfFuncs != (int32_t)taosArrayGetSize(pReq->pFuncNames)) return -1;
|
||||
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
|
||||
|
@ -1644,6 +1645,7 @@ int32_t tDeserializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq
|
|||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, (int8_t *)&pReq->ignoreCodeComment) < 0) return -1;
|
||||
|
||||
pReq->pFuncNames = taosArrayInit(pReq->numOfFuncs, TSDB_FUNC_NAME_LEN);
|
||||
if (pReq->pFuncNames == NULL) return -1;
|
||||
|
@ -1681,8 +1683,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
|
|||
if (tEncodeI64(&encoder, pInfo->signature) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1;
|
||||
if (pInfo->codeSize) {
|
||||
if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1;
|
||||
}
|
||||
if (pInfo->commentSize) {
|
||||
if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
@ -1713,15 +1719,23 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
|
|||
if (tDecodeI64(&decoder, &fInfo.signature) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &fInfo.codeSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &fInfo.commentSize) < 0) return -1;
|
||||
fInfo.pCode = taosMemoryCalloc(1, fInfo.codeSize);
|
||||
fInfo.pComment = taosMemoryCalloc(1, fInfo.commentSize);
|
||||
if (fInfo.pCode == NULL || fInfo.pComment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
if (fInfo.codeSize) {
|
||||
fInfo.pCode = taosMemoryCalloc(1, fInfo.codeSize);
|
||||
if (fInfo.pCode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1;
|
||||
}
|
||||
if (fInfo.commentSize) {
|
||||
fInfo.pComment = taosMemoryCalloc(1, fInfo.commentSize);
|
||||
if (fInfo.pComment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1;
|
||||
taosArrayPush(pRsp->pFuncInfos, &fInfo);
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
@ -1730,12 +1744,20 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSFuncInfo(SFuncInfo *pInfo) {
|
||||
if (NULL == pInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFree(pInfo->pCode);
|
||||
taosMemoryFree(pInfo->pComment);
|
||||
}
|
||||
|
||||
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) {
|
||||
int32_t size = taosArrayGetSize(pRsp->pFuncInfos);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SFuncInfo *pInfo = taosArrayGet(pRsp->pFuncInfos, i);
|
||||
taosMemoryFree(pInfo->pCode);
|
||||
taosMemoryFree(pInfo->pComment);
|
||||
tFreeSFuncInfo(pInfo);
|
||||
}
|
||||
taosArrayDestroy(pRsp->pFuncInfos);
|
||||
}
|
||||
|
|
|
@ -427,7 +427,6 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
|
|||
|
||||
SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
|
||||
if (pFunc == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_FUNC;
|
||||
goto RETRIEVE_FUNC_OVER;
|
||||
}
|
||||
|
||||
|
@ -439,21 +438,26 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
|
|||
funcInfo.outputLen = pFunc->outputLen;
|
||||
funcInfo.bufSize = pFunc->bufSize;
|
||||
funcInfo.signature = pFunc->signature;
|
||||
funcInfo.commentSize = pFunc->commentSize;
|
||||
funcInfo.codeSize = pFunc->codeSize;
|
||||
funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
|
||||
if (funcInfo.pCode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto RETRIEVE_FUNC_OVER;
|
||||
}
|
||||
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
|
||||
if (funcInfo.commentSize > 0) {
|
||||
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
|
||||
if (funcInfo.pComment == NULL) {
|
||||
if (retrieveReq.ignoreCodeComment) {
|
||||
funcInfo.commentSize = 0;
|
||||
funcInfo.codeSize = 0;
|
||||
} else {
|
||||
funcInfo.commentSize = pFunc->commentSize;
|
||||
funcInfo.codeSize = pFunc->codeSize;
|
||||
funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
|
||||
if (funcInfo.pCode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto RETRIEVE_FUNC_OVER;
|
||||
}
|
||||
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
|
||||
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
|
||||
if (funcInfo.commentSize > 0) {
|
||||
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
|
||||
if (funcInfo.pComment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto RETRIEVE_FUNC_OVER;
|
||||
}
|
||||
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
|
||||
}
|
||||
}
|
||||
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
|
||||
mndReleaseFunc(pMnode, pFunc);
|
||||
|
@ -518,11 +522,16 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)b1, false);
|
||||
|
||||
char *b2 = taosMemoryCalloc(1, pShow->bytes[cols]);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]);
|
||||
if (pFunc->pComment) {
|
||||
char *b2 = taosMemoryCalloc(1, pShow->bytes[cols]);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
|
||||
} else {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
}
|
||||
|
||||
int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0;
|
||||
|
||||
|
@ -556,4 +565,4 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,7 +98,7 @@ static const SInfosTableSchema userFuncSchema[] = {
|
|||
{.name = "name", .bytes = TSDB_FUNC_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "comment", .bytes = PATH_MAX - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "aggregate", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "comment", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "output_type", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "code_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
|
|
|
@ -240,7 +240,7 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
|
|||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -371,7 +371,7 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
|
|||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -643,6 +643,49 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo **out) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)]((void *)funcName, &msg, 0, &msgLen);
|
||||
if (code) {
|
||||
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_RETRIEVE_FUNC,
|
||||
.pCont = msg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||
if (TSDB_CODE_MND_FUNC_NOT_EXIST == rpcRsp.code) {
|
||||
ctgDebug("funcName %s not exist in mnode", funcName);
|
||||
taosMemoryFreeClear(*out);
|
||||
CTG_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
ctgError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rpcRsp.code), funcName);
|
||||
CTG_ERR_RET(rpcRsp.code);
|
||||
}
|
||||
|
||||
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](*out, rpcRsp.pCont, rpcRsp.contLen);
|
||||
if (code) {
|
||||
ctgError("Process get udf rsp failed, code:%x, funcName:%s", code, funcName);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("Got udf from mnode, funcName:%s", funcName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
||||
if (NULL == pCtg->dbCache) {
|
||||
|
@ -2811,6 +2854,30 @@ int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
|
|||
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo));
|
||||
}
|
||||
|
||||
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
*pInfo = taosMemoryMalloc(sizeof(SFuncInfo));
|
||||
if (NULL == *pInfo) {
|
||||
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo));
|
||||
|
||||
_return:
|
||||
|
||||
if (code) {
|
||||
taosMemoryFreeClear(*pInfo);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
|
||||
void catalogDestroy(void) {
|
||||
qInfo("start to destroy catalog");
|
||||
|
|
|
@ -29,6 +29,7 @@ typedef struct SFuncMgtService {
|
|||
|
||||
typedef struct SUdfInfo {
|
||||
SDataType outputDt;
|
||||
int8_t funcType;
|
||||
} SUdfInfo;
|
||||
|
||||
static SFuncMgtService gFunMgtService;
|
||||
|
@ -52,30 +53,41 @@ static void doInitFunctionTable() {
|
|||
gFunMgtService.pUdfTable = NULL;
|
||||
}
|
||||
|
||||
static int8_t getUdfType(int32_t funcId) {
|
||||
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
|
||||
return pUdf->funcType;
|
||||
}
|
||||
|
||||
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
||||
if (fmIsUserDefinedFunc(funcId)) {
|
||||
return getUdfType(funcId);
|
||||
}
|
||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
return false;
|
||||
}
|
||||
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
|
||||
}
|
||||
|
||||
static int32_t getUdfId(const char* pFuncName) {
|
||||
// todo: udf by call catalog
|
||||
if (1) {
|
||||
static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
|
||||
SFuncInfo* pInfo = NULL;
|
||||
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFuncName, &pInfo);
|
||||
if (TSDB_CODE_SUCCESS != code || NULL == pInfo) {
|
||||
return -1;
|
||||
}
|
||||
if (NULL == gFunMgtService.pUdfTable) {
|
||||
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
|
||||
}
|
||||
SUdfInfo info = {0}; //todo
|
||||
SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.bytes = pInfo->outputLen, .funcType = pInfo->funcType };
|
||||
taosArrayPush(gFunMgtService.pUdfTable, &info);
|
||||
tFreeSFuncInfo(pInfo);
|
||||
taosMemoryFree(pInfo);
|
||||
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
|
||||
}
|
||||
|
||||
static int32_t getFuncId(const char* pFuncName) {
|
||||
static int32_t getFuncId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
|
||||
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
|
||||
if (NULL == pVal) {
|
||||
return getUdfId(pFuncName);
|
||||
return getUdfId(pParam, pFuncName);
|
||||
}
|
||||
return *(int32_t*)pVal;
|
||||
}
|
||||
|
@ -91,8 +103,8 @@ int32_t fmFuncMgtInit() {
|
|||
return initFunctionCode;
|
||||
}
|
||||
|
||||
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
|
||||
*pFuncId = getFuncId(pFuncName);
|
||||
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
|
||||
*pFuncId = getFuncId(pParam, pFuncName);
|
||||
if (*pFuncId < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
|
@ -585,7 +585,8 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
|
|||
}
|
||||
|
||||
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
|
||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
||||
}
|
||||
pCxt->errCode = fmGetFuncResultType(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
|
||||
|
@ -1918,7 +1919,8 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs
|
|||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
|
||||
}
|
||||
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
|
||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
|
||||
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -228,6 +228,8 @@ static int32_t cpdMergeConds(SNode** pDst, SNodeList** pSrc) {
|
|||
if (NULL == pLogicCond) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
pLogicCond->condType = LOGIC_COND_TYPE_AND;
|
||||
pLogicCond->pParameterList = *pSrc;
|
||||
*pDst = (SNode*)pLogicCond;
|
||||
|
|
|
@ -157,6 +157,28 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||
if (NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SRetrieveFuncReq funcReq = {0};
|
||||
funcReq.numOfFuncs = 1;
|
||||
funcReq.ignoreCodeComment = true;
|
||||
funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1);
|
||||
taosArrayPush(funcReq.pFuncNames, input);
|
||||
|
||||
int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq);
|
||||
|
||||
taosArrayDestroy(funcReq.pFuncNames);
|
||||
|
||||
*msg = pBuf;
|
||||
*msgLen = bufLen;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SUseDbOutput *pOut = output;
|
||||
|
@ -379,6 +401,31 @@ int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SRetrieveFuncRsp out = {0};
|
||||
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
if (tDeserializeSRetrieveFuncRsp(msg, msgSize, &out) != 0) {
|
||||
qError("tDeserializeSRetrieveFuncRsp failed, msgSize:%d", msgSize);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (1 != out.numOfFuncs) {
|
||||
qError("invalid func num returned, numOfFuncs:%d", out.numOfFuncs);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
SFuncInfo * funcInfo = taosArrayGet(out.pFuncInfos, 0);
|
||||
|
||||
memcpy(output, funcInfo, sizeof(*funcInfo));
|
||||
taosArrayDestroy(out.pFuncInfos);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void initQueryModuleMsgHandle() {
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
|
@ -386,6 +433,7 @@ void initQueryModuleMsgHandle() {
|
|||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
|
||||
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
|
@ -393,6 +441,7 @@ void initQueryModuleMsgHandle() {
|
|||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
Loading…
Reference in New Issue