udaf has result processing
This commit is contained in:
parent
38f43f5a9f
commit
ac7f492cce
|
@ -1253,8 +1253,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
if (callUdfAggInit(handle, &buf) != 0) {
|
if (callUdfAggInit(handle, &buf) != 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
|
|
||||||
udfRes->interResNum = buf.numOfResult;
|
udfRes->interResNum = buf.numOfResult;
|
||||||
|
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1263,14 +1263,14 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
int32_t numOfCols = pInput->numOfInputCols;
|
int32_t numOfCols = pInput->numOfInputCols;
|
||||||
|
|
||||||
char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
SUdfUvSession *session = udfRes->session;
|
||||||
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
int32_t numOfRows = pInput->numOfRows;
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes));
|
|
||||||
SUdfUvSession *session = (SUdfUvSession *)handle;
|
|
||||||
|
|
||||||
SSDataBlock tempBlock = {0};
|
SSDataBlock tempBlock = {0};
|
||||||
tempBlock.info.numOfCols = numOfCols;
|
tempBlock.info.numOfCols = numOfCols;
|
||||||
|
@ -1290,16 +1290,20 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
|
|
||||||
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
|
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
|
||||||
|
|
||||||
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen,
|
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||||
.bufLen = session->bufSize,
|
.bufLen = session->bufSize,
|
||||||
.numOfResult = GET_RES_INFO(pCtx)->numOfRes};
|
.numOfResult = udfRes->interResNum};
|
||||||
SUdfInterBuf newState = {0};
|
SUdfInterBuf newState = {0};
|
||||||
|
|
||||||
callUdfAggProcess(handle, inputBlock, &state, &newState);
|
callUdfAggProcess(session, inputBlock, &state, &newState);
|
||||||
|
|
||||||
memcpy(state.buf, newState.buf, newState.bufLen);
|
udfRes->interResNum = newState.numOfResult;
|
||||||
|
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
|
||||||
|
|
||||||
|
if (newState.numOfResult == 1 || state.numOfResult == 1) {
|
||||||
|
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||||
|
}
|
||||||
|
|
||||||
GET_RES_INFO(pCtx)->numOfRes = (newState.numOfResult == 1 ? 1 : 0);
|
|
||||||
blockDataDestroy(inputBlock);
|
blockDataDestroy(inputBlock);
|
||||||
|
|
||||||
taosArrayDestroy(tempBlock.pDataBlock);
|
taosArrayDestroy(tempBlock.pDataBlock);
|
||||||
|
@ -1309,22 +1313,26 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
SUdfUvSession *session = udfRes->session;
|
||||||
|
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||||
|
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||||
|
|
||||||
UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes));
|
|
||||||
SUdfUvSession *session = (SUdfUvSession *)handle;
|
|
||||||
|
|
||||||
SUdfInterBuf resultBuf = {.buf = udfRes + sizeof(int64_t),
|
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf,
|
||||||
.bufLen = session->outputLen};
|
.bufLen = session->outputLen,
|
||||||
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen,
|
.numOfResult = udfRes->finalResNum};
|
||||||
|
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||||
.bufLen = session->bufSize,
|
.bufLen = session->bufSize,
|
||||||
.numOfResult = GET_RES_INFO(pCtx)->numOfRes};
|
.numOfResult = udfRes->interResNum};
|
||||||
callUdfAggFinalize(handle, &state, &resultBuf);
|
callUdfAggFinalize(session, &state, &resultBuf);
|
||||||
|
teardownUdf(session);
|
||||||
|
|
||||||
GET_RES_INFO(pCtx)->numOfRes = (resultBuf.numOfResult == 1 ? 1 : 0);
|
if (resultBuf.numOfResult == 1) {
|
||||||
|
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||||
|
}
|
||||||
functionFinalize(pCtx, pBlock);
|
functionFinalize(pCtx, pBlock);
|
||||||
|
|
||||||
teardownUdf(handle);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue