refactor
This commit is contained in:
parent
ab9b76e1f9
commit
4c29b82626
|
@ -100,8 +100,10 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
|
||||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t topFunctionMerge(SqlFunctionCtx *pCtx);
|
||||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t getTopBotInfoSize();
|
int32_t getTopBotInfoSize();
|
||||||
|
|
|
@ -1549,7 +1549,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = topFunction,
|
.processFunc = topFunction,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotPartialFinalize,
|
||||||
.combineFunc = topCombine,
|
.combineFunc = topCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -1559,7 +1559,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.translateFunc = translateTopBotMerge,
|
.translateFunc = translateTopBotMerge,
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = topFunction,
|
.processFunc = topFunctionMerge,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotFinalize,
|
||||||
.combineFunc = topCombine,
|
.combineFunc = topCombine,
|
||||||
},
|
},
|
||||||
|
|
|
@ -2727,7 +2727,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
|
STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data);
|
||||||
|
|
||||||
topTransferInfo(pCtx, pInputInfo);
|
topTransferInfo(pCtx, pInputInfo);
|
||||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2930,6 +2930,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pEntryInfo->numOfRes;
|
return pEntryInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = getTopBotInfoSize();
|
||||||
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pRes, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
||||||
bool isTopQuery) {
|
bool isTopQuery) {
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
@ -2990,15 +3008,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getSpreadInfoSize() {
|
||||||
|
return (int32_t)sizeof(SSpreadInfo);
|
||||||
|
}
|
||||||
|
|
||||||
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SSpreadInfo);
|
pEnv->calcMemSize = sizeof(SSpreadInfo);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getSpreadInfoSize() {
|
|
||||||
return (int32_t)sizeof(SSpreadInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -3125,7 +3143,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
int32_t resultBytes = (int32_t)sizeof(SSpreadInfo);
|
int32_t resultBytes = getSpreadInfoSize();
|
||||||
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
memcpy(varDataVal(res), pInfo, resultBytes);
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
|
Loading…
Reference in New Issue