feature/qnode
This commit is contained in:
parent
8bbc6b09ca
commit
54dd9e3ca4
|
@ -17,6 +17,7 @@
|
|||
#include "qndInt.h"
|
||||
#include "query.h"
|
||||
#include "qworker.h"
|
||||
//#include "tudf.h"
|
||||
|
||||
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
||||
|
@ -25,6 +26,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
//udfcOpen();
|
||||
|
||||
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
|
||||
taosMemoryFreeClear(pQnode);
|
||||
return NULL;
|
||||
|
@ -37,6 +40,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
|||
void qndClose(SQnode *pQnode) {
|
||||
qWorkerDestroy((void **)&pQnode->pQuery);
|
||||
|
||||
//udfcClose();
|
||||
|
||||
taosMemoryFree(pQnode);
|
||||
}
|
||||
|
||||
|
|
|
@ -329,28 +329,40 @@ _return:
|
|||
}
|
||||
|
||||
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
||||
SScalarFuncExecFuncs ffpSet = {0};
|
||||
int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||
if (code) {
|
||||
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||
SCL_ERR_RET(code);
|
||||
}
|
||||
|
||||
SScalarParam *params = NULL;
|
||||
int32_t rowNum = 0;
|
||||
int32_t paramNum = 0;
|
||||
int32_t code = 0;
|
||||
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
||||
|
||||
output->columnData = createColumnInfoData(&node->node.resType, rowNum);
|
||||
if (output->columnData == NULL) {
|
||||
sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes));
|
||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||
#if 0
|
||||
UdfcFuncHandle udfHandle = NULL;
|
||||
|
||||
code = (*ffpSet.process)(params, paramNum, output);
|
||||
if (code) {
|
||||
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
|
||||
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
||||
teardownUdf(udfHandle);
|
||||
SCL_ERR_JRET(code);
|
||||
#endif
|
||||
} else {
|
||||
SScalarFuncExecFuncs ffpSet = {0};
|
||||
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||
if (code) {
|
||||
sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||
SCL_ERR_JRET(code);
|
||||
}
|
||||
|
||||
output->columnData = createColumnInfoData(&node->node.resType, rowNum);
|
||||
if (output->columnData == NULL) {
|
||||
sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes));
|
||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
code = (*ffpSet.process)(params, paramNum, output);
|
||||
if (code) {
|
||||
sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
|
||||
SCL_ERR_JRET(code);
|
||||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
|
|
Loading…
Reference in New Issue