add bottom function distribution splitting
This commit is contained in:
parent
ca22215718
commit
4036d9714e
|
@ -136,6 +136,8 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_ELAPSED_MERGE,
|
||||
FUNCTION_TYPE_TOP_PARTIAL,
|
||||
FUNCTION_TYPE_TOP_MERGE,
|
||||
FUNCTION_TYPE_BOTTOM_PARTIAL,
|
||||
FUNCTION_TYPE_BOTTOM_MERGE,
|
||||
|
||||
// user defined funcion
|
||||
FUNCTION_TYPE_UDF = 10000
|
||||
|
|
|
@ -104,6 +104,7 @@ bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo)
|
|||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t topFunctionMerge(SqlFunctionCtx *pCtx);
|
||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx);
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
|
|
@ -1574,10 +1574,34 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBot,
|
||||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
.pPartialFunc = "_bottom_partial",
|
||||
.pMergeFunc = "_bottom_merge"
|
||||
},
|
||||
{
|
||||
.name = "_bottom_partial",
|
||||
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotPartial,
|
||||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotPartialFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
},
|
||||
{
|
||||
.name = "_bottom_merge",
|
||||
.type = FUNCTION_TYPE_BOTTOM_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotMerge,
|
||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = bottomFunctionMerge,
|
||||
.finalizeFunc = topBotMergeFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
},
|
||||
{
|
||||
.name = "spread",
|
||||
|
|
|
@ -2716,9 +2716,33 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) {
|
||||
int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
pRes->type = pInput->pData[0]->info.type;
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) {
|
||||
for (int32_t i = 0; i < pInput->numOfItems; i++) {
|
||||
addResult(pCtx, &pInput->pItems[i], pInput->type, true);
|
||||
addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2735,31 +2759,28 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
|
|||
|
||||
pInfo->maxSize = pInputInfo->maxSize;
|
||||
pInfo->type = pInputInfo->type;
|
||||
topTransferInfo(pCtx, pInputInfo);
|
||||
topBotTransferInfo(pCtx, pInputInfo, true);
|
||||
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
char* data = colDataGetData(pCol, start);
|
||||
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
|
||||
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
numOfElems++;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false);
|
||||
}
|
||||
pInfo->maxSize = pInputInfo->maxSize;
|
||||
pInfo->type = pInputInfo->type;
|
||||
topBotTransferInfo(pCtx, pInputInfo, false);
|
||||
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue