Merge pull request #14399 from taosdata/enh/last_row_function
feat(query): add last_row function without cache
This commit is contained in:
commit
45b47b7a2b
|
@ -34,6 +34,7 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_ELAPSED,
|
||||
FUNCTION_TYPE_IRATE,
|
||||
FUNCTION_TYPE_LAST_ROW,
|
||||
FUNCTION_TYPE_LAST_ROWT, //TODO: removed
|
||||
FUNCTION_TYPE_MAX,
|
||||
FUNCTION_TYPE_MIN,
|
||||
FUNCTION_TYPE_MODE,
|
||||
|
|
|
@ -117,6 +117,9 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
|||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes);
|
||||
|
||||
int32_t lastRowFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
|
|
|
@ -1953,16 +1953,13 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
},
|
||||
{
|
||||
.name = "last_row",
|
||||
.type = FUNCTION_TYPE_LAST_ROW,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.type = FUNCTION_TYPE_LAST_ROWT,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastFunction,
|
||||
.finalizeFunc = firstLastFinalize,
|
||||
.pPartialFunc = "_last_partial",
|
||||
.pMergeFunc = "_last_merge",
|
||||
.combineFunc = lastCombine,
|
||||
.processFunc = lastRowFunction,
|
||||
.finalizeFunc = lastRowFinalize,
|
||||
},
|
||||
{
|
||||
.name = "_cache_last_row",
|
||||
|
|
|
@ -80,6 +80,7 @@ typedef struct STopBotRes {
|
|||
|
||||
typedef struct SFirstLastRes {
|
||||
bool hasResult;
|
||||
bool isNull; //used for last_row function only
|
||||
int32_t bytes;
|
||||
char buf[];
|
||||
} SFirstLastRes;
|
||||
|
@ -2763,6 +2764,113 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
int32_t type = pInputCol->info.type;
|
||||
int32_t bytes = pInputCol->info.bytes;
|
||||
pInfo->bytes = bytes;
|
||||
|
||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
|
||||
|
||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||
|
||||
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
|
||||
if (blockDataOrder == TSDB_ORDER_ASC) {
|
||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||
pInfo->isNull = true;
|
||||
} else {
|
||||
pInfo->isNull = false;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
bytes = varDataTLen(data);
|
||||
pInfo->bytes = bytes;
|
||||
}
|
||||
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
|
||||
}
|
||||
*(TSKEY*)(pInfo->buf) = cts;
|
||||
numOfElems++;
|
||||
//handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
pResInfo->numOfRes = 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} else { // descending order
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||
pInfo->isNull = true;
|
||||
} else {
|
||||
pInfo->isNull = false;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
bytes = varDataTLen(data);
|
||||
pInfo->bytes = bytes;
|
||||
}
|
||||
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
|
||||
}
|
||||
*(TSKEY*)(pInfo->buf) = cts;
|
||||
numOfElems++;
|
||||
//handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
pResInfo->numOfRes = 1;
|
||||
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, numOfElems, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull);
|
||||
//handle selectivity
|
||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
|
||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||
return true;
|
||||
|
|
|
@ -313,9 +313,11 @@ class TDTestCase:
|
|||
tdSql.checkRows(1)
|
||||
tdSql.query("select udf1(num1) , bottom(num1,1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.error("select udf1(num1) , last_row(num1) from tb;")
|
||||
tdSql.query("select udf1(num1) , last_row(num1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("select round(num1) , last_row(num1) from tb;")
|
||||
tdSql.query("select round(num1) , last_row(num1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
# stable
|
||||
|
@ -340,8 +342,10 @@ class TDTestCase:
|
|||
tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("select udf1(c1) , last_row(c1) from stb1;")
|
||||
tdSql.error("select ceil(c1) , last_row(c1) from stb1;")
|
||||
tdSql.query("select udf1(c1) , last_row(c1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select ceil(c1) , last_row(c1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
# regular table with compute functions
|
||||
|
||||
|
|
|
@ -315,9 +315,11 @@ class TDTestCase:
|
|||
tdSql.checkRows(1)
|
||||
tdSql.query("select udf1(num1) , bottom(num1,1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.error("select udf1(num1) , last_row(num1) from tb;")
|
||||
tdSql.query("select udf1(num1) , last_row(num1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("select round(num1) , last_row(num1) from tb;")
|
||||
tdSql.query("select round(num1) , last_row(num1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
# stable
|
||||
|
@ -342,8 +344,10 @@ class TDTestCase:
|
|||
tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("select udf1(c1) , last_row(c1) from stb1;")
|
||||
tdSql.error("select ceil(c1) , last_row(c1) from stb1;")
|
||||
tdSql.query("select udf1(c1) , last_row(c1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select ceil(c1) , last_row(c1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
# regular table with compute functions
|
||||
|
||||
|
|
|
@ -312,9 +312,11 @@ class TDTestCase:
|
|||
tdSql.checkRows(1)
|
||||
tdSql.query("select udf1(num1) , bottom(num1,1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.error("select udf1(num1) , last_row(num1) from tb;")
|
||||
tdSql.query("select udf1(num1) , last_row(num1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("select round(num1) , last_row(num1) from tb;")
|
||||
tdSql.query("select round(num1) , last_row(num1) from tb;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
# stable
|
||||
|
@ -339,8 +341,10 @@ class TDTestCase:
|
|||
tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.error("select udf1(c1) , last_row(c1) from stb1;")
|
||||
tdSql.error("select ceil(c1) , last_row(c1) from stb1;")
|
||||
tdSql.query("select udf1(c1) , last_row(c1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select ceil(c1) , last_row(c1) from stb1;")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
# regular table with compute functions
|
||||
|
||||
|
|
Loading…
Reference in New Issue