Merge branch 'feature/udf' of github.com:taosdata/TDengine into feature/udf
This commit is contained in:
commit
65e4258169
|
@ -1176,7 +1176,7 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte
|
|||
// input: interBuf
|
||||
// output: resultData
|
||||
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
|
||||
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
|
||||
int8_t callType = TSDB_UDF_CALL_AGG_FIN;
|
||||
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
|
||||
return err;
|
||||
}
|
||||
|
@ -1243,12 +1243,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
|||
}
|
||||
SUdfUvSession *session = (SUdfUvSession *)handle;
|
||||
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
|
||||
memset(udfRes, 0, envSize);
|
||||
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
udfRes->session = (SUdfUvSession *)handle;
|
||||
SUdfInterBuf buf = {0};
|
||||
if (callUdfAggInit(handle, &buf) != 0) {
|
||||
|
@ -1260,7 +1260,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
|||
}
|
||||
|
||||
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t numOfCols = pInput->numOfInputCols;
|
||||
|
||||
|
@ -1320,13 +1319,15 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
|||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
|
||||
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf,
|
||||
.bufLen = session->outputLen,
|
||||
.numOfResult = udfRes->finalResNum};
|
||||
SUdfInterBuf resultBuf = {0};
|
||||
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||
.bufLen = session->bufSize,
|
||||
.numOfResult = udfRes->interResNum};
|
||||
callUdfAggFinalize(session, &state, &resultBuf);
|
||||
|
||||
udfRes->finalResBuf = resultBuf.buf;
|
||||
udfRes->finalResNum = resultBuf.numOfResult;
|
||||
|
||||
teardownUdf(session);
|
||||
|
||||
if (resultBuf.numOfResult == 1) {
|
||||
|
|
|
@ -124,7 +124,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
char *finishSuffix = "_finish";
|
||||
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
||||
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
||||
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc));
|
||||
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
|
||||
//TODO: merge
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -27,7 +27,7 @@ int32_t udf2_start(SUdfInterBuf *buf) {
|
|||
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
|
||||
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
for (int32_t j = 0; j < block->numOfRows; ++i) {
|
||||
for (int32_t j = 0; j < block->numOfRows; ++j) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
//TODO: check the bitmap for null value
|
||||
int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
|
||||
|
@ -35,7 +35,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
|
|||
}
|
||||
}
|
||||
|
||||
*(int64_t*)newInterBuf = sumSquares;
|
||||
*(int64_t*)(newInterBuf->buf) = sumSquares;
|
||||
newInterBuf->bufLen = sizeof(int64_t);
|
||||
//TODO: if all null value, numOfResult = 0;
|
||||
newInterBuf->numOfResult = 1;
|
||||
|
|
|
@ -154,6 +154,7 @@ static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicCond
|
|||
}
|
||||
|
||||
static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
|
||||
COPY_ALL_SCALAR_FIELDS;
|
||||
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
|
||||
COPY_CHAR_ARRAY_FIELD(functionName);
|
||||
COPY_SCALAR_FIELD(funcId);
|
||||
|
|
|
@ -1885,6 +1885,7 @@ static const char* jkFunctionName = "Name";
|
|||
static const char* jkFunctionId = "Id";
|
||||
static const char* jkFunctionType = "Type";
|
||||
static const char* jkFunctionParameter = "Parameters";
|
||||
static const char* jkFunctionUdfBufSize = "UdfBufSize";
|
||||
|
||||
static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SFunctionNode* pNode = (const SFunctionNode*)pObj;
|
||||
|
@ -1902,6 +1903,9 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkFunctionUdfBufSize, pNode->udfBufSize);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1922,6 +1926,9 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkFunctionUdfBufSize, &pNode->udfBufSize);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue