Merge pull request #14323 from taosdata/enh/first_last_select
enh(query): first/last function support selectivity
This commit is contained in:
commit
4524b826df
|
@ -1904,7 +1904,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_first_partial",
|
||||
.type = FUNCTION_TYPE_FIRST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -1915,7 +1915,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_first_merge",
|
||||
.type = FUNCTION_TYPE_FIRST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -1939,7 +1939,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_last_partial",
|
||||
.type = FUNCTION_TYPE_LAST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -1950,7 +1950,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_last_merge",
|
||||
.type = FUNCTION_TYPE_LAST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
|
|
@ -2409,11 +2409,11 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); }
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t) + sizeof(STuplePos); }
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t);
|
||||
pEnv->calcMemSize = getFirstLastInfoSize(pNode->node.resType.bytes);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2491,9 +2491,17 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
memcpy(pInfo->buf, data, bytes);
|
||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||
//handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
||||
pResInfo->numOfRes = 1;
|
||||
break;
|
||||
}
|
||||
|
@ -2525,6 +2533,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
memcpy(pInfo->buf, data, bytes);
|
||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||
//handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
pResInfo->numOfRes = 1;
|
||||
|
@ -2580,8 +2597,17 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
memcpy(pInfo->buf, data, bytes);
|
||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
//handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
pResInfo->numOfRes = 1;
|
||||
}
|
||||
break;
|
||||
|
@ -2603,6 +2629,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
memcpy(pInfo->buf, data, bytes);
|
||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||
//handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
pResInfo->numOfRes = 1;
|
||||
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
@ -2615,7 +2650,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
|
||||
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
|
||||
SInputColumnInfoData* pColInfo = &pCtx->input;
|
||||
int32_t start = pColInfo->startRowIndex;
|
||||
|
||||
pOutput->bytes = pInput->bytes;
|
||||
TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes);
|
||||
TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes);
|
||||
|
@ -2632,7 +2670,17 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput,
|
|||
}
|
||||
*tsOut = *tsIn;
|
||||
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||
//handle selectivity
|
||||
STuplePos* pTuplePos = (STuplePos*)(pOutput->buf + pOutput->bytes + sizeof(TSKEY));
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
if (!pOutput->hasResult) {
|
||||
saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
pOutput->hasResult = true;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2647,7 +2695,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
|||
char* data = colDataGetData(pCol, start);
|
||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||
|
||||
firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery);
|
||||
firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery);
|
||||
|
||||
int32_t numOfElems = pInputInfo->hasResult ? 1 : 0;
|
||||
|
||||
|
@ -2669,6 +2717,9 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes);
|
||||
//handle selectivity
|
||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
@ -2687,6 +2738,9 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
//handle selectivity
|
||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return 1;
|
||||
|
@ -3043,7 +3097,9 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
pItem->uid = uid;
|
||||
|
||||
// save the data of this tuple
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
}
|
||||
|
||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||
pEntryInfo->numOfRes++;
|
||||
|
@ -3062,7 +3118,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
pItem->uid = uid;
|
||||
|
||||
// save the data of this tuple by over writing the old data
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
}
|
||||
|
||||
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
|
||||
topBotResComparFn, NULL, !isTopQuery);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue