add feature that udaf produces no result

This commit is contained in:
shenglian zhou 2022-04-25 17:54:18 +08:00
parent 73a0ad7414
commit f8e71908c4
2 changed files with 31 additions and 21 deletions

View File

@ -42,8 +42,7 @@ enum {
UDFC_CODE_INVALID_STATE = -5
};
typedef void *UdfcFuncHandle;
/**
* setup udf
@ -95,10 +94,9 @@ typedef struct SUdfDataBlock {
typedef struct SUdfInterBuf {
int32_t bufLen;
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
// output: interBuf
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
// input: block, state

View File

@ -241,12 +241,14 @@ void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
int32_t len = 0;
len += taosEncodeFixedI8(buf, state->numOfResult);
len += taosEncodeFixedI32(buf, state->bufLen);
len += taosEncodeBinary(buf, state->buf, state->bufLen);
return len;
}
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
buf = taosDecodeFixedI8(buf, &state->numOfResult);
buf = taosDecodeFixedI32(buf, &state->bufLen);
buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
return (void*)buf;
@ -1250,41 +1252,44 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
// TODO: range [start, start+numOfRow) to generate colInfoData
UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes));
SUdfUvSession *session = (SUdfUvSession *)handle;
SSDataBlock input = {0};
input.info.numOfCols = numOfCols;
input.info.rows = numOfRows;
input.info.uid = pInput->uid;
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = numOfRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
input.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *col = pInput->pData[i];
if (IS_VAR_DATA_TYPE(col->info.type)) {
hasVarCol = true;
}
taosArrayPush(input.pDataBlock, col);
taosArrayPush(tempBlock.pDataBlock, col);
}
tempBlock.info.hasVarCol = hasVarCol;
input.info.hasVarCol = hasVarCol;
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, .bufLen = session->bufSize};
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen,
.bufLen = session->bufSize,
.numOfResult = GET_RES_INFO(pCtx)->numOfRes};
SUdfInterBuf newState = {0};
callUdfAggProcess(handle, &input, &state, &newState);
callUdfAggProcess(handle, inputBlock, &state, &newState);
memcpy(state.buf, newState.buf, newState.bufLen);
taosArrayDestroy(input.pDataBlock);
GET_RES_INFO(pCtx)->numOfRes = (newState.numOfResult == 1 ? 1 : 0);
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
taosMemoryFree(newState.buf);
return 0;
@ -1296,10 +1301,17 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes));
SUdfUvSession *session = (SUdfUvSession *)handle;
SUdfInterBuf resultBuf = {.buf = udfRes + sizeof(int64_t), .bufLen = session->outputLen};
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, .bufLen = session->bufSize};
SUdfInterBuf resultBuf = {.buf = udfRes + sizeof(int64_t),
.bufLen = session->outputLen};
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen,
.bufLen = session->bufSize,
.numOfResult = GET_RES_INFO(pCtx)->numOfRes};
callUdfAggFinalize(handle, &state, &resultBuf);
GET_RES_INFO(pCtx)->numOfRes = (resultBuf.numOfResult == 1 ? 1 : 0);
functionFinalize(pCtx, pBlock);
teardownUdf(handle);
return 0;
}