refactor(query): top/bot function refactor
This commit is contained in:
parent
8a15c2adb7
commit
02cb6dc412
|
@ -419,7 +419,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
||||
int32_t topBotCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
||||
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1)));
|
||||
|
@ -427,65 +427,6 @@ int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeL
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
|
||||
if (isPartial) {
|
||||
if (2 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param1
|
||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
|
||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pValue->notReserved = true;
|
||||
|
||||
// set result type
|
||||
pFunc->node.resType =
|
||||
(SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
} else {
|
||||
if (1 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_BINARY != para1Type) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// Do nothing. We can only access output of partial functions as input,
|
||||
// so original input type cannot be obtained, resType will be set same
|
||||
// as original function input type after merge function created.
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateTopBotImpl(pFunc, pErrBuf, len, true);
|
||||
}
|
||||
|
||||
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateTopBotImpl(pFunc, pErrBuf, len, false);
|
||||
}
|
||||
|
||||
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -1735,31 +1676,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = topFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = topCombine,
|
||||
.pPartialFunc = "_top_partial",
|
||||
.pMergeFunc = "_top_merge",
|
||||
// .createMergeParaFuc = topCreateMergePara
|
||||
},
|
||||
{
|
||||
.name = "_top_partial",
|
||||
.type = FUNCTION_TYPE_TOP_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotPartial,
|
||||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = topFunction,
|
||||
.finalizeFunc = topBotPartialFinalize,
|
||||
.combineFunc = topCombine,
|
||||
},
|
||||
{
|
||||
.name = "_top_merge",
|
||||
.type = FUNCTION_TYPE_TOP_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotMerge,
|
||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = topFunctionMerge,
|
||||
.finalizeFunc = topBotMergeFinalize,
|
||||
.combineFunc = topCombine,
|
||||
.pPartialFunc = "top",
|
||||
.pMergeFunc = "top",
|
||||
.createMergeParaFuc = topBotCreateMergePara
|
||||
},
|
||||
{
|
||||
.name = "bottom",
|
||||
|
@ -1771,30 +1690,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
.pPartialFunc = "_bottom_partial",
|
||||
.pMergeFunc = "_bottom_merge"
|
||||
},
|
||||
{
|
||||
.name = "_bottom_partial",
|
||||
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotPartial,
|
||||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotPartialFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
},
|
||||
{
|
||||
.name = "_bottom_merge",
|
||||
.type = FUNCTION_TYPE_BOTTOM_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotMerge,
|
||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = bottomFunctionMerge,
|
||||
.finalizeFunc = topBotMergeFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
.pPartialFunc = "bottom",
|
||||
.pMergeFunc = "bottom",
|
||||
.createMergeParaFuc = topBotCreateMergePara
|
||||
},
|
||||
{
|
||||
.name = "spread",
|
||||
|
@ -2524,7 +2422,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = NULL,
|
||||
.finalizeFunc = NULL
|
||||
.finalizeFunc = NULL,
|
||||
.pPartialFunc = "_select_value",
|
||||
.pMergeFunc = "_select_value"
|
||||
},
|
||||
{
|
||||
.name = "_block_dist",
|
||||
|
|
|
@ -2870,12 +2870,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
// intermediate result is binary and length contains VAR header size
|
||||
pEnv->calcMemSize = pFunc->node.resType.bytes;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
|
@ -3142,7 +3136,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
}
|
||||
|
||||
int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) {
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
|
||||
|
@ -3162,39 +3156,13 @@ int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMer
|
|||
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
||||
}
|
||||
|
||||
if (!isMerge) {
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||
}
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||
currentRow += 1;
|
||||
}
|
||||
|
||||
return pEntryInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return topBotFinalizeImpl(pCtx, pBlock, false); }
|
||||
|
||||
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
return topBotFinalizeImpl(pCtx, pBlock, true);
|
||||
}
|
||||
|
||||
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t resultBytes = getTopBotInfoSize(pRes->maxSize);
|
||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
||||
memcpy(varDataVal(res), pRes, resultBytes);
|
||||
varDataSetLen(res, resultBytes);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
|
|
Loading…
Reference in New Issue