parent
f0f52c6ecd
commit
db48f118a1
|
@ -143,6 +143,7 @@ typedef struct SqlFunctionCtx {
|
||||||
struct SExprInfo *pExpr;
|
struct SExprInfo *pExpr;
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
struct SSDataBlock *pSrcBlock;
|
struct SSDataBlock *pSrcBlock;
|
||||||
|
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity
|
||||||
int32_t curBufPage;
|
int32_t curBufPage;
|
||||||
bool increase;
|
bool increase;
|
||||||
|
|
||||||
|
|
|
@ -651,6 +651,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// link pDstBlock to set selectivity value
|
||||||
|
if (pfCtx->subsidiaries.num > 0) {
|
||||||
|
pfCtx->pDstBlock = pResult;
|
||||||
|
}
|
||||||
|
|
||||||
numOfRows = pfCtx->fpSet.process(pfCtx);
|
numOfRows = pfCtx->fpSet.process(pfCtx);
|
||||||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||||
// _group_key function for "partition by tbname" + csum(col_name) query
|
// _group_key function for "partition by tbname" + csum(col_name) query
|
||||||
|
|
|
@ -2425,7 +2425,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "diff",
|
.name = "diff",
|
||||||
.type = FUNCTION_TYPE_DIFF,
|
.type = FUNCTION_TYPE_DIFF,
|
||||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||||
.translateFunc = translateDiff,
|
.translateFunc = translateDiff,
|
||||||
.getEnvFunc = getDiffFuncEnv,
|
.getEnvFunc = getDiffFuncEnv,
|
||||||
.initFunc = diffFunctionSetup,
|
.initFunc = diffFunctionSetup,
|
||||||
|
|
|
@ -1624,6 +1624,10 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
|
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||||
|
if (pCtx->subsidiaries.num <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||||
|
@ -1655,8 +1659,6 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
||||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
int32_t ps = 0;
|
|
||||||
|
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||||
if (nullList[j]) {
|
if (nullList[j]) {
|
||||||
|
@ -1678,6 +1680,39 @@ void releaseSource(STuplePos* pPos) {
|
||||||
// Todo(liuyao) relase row
|
// Todo(liuyao) relase row
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function append the selectivity to subsidiaries function context directly, without fetching data
|
||||||
|
// from intermediate disk based buf page
|
||||||
|
void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex) {
|
||||||
|
if (pCtx->subsidiaries.num <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
|
|
||||||
|
// get data from source col
|
||||||
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
|
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, srcSlotId);
|
||||||
|
|
||||||
|
char* pData = colDataGetData(pSrcCol, rowIndex);
|
||||||
|
|
||||||
|
// append to dest col
|
||||||
|
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId);
|
||||||
|
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrcCol, rowIndex) == true) {
|
||||||
|
colDataAppendNULL(pDstCol, rowIndex);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pDstCol, rowIndex, pData, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||||
releaseSource(pDestPos);
|
releaseSource(pDestPos);
|
||||||
*pDestPos = *pSourcePos;
|
*pDestPos = *pSourcePos;
|
||||||
|
@ -3154,6 +3189,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo
|
||||||
colDataAppendInt64(pOutput, pos, &delta);
|
colDataAppendInt64(pOutput, pos, &delta);
|
||||||
}
|
}
|
||||||
pDiffInfo->prev.i64 = v;
|
pDiffInfo->prev.i64 = v;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
@ -3247,6 +3283,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
if (pDiffInfo->hasPrev) {
|
if (pDiffInfo->hasPrev) {
|
||||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||||
|
// handle selectivity
|
||||||
|
appendSelectivityValue(pCtx, pos);
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3273,6 +3311,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||||
// there is a row of previous data block to be handled in the first place.
|
// there is a row of previous data block to be handled in the first place.
|
||||||
if (pDiffInfo->hasPrev) {
|
if (pDiffInfo->hasPrev) {
|
||||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||||
|
// handle selectivity
|
||||||
|
appendSelectivityValue(pCtx, pos);
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue