[td-13039] add first/last query.
This commit is contained in:
parent
350c3406dd
commit
309076de1f
|
@ -163,7 +163,7 @@ typedef struct SInputColumnInfoData {
|
|||
typedef struct SqlFunctionCtx {
|
||||
SInputColumnInfoData input;
|
||||
SResultDataInfo resDataInfo;
|
||||
uint32_t order; // asc|desc
|
||||
uint32_t order; // data block scanner order: asc|desc
|
||||
////////////////////////////////////////////////////////////////
|
||||
int32_t startRow; // start row index
|
||||
int32_t size; // handled processed row number
|
||||
|
|
|
@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
|||
void minFunction(SqlFunctionCtx* pCtx);
|
||||
void maxFunction(SqlFunctionCtx *pCtx);
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
void firstFunction(SqlFunctionCtx *pCtx);
|
||||
void lastFunction(SqlFunctionCtx *pCtx);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -61,6 +61,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = maxFunction,
|
||||
.finalizeFunc = functionFinalizer
|
||||
},
|
||||
{
|
||||
.name = "first",
|
||||
.type = FUNCTION_TYPE_FIRST,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = firstFunction,
|
||||
.finalizeFunc = functionFinalizer
|
||||
},
|
||||
{
|
||||
.name = "last",
|
||||
.type = FUNCTION_TYPE_LAST,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastFunction,
|
||||
.finalizeFunc = functionFinalizer
|
||||
},
|
||||
{
|
||||
.name = "concat",
|
||||
.type = FUNCTION_TYPE_CONCAT,
|
||||
|
@ -98,6 +118,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
|||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
|
||||
break;
|
||||
}
|
||||
case FUNCTION_TYPE_FIRST:
|
||||
case FUNCTION_TYPE_LAST:
|
||||
case FUNCTION_TYPE_MIN:
|
||||
case FUNCTION_TYPE_MAX: {
|
||||
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
|
|
|
@ -72,13 +72,12 @@ void countFunction(SqlFunctionCtx *pCtx) {
|
|||
int32_t numOfElem = 0;
|
||||
|
||||
/*
|
||||
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true;
|
||||
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true;
|
||||
* 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false;
|
||||
* 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true;
|
||||
* 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true;
|
||||
* 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
|
||||
*/
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
|
||||
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
|
||||
ASSERT(numOfElem >= 0);
|
||||
|
@ -173,7 +172,7 @@ void sumFunction(SqlFunctionCtx *pCtx) {
|
|||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
}
|
||||
|
||||
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SSumRes);
|
||||
return true;
|
||||
}
|
||||
|
@ -265,8 +264,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(int64_t);
|
||||
return true;
|
||||
}
|
||||
|
@ -278,34 +276,34 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t* d = (_t*)((_col)->pData); \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
|
@ -445,4 +443,118 @@ void minFunction(SqlFunctionCtx *pCtx) {
|
|||
void maxFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||
}
|
||||
}
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
pEnv->calcMemSize = pNode->node.resType.bytes;
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO fix this
|
||||
// This ordinary first function only handle the data block in ascending order
|
||||
void firstFunction(SqlFunctionCtx *pCtx) {
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
|
||||
ASSERT(pInputCol->hasNull == true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check for the first not null data
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
memcpy(buf, data, pInputCol->info.bytes);
|
||||
// TODO handle the subsidary value
|
||||
// if (pCtx->ptsList != NULL) {
|
||||
// TSKEY k = GET_TS_DATA(pCtx, i);
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
|
||||
// }
|
||||
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
pResInfo->complete = true;
|
||||
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, numOfElems, 1);
|
||||
}
|
||||
|
||||
void lastFunction(SqlFunctionCtx *pCtx) {
|
||||
if (pCtx->order != TSDB_ORDER_DESC) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
|
||||
ASSERT(pInputCol->hasNull == true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
memcpy(buf, data, pInputCol->info.bytes);
|
||||
|
||||
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
pResInfo->complete = true; // set query completed on this column
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
} else { // ascending order
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) {
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
memcpy(buf, data, pCtx->inputBytes);
|
||||
|
||||
*(TSKEY*)buf = ts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, numOfElems, 1);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue