Merge pull request #12274 from taosdata/feature/3.0_liaohj
enh(query):support selectivity function and normal column data query.
This commit is contained in:
commit
f872ef975f
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <libs/function/function.h>
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
@ -803,7 +804,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
|
||||||
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
|
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
|
||||||
if (functionNeedToExecute(&pCtx[k])) {
|
if (functionNeedToExecute(&pCtx[k])) {
|
||||||
pCtx[k].startTs = startTs; // this can be set during create the struct
|
pCtx[k].startTs = startTs; // this can be set during create the struct
|
||||||
pCtx[k].fpSet.process(&pCtx[k]);
|
if (pCtx[k].fpSet.process != NULL)
|
||||||
|
pCtx[k].fpSet.process(&pCtx[k]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1074,35 +1076,36 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
|
||||||
// set the output buffer for the selectivity + tag query
|
// set the output buffer for the selectivity + tag query
|
||||||
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int16_t tagLen = 0;
|
|
||||||
|
|
||||||
SqlFunctionCtx* p = NULL;
|
SqlFunctionCtx* p = NULL;
|
||||||
SqlFunctionCtx** pTagCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
|
SqlFunctionCtx** pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
|
||||||
if (pTagCtx == NULL) {
|
if (pValCtx == NULL) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
int32_t functionId = pCtx[i].functionId;
|
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
|
pValCtx[num++] = &pCtx[i];
|
||||||
if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
|
||||||
tagLen += pCtx[i].resDataInfo.bytes;
|
|
||||||
pTagCtx[num++] = &pCtx[i];
|
|
||||||
} else if (1 /*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/) {
|
|
||||||
p = &pCtx[i];
|
|
||||||
} else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
|
|
||||||
// tag function may be the group by tag column
|
|
||||||
// ts may be the required primary timestamp column
|
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
// the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
|
p = &pCtx[i];
|
||||||
}
|
}
|
||||||
|
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
||||||
|
// tagLen += pCtx[i].resDataInfo.bytes;
|
||||||
|
// pTagCtx[num++] = &pCtx[i];
|
||||||
|
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
|
||||||
|
// // tag function may be the group by tag column
|
||||||
|
// // ts may be the required primary timestamp column
|
||||||
|
// continue;
|
||||||
|
// } else {
|
||||||
|
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
p->subsidiaries.pCtx = pTagCtx;
|
p->subsidiaries.pCtx = pValCtx;
|
||||||
p->subsidiaries.num = num;
|
p->subsidiaries.num = num;
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFreeClear(pTagCtx);
|
taosMemoryFreeClear(pValCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2219,6 +2222,8 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
|
||||||
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
if (pCtx[j].fpSet.process) {
|
if (pCtx[j].fpSet.process) {
|
||||||
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
|
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
|
// do nothing, todo refactor
|
||||||
} else {
|
} else {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
@ -3974,6 +3979,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4037,9 +4043,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
||||||
pProjectInfo->pPseudoColInfo);
|
pProjectInfo->pPseudoColInfo);
|
||||||
|
|
||||||
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t status = handleLimitOffset(pOperator, pBlock);
|
int32_t status = handleLimitOffset(pOperator, pBlock);
|
||||||
if (status == PROJECT_RETRIEVE_CONTINUE) {
|
if (status == PROJECT_RETRIEVE_CONTINUE) {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -262,6 +262,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
|
@ -289,7 +291,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->pScalarExprInfo != NULL) {
|
if (pInfo->pScalarExprInfo != NULL) {
|
||||||
projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
|
pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
|
||||||
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||||
|
|
|
@ -114,7 +114,10 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
||||||
SOperatorInfo* pOperator = param;
|
SOperatorInfo* pOperator = param;
|
||||||
SSortOperatorInfo* pSort = pOperator->info;
|
SSortOperatorInfo* pSort = pOperator->info;
|
||||||
if (pOperator->pExpr != NULL) {
|
if (pOperator->pExpr != NULL) {
|
||||||
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
|
int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,11 +37,11 @@ bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t sumFunction(SqlFunctionCtx *pCtx);
|
int32_t sumFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t sumInvertFunction(SqlFunctionCtx *pCtx);
|
int32_t sumInvertFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
|
||||||
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t minFunction(SqlFunctionCtx* pCtx);
|
int32_t minFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
@ -82,6 +83,8 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn
|
||||||
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,9 +510,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.translateFunc = translateInOutNum,
|
.translateFunc = translateInOutNum,
|
||||||
.dataRequiredFunc = statisDataRequired,
|
.dataRequiredFunc = statisDataRequired,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = minFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = minFunction,
|
.processFunc = minFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = minmaxFunctionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "max",
|
.name = "max",
|
||||||
|
@ -520,9 +521,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.translateFunc = translateInOutNum,
|
.translateFunc = translateInOutNum,
|
||||||
.dataRequiredFunc = statisDataRequired,
|
.dataRequiredFunc = statisDataRequired,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = minmaxFunctionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "stddev",
|
.name = "stddev",
|
||||||
|
@ -562,14 +563,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.translateFunc = translateApercentile,
|
.translateFunc = translateApercentile,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "top",
|
.name = "top",
|
||||||
.type = FUNCTION_TYPE_TOP,
|
.type = FUNCTION_TYPE_TOP,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
||||||
.translateFunc = translateTop,
|
.translateFunc = translateTop,
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "bottom",
|
.name = "bottom",
|
||||||
.type = FUNCTION_TYPE_BOTTOM,
|
.type = FUNCTION_TYPE_BOTTOM,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
||||||
.translateFunc = translateBottom,
|
.translateFunc = translateBottom,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = bottomFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = topBotFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "spread",
|
.name = "spread",
|
||||||
|
@ -603,7 +604,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC,
|
||||||
.translateFunc = translateLastRow,
|
.translateFunc = translateLastRow,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = maxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = functionFinalize
|
||||||
},
|
},
|
||||||
|
@ -1032,8 +1033,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.type = FUNCTION_TYPE_SELECT_VALUE,
|
.type = FUNCTION_TYPE_SELECT_VALUE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
||||||
.translateFunc = translateSelectValue,
|
.translateFunc = translateSelectValue,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
|
||||||
.initFunc = NULL,
|
.initFunc = functionSetup,
|
||||||
.sprocessFunc = NULL,
|
.sprocessFunc = NULL,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,13 +37,15 @@ typedef struct SAvgRes {
|
||||||
int64_t count;
|
int64_t count;
|
||||||
} SAvgRes;
|
} SAvgRes;
|
||||||
|
|
||||||
|
typedef struct STuplePos {
|
||||||
|
int32_t pageId;
|
||||||
|
int32_t offset;
|
||||||
|
} STuplePos;
|
||||||
|
|
||||||
typedef struct STopBotResItem {
|
typedef struct STopBotResItem {
|
||||||
SVariant v;
|
SVariant v;
|
||||||
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
||||||
struct {
|
STuplePos tuplePos; // tuple data of this chosen row
|
||||||
int32_t pageId;
|
|
||||||
int32_t offset;
|
|
||||||
} tuplePos; // tuple data of this chosen row
|
|
||||||
} STopBotResItem;
|
} STopBotResItem;
|
||||||
|
|
||||||
typedef struct STopBotRes {
|
typedef struct STopBotRes {
|
||||||
|
@ -616,101 +618,25 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
|
||||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
typedef struct SMinmaxResInfo {
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
bool assign; // assign the first value or not
|
||||||
return false;
|
int64_t v;
|
||||||
}
|
STuplePos tuplePos;
|
||||||
|
} SMinmaxResInfo;
|
||||||
|
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
switch (pCtx->resDataInfo.type) {
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
*((int32_t*)buf) = INT32_MIN;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_UINT:
|
|
||||||
*((uint32_t*)buf) = 0;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
|
||||||
*((float*)buf) = -FLT_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
|
||||||
SET_DOUBLE_VAL(((double*)buf), -DBL_MAX);
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
*((int64_t*)buf) = INT64_MIN;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
|
||||||
*((uint64_t*)buf) = 0;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
*((int16_t*)buf) = INT16_MIN;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
|
||||||
*((uint16_t*)buf) = 0;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
*((int8_t*)buf) = INT8_MIN;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_UTINYINT:
|
|
||||||
*((uint8_t*)buf) = 0;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
|
||||||
*((int8_t*)buf) = 0;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
assert(0);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
return false; // not initialized since it has been initialized
|
return false; // not initialized since it has been initialized
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
switch (pCtx->resDataInfo.type) {
|
buf->assign = false;
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
buf->tuplePos.pageId = -1;
|
||||||
*((int8_t*)buf) = INT8_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_UTINYINT:
|
|
||||||
*(uint8_t*)buf = UINT8_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
*((int16_t*)buf) = INT16_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
|
||||||
*((uint16_t*)buf) = UINT16_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
*((int32_t*)buf) = INT32_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_UINT:
|
|
||||||
*((uint32_t*)buf) = UINT32_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
*((int64_t*)buf) = INT64_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
|
||||||
*((uint64_t*)buf) = UINT64_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
|
||||||
*((float*)buf) = FLT_MAX;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
|
||||||
SET_DOUBLE_VAL(((double*)buf), DBL_MAX);
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
|
||||||
*((int8_t*)buf) = 1;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
assert(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(int64_t);
|
pEnv->calcMemSize = sizeof(SMinmaxResInfo);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -758,6 +684,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||||
|
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||||
|
|
||||||
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
@ -768,13 +697,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
int32_t type = pCol->info.type;
|
int32_t type = pCol->info.type;
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
SMinmaxResInfo *pBuf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
// data in current data block are qualified to the query
|
// data in current data block are qualified to the query
|
||||||
if (pInput->colDataAggIsSet) {
|
if (pInput->colDataAggIsSet) {
|
||||||
numOfElems = pInput->numOfRows - pAgg->numOfNull;
|
numOfElems = pInput->numOfRows - pAgg->numOfNull;
|
||||||
ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);
|
ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);
|
||||||
|
|
||||||
if (numOfElems == 0) {
|
if (numOfElems == 0) {
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
@ -793,48 +721,82 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
// the index is the original position, not the relative position
|
// the index is the original position, not the relative position
|
||||||
TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;
|
TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
if (!pBuf->assign) {
|
||||||
int64_t prev = 0;
|
pBuf->v = *(int64_t*)tval;
|
||||||
GET_TYPED_DATA(prev, int64_t, type, buf);
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
int64_t prev = 0;
|
||||||
|
GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
|
||||||
|
|
||||||
int64_t val = GET_INT64_VAL(tval);
|
int64_t val = GET_INT64_VAL(tval);
|
||||||
if ((prev < val) ^ isMinFunc) {
|
if ((prev < val) ^ isMinFunc) {
|
||||||
*(int64_t*)buf = val;
|
pBuf->v = val;
|
||||||
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||||
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||||
__ctx->tag.i = key;
|
// __ctx->tag.i = key;
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// __ctx->fpSet.process(__ctx);
|
||||||
|
// }
|
||||||
|
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
__ctx->fpSet.process(__ctx);
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
uint64_t prev = 0;
|
||||||
|
GET_TYPED_DATA(prev, uint64_t, type, &pBuf->v);
|
||||||
|
|
||||||
|
uint64_t val = GET_UINT64_VAL(tval);
|
||||||
|
if ((prev < val) ^ isMinFunc) {
|
||||||
|
pBuf->v = val;
|
||||||
|
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
||||||
|
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
||||||
|
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
||||||
|
// __ctx->tag.i = key;
|
||||||
|
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// __ctx->fpSet.process(__ctx);
|
||||||
|
// }
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
double prev = 0;
|
||||||
|
GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
|
||||||
|
|
||||||
|
double val = GET_DOUBLE_VAL(tval);
|
||||||
|
if ((prev < val) ^ isMinFunc) {
|
||||||
|
pBuf->v = val;
|
||||||
|
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
|
double prev = 0;
|
||||||
|
GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
|
||||||
|
|
||||||
|
double val = GET_DOUBLE_VAL(tval);
|
||||||
|
if ((prev < val) ^ isMinFunc) {
|
||||||
|
pBuf->v = val;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
|
||||||
uint64_t prev = 0;
|
|
||||||
GET_TYPED_DATA(prev, uint64_t, type, buf);
|
|
||||||
|
|
||||||
uint64_t val = GET_UINT64_VAL(tval);
|
|
||||||
if ((prev < val) ^ isMinFunc) {
|
|
||||||
*(uint64_t*)buf = val;
|
|
||||||
for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
|
|
||||||
SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
|
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
|
|
||||||
__ctx->tag.i = key;
|
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
}
|
|
||||||
|
|
||||||
__ctx->fpSet.process(__ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
double val = GET_DOUBLE_VAL(tval);
|
|
||||||
UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
|
|
||||||
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
|
||||||
double val = GET_DOUBLE_VAL(tval);
|
|
||||||
UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pBuf->assign = true;
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -843,47 +805,318 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||||
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
|
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
|
||||||
LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
|
int8_t* pData = (int8_t*)pCol->pData;
|
||||||
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
|
int8_t* val = (int8_t*)&pBuf->v;
|
||||||
LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
|
|
||||||
} else if (type == TSDB_DATA_TYPE_INT) {
|
|
||||||
int32_t* pData = (int32_t*)pCol->pData;
|
|
||||||
int32_t* val = (int32_t*)buf;
|
|
||||||
|
|
||||||
for (int32_t i = start; i < start + numOfRows; ++i) {
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
DO_UPDATE_SUBSID_RES(pCtx, ts);
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
}
|
}
|
||||||
|
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
|
||||||
|
int16_t* pData = (int16_t*)pCol->pData;
|
||||||
|
int16_t* val = (int16_t*)&pBuf->v;
|
||||||
|
|
||||||
#if defined(_DEBUG_VIEW)
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
qDebug("max value updated:%d", *retVal);
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
#endif
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
|
} else if (type == TSDB_DATA_TYPE_INT) {
|
||||||
|
int32_t* pData = (int32_t*)pCol->pData;
|
||||||
|
int32_t* val = (int32_t*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_BIGINT) {
|
} else if (type == TSDB_DATA_TYPE_BIGINT) {
|
||||||
LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
|
int64_t* pData = (int64_t*)pCol->pData;
|
||||||
|
int64_t* val = (int64_t*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
if (type == TSDB_DATA_TYPE_UTINYINT) {
|
if (type == TSDB_DATA_TYPE_UTINYINT) {
|
||||||
LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
|
uint8_t* pData = (uint8_t*)pCol->pData;
|
||||||
|
uint8_t* val = (uint8_t*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
|
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
|
||||||
LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
|
uint16_t* pData = (uint16_t*)pCol->pData;
|
||||||
|
uint16_t* val = (uint16_t*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_UINT) {
|
} else if (type == TSDB_DATA_TYPE_UINT) {
|
||||||
LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
|
uint32_t* pData = (uint32_t*)pCol->pData;
|
||||||
|
uint32_t* val = (uint32_t*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
|
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
|
||||||
LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
|
uint64_t* pData = (uint64_t*)pCol->pData;
|
||||||
|
uint64_t* val = (uint64_t*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
|
double* pData = (double*)pCol->pData;
|
||||||
|
double* val = (double*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
|
float* pData = (float*)pCol->pData;
|
||||||
|
double* val = (double*)&pBuf->v;
|
||||||
|
|
||||||
|
for (int32_t i = start; i < start + numOfRows; ++i) {
|
||||||
|
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pBuf->assign) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
pBuf->assign = true;
|
||||||
|
} else {
|
||||||
|
// ignore the equivalent data value
|
||||||
|
if ((*val) == pData[i]) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
|
@ -901,6 +1134,65 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex);
|
||||||
|
|
||||||
|
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
|
||||||
|
int32_t type = pCtx->input.pData[0]->info.type;
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
// todo assign the tag value
|
||||||
|
int32_t currentRow = pBlock->info.rows;
|
||||||
|
|
||||||
|
if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
|
float v = *(double*) &pRes->v;
|
||||||
|
colDataAppend(pCol, currentRow, (const char*)&v, false);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pCol, currentRow, (const char*)&pRes->v, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
|
||||||
|
return pEntryInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex) {
|
||||||
|
int32_t pageId = pTuplePos->pageId;
|
||||||
|
int32_t offset = pTuplePos->offset;
|
||||||
|
if (pTuplePos->pageId != -1) {
|
||||||
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
||||||
|
|
||||||
|
bool* nullList = (bool*)((char*)pPage + offset);
|
||||||
|
char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
|
||||||
|
|
||||||
|
// todo set the offset value to optimize the performance.
|
||||||
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
|
|
||||||
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
|
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||||
|
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
|
int32_t ps = 0;
|
||||||
|
for (int32_t k = 0; k < srcSlotId; ++k) {
|
||||||
|
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
||||||
|
ps += pSrcCol->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
|
if (nullList[srcSlotId]) {
|
||||||
|
colDataAppendNULL(pDstCol, rowIndex);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pDstCol, rowIndex, (pStart + ps), false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SStddevRes);
|
pEnv->calcMemSize = sizeof(SStddevRes);
|
||||||
return true;
|
return true;
|
||||||
|
@ -1244,6 +1536,14 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
pEnv->calcMemSize = pNode->node.resType.bytes;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
|
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
|
||||||
if (pTsColInfo == NULL) {
|
if (pTsColInfo == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1622,35 +1922,49 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo);
|
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||||
|
|
||||||
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
|
||||||
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
|
|
||||||
|
|
||||||
int32_t topFunction(SqlFunctionCtx* pCtx) {
|
int32_t topFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
|
|
||||||
// buildTopBotStruct(pRes, pCtx);
|
|
||||||
// }
|
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
int32_t numOfRows = pInput->numOfRows;
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
|
||||||
for (int32_t i = start; i < numOfRows + start; ++i) {
|
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
|
||||||
|
|
||||||
|
numOfElems++;
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
|
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
|
||||||
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems++;
|
||||||
|
char* data = colDataGetData(pCol, i);
|
||||||
|
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1684,7 +1998,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
|
||||||
}
|
}
|
||||||
|
|
||||||
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
|
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||||
int32_t maxSize = pCtx->param[1].param.i;
|
int32_t maxSize = pCtx->param[1].param.i;
|
||||||
|
|
||||||
|
@ -1701,30 +2015,36 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
pItem->uid = uid;
|
pItem->uid = uid;
|
||||||
|
|
||||||
// save the data of this tuple
|
// save the data of this tuple
|
||||||
saveTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
|
|
||||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||||
pEntryInfo->numOfRes++;
|
pEntryInfo->numOfRes++;
|
||||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||||
false);
|
!isTopQuery);
|
||||||
} else { // replace the minimum value in the result
|
} else { // replace the minimum value in the result
|
||||||
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
|
if ((isTopQuery && (
|
||||||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
|
(IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
|
||||||
|
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
|
||||||
|
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)))
|
||||||
|
|| (!isTopQuery && (
|
||||||
|
(IS_SIGNED_NUMERIC_TYPE(type) && val.i < pItems[0].v.i) ||
|
||||||
|
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u < pItems[0].v.u) ||
|
||||||
|
(IS_FLOAT_TYPE(type) && val.d < pItems[0].v.d))
|
||||||
|
)) {
|
||||||
// replace the old data and the coresponding tuple data
|
// replace the old data and the coresponding tuple data
|
||||||
STopBotResItem* pItem = &pItems[0];
|
STopBotResItem* pItem = &pItems[0];
|
||||||
pItem->v = val;
|
pItem->v = val;
|
||||||
pItem->uid = uid;
|
pItem->uid = uid;
|
||||||
|
|
||||||
// save the data of this tuple by over writing the old data
|
// save the data of this tuple by over writing the old data
|
||||||
copyTupleData(pCtx, rowIndex, pSrcBlock, pItem);
|
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
|
|
||||||
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||||
topBotResComparFn, NULL, false);
|
topBotResComparFn, NULL, !isTopQuery);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
|
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
SFilePage* pPage = NULL;
|
SFilePage* pPage = NULL;
|
||||||
|
|
||||||
int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);
|
int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);
|
||||||
|
@ -1740,7 +2060,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pItem->tuplePos.pageId = pCtx->curBufPage;
|
pPos->pageId = pCtx->curBufPage;
|
||||||
|
|
||||||
// keep the current row data, extract method
|
// keep the current row data, extract method
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
@ -1751,6 +2071,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
bool isNull = colDataIsNull_s(pCol, rowIndex);
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
nullList[i] = true;
|
nullList[i] = true;
|
||||||
|
offset += pCol->info.bytes;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1764,17 +2085,17 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
offset += pCol->info.bytes;
|
offset += pCol->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
pItem->tuplePos.offset = pPage->num;
|
pPos->offset = pPage->num;
|
||||||
pPage->num += completeRowSize;
|
pPage->num += completeRowSize;
|
||||||
|
|
||||||
setBufPageDirty(pPage, true);
|
setBufPageDirty(pPage, true);
|
||||||
releaseBufPage(pCtx->pBuf, pPage);
|
releaseBufPage(pCtx->pBuf, pPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
|
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
||||||
|
|
||||||
bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
|
bool* nullList = (bool*)((char*)pPage + pPos->offset);
|
||||||
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
|
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
@ -1803,54 +2124,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
pEntryInfo->complete = true;
|
pEntryInfo->complete = true;
|
||||||
|
|
||||||
int32_t type = pCtx->input.pData[0]->info.type;
|
int32_t type = pCtx->input.pData[0]->info.type;
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
// todo assign the tag value and the corresponding row data
|
// todo assign the tag value and the corresponding row data
|
||||||
int32_t currentRow = pBlock->info.rows;
|
int32_t currentRow = pBlock->info.rows;
|
||||||
switch (type) {
|
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||||
case TSDB_DATA_TYPE_INT: {
|
STopBotResItem* pItem = &pRes->pItems[i];
|
||||||
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
STopBotResItem* pItem = &pRes->pItems[i];
|
float v = pItem->v.d;
|
||||||
colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i);
|
colDataAppend(pCol, currentRow, (const char*)&v, false);
|
||||||
|
} else {
|
||||||
int32_t pageId = pItem->tuplePos.pageId;
|
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
||||||
int32_t offset = pItem->tuplePos.offset;
|
|
||||||
if (pItem->tuplePos.pageId != -1) {
|
|
||||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
|
||||||
|
|
||||||
bool* nullList = (bool*)((char*)pPage + offset);
|
|
||||||
char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
|
|
||||||
|
|
||||||
// todo set the offset value to optimize the performance.
|
|
||||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
|
||||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
|
||||||
|
|
||||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
|
||||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
|
||||||
int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
|
|
||||||
|
|
||||||
int32_t ps = 0;
|
|
||||||
for (int32_t k = 0; k < srcSlotId; ++k) {
|
|
||||||
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
|
|
||||||
ps += pSrcCol->info.bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
|
||||||
if (nullList[srcSlotId]) {
|
|
||||||
colDataAppendNULL(pDstCol, currentRow);
|
|
||||||
} else {
|
|
||||||
colDataAppend(pDstCol, currentRow, (pStart + ps), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
currentRow += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||||
|
currentRow += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pEntryInfo->numOfRes;
|
return pEntryInfo->numOfRes;
|
||||||
|
|
|
@ -140,7 +140,8 @@ endi
|
||||||
if $data00 != -13 then
|
if $data00 != -13 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data01 != -2.30000 then
|
if $data01 != -2.30000 then
|
||||||
|
print expect -2.30000, actual: $data01
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data02 != -3.300000000 then
|
if $data02 != -3.300000000 then
|
||||||
|
|
Loading…
Reference in New Issue