Merge pull request #15390 from taosdata/enh/TD-17659
enh(query): add selectivity for output n-1 indifinite_rows function(diff & derivative)
This commit is contained in:
commit
c442641270
|
@ -143,6 +143,7 @@ typedef struct SqlFunctionCtx {
|
|||
struct SExprInfo *pExpr;
|
||||
struct SDiskbasedBuf *pBuf;
|
||||
struct SSDataBlock *pSrcBlock;
|
||||
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity
|
||||
int32_t curBufPage;
|
||||
bool increase;
|
||||
|
||||
|
|
|
@ -666,6 +666,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
||||
}
|
||||
|
||||
// link pDstBlock to set selectivity value
|
||||
if (pfCtx->subsidiaries.num > 0) {
|
||||
pfCtx->pDstBlock = pResult;
|
||||
}
|
||||
|
||||
numOfRows = pfCtx->fpSet.process(pfCtx);
|
||||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||
// _group_key function for "partition by tbname" + csum(col_name) query
|
||||
|
|
|
@ -2231,7 +2231,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "derivative",
|
||||
.type = FUNCTION_TYPE_DERIVATIVE,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateDerivative,
|
||||
.getEnvFunc = getDerivativeFuncEnv,
|
||||
.initFunc = derivativeFuncSetup,
|
||||
|
@ -2436,7 +2436,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
|
|
|
@ -1624,6 +1624,10 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
@ -1655,8 +1659,6 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
int32_t ps = 0;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
if (nullList[j]) {
|
||||
|
@ -1678,6 +1680,39 @@ void releaseSource(STuplePos* pPos) {
|
|||
// Todo(liuyao) relase row
|
||||
}
|
||||
|
||||
// This function append the selectivity to subsidiaries function context directly, without fetching data
|
||||
// from intermediate disk based buf page
|
||||
void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) {
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
|
||||
// get data from source col
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, srcSlotId);
|
||||
|
||||
char* pData = colDataGetData(pSrcCol, rowIndex);
|
||||
|
||||
// append to dest col
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId);
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
|
||||
if (colDataIsNull_s(pSrcCol, rowIndex) == true) {
|
||||
colDataAppendNULL(pDstCol, pos);
|
||||
} else {
|
||||
colDataAppend(pDstCol, pos, pData, false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||
releaseSource(pDestPos);
|
||||
*pDestPos = *pSourcePos;
|
||||
|
@ -3154,6 +3189,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo
|
|||
colDataAppendInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
|
@ -3247,6 +3283,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
if (pDiffInfo->hasPrev) {
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
} else {
|
||||
|
@ -3273,6 +3313,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
// there is a row of previous data block to be handled in the first place.
|
||||
if (pDiffInfo->hasPrev) {
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
} else {
|
||||
|
@ -5723,6 +5767,12 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||
}
|
||||
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
}
|
||||
}
|
||||
|
@ -5755,6 +5805,12 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &pDerivInfo->prevTs);
|
||||
}
|
||||
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,9 +144,9 @@ TEST_F(ParserSelectTest, IndefiniteRowsFunc) {
|
|||
TEST_F(ParserSelectTest, IndefiniteRowsFuncSemanticCheck) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT DIFF(c1), c2 FROM t1", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
|
||||
run("SELECT DIFF(c1), c2 FROM t1");
|
||||
|
||||
run("SELECT DIFF(c1), tbname FROM t1", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
|
||||
run("SELECT DIFF(c1), tbname FROM t1");
|
||||
|
||||
run("SELECT DIFF(c1), count(*) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
|
||||
|
|
|
@ -95,7 +95,6 @@ class TDTestCase:
|
|||
tdSql.error("select diff(col12) from stb_1")
|
||||
tdSql.error("select diff(col13) from stb_1")
|
||||
tdSql.error("select diff(col14) from stb_1")
|
||||
tdSql.error("select ts,diff(col1),ts from stb_1")
|
||||
|
||||
tdSql.query("select diff(col1) from stb_1")
|
||||
tdSql.checkRows(10)
|
||||
|
@ -115,6 +114,79 @@ class TDTestCase:
|
|||
tdSql.query("select diff(col6) from stb_1")
|
||||
tdSql.checkRows(10)
|
||||
|
||||
# check selectivity
|
||||
tdSql.query("select ts, diff(col1), col2 from stb_1")
|
||||
tdSql.checkRows(10)
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:00.001")
|
||||
tdSql.checkData(2, 0, "2018-09-17 09:00:00.002")
|
||||
tdSql.checkData(3, 0, "2018-09-17 09:00:00.003")
|
||||
tdSql.checkData(4, 0, "2018-09-17 09:00:00.004")
|
||||
tdSql.checkData(5, 0, "2018-09-17 09:00:00.005")
|
||||
tdSql.checkData(6, 0, "2018-09-17 09:00:00.006")
|
||||
tdSql.checkData(7, 0, "2018-09-17 09:00:00.007")
|
||||
tdSql.checkData(8, 0, "2018-09-17 09:00:00.008")
|
||||
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
|
||||
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(4, 1, 1)
|
||||
tdSql.checkData(5, 1, 1)
|
||||
tdSql.checkData(6, 1, 1)
|
||||
tdSql.checkData(7, 1, 1)
|
||||
tdSql.checkData(8, 1, 1)
|
||||
tdSql.checkData(9, 1, 1)
|
||||
|
||||
tdSql.checkData(0, 2, 0)
|
||||
tdSql.checkData(1, 2, 1)
|
||||
tdSql.checkData(2, 2, 2)
|
||||
tdSql.checkData(3, 2, 3)
|
||||
tdSql.checkData(4, 2, 4)
|
||||
tdSql.checkData(5, 2, 5)
|
||||
tdSql.checkData(6, 2, 6)
|
||||
tdSql.checkData(7, 2, 7)
|
||||
tdSql.checkData(8, 2, 8)
|
||||
tdSql.checkData(9, 2, 9)
|
||||
|
||||
tdSql.query("select ts, diff(col1), col2 from stb order by ts")
|
||||
tdSql.checkRows(10)
|
||||
|
||||
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
|
||||
tdSql.checkData(1, 0, "2018-09-17 09:00:00.001")
|
||||
tdSql.checkData(2, 0, "2018-09-17 09:00:00.002")
|
||||
tdSql.checkData(3, 0, "2018-09-17 09:00:00.003")
|
||||
tdSql.checkData(4, 0, "2018-09-17 09:00:00.004")
|
||||
tdSql.checkData(5, 0, "2018-09-17 09:00:00.005")
|
||||
tdSql.checkData(6, 0, "2018-09-17 09:00:00.006")
|
||||
tdSql.checkData(7, 0, "2018-09-17 09:00:00.007")
|
||||
tdSql.checkData(8, 0, "2018-09-17 09:00:00.008")
|
||||
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
|
||||
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(4, 1, 1)
|
||||
tdSql.checkData(5, 1, 1)
|
||||
tdSql.checkData(6, 1, 1)
|
||||
tdSql.checkData(7, 1, 1)
|
||||
tdSql.checkData(8, 1, 1)
|
||||
tdSql.checkData(9, 1, 1)
|
||||
|
||||
tdSql.checkData(0, 2, 0)
|
||||
tdSql.checkData(1, 2, 1)
|
||||
tdSql.checkData(2, 2, 2)
|
||||
tdSql.checkData(3, 2, 3)
|
||||
tdSql.checkData(4, 2, 4)
|
||||
tdSql.checkData(5, 2, 5)
|
||||
tdSql.checkData(6, 2, 6)
|
||||
tdSql.checkData(7, 2, 7)
|
||||
tdSql.checkData(8, 2, 8)
|
||||
tdSql.checkData(9, 2, 9)
|
||||
|
||||
|
||||
tdSql.execute('''create table stb1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
|
||||
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
|
||||
tdSql.execute("create table stb1_1 using stb tags('shanghai')")
|
||||
|
|
|
@ -283,14 +283,14 @@ class TDTestCase:
|
|||
tdSql.error(self.diff_query_form(alias=", diff(c1)")) # mix with calculation function 2
|
||||
# tdSql.error(self.diff_query_form(alias=" + 2")) # mix with arithmetic 1
|
||||
tdSql.error(self.diff_query_form(alias=" + avg(c1)")) # mix with arithmetic 2
|
||||
tdSql.error(self.diff_query_form(alias=", c2")) # mix with other 1
|
||||
tdSql.query(self.diff_query_form(alias=", c2")) # mix with other 1
|
||||
# tdSql.error(self.diff_query_form(table_expr="stb1")) # select stb directly
|
||||
stb_join = {
|
||||
"col": "stb1.c1",
|
||||
"table_expr": "stb1, stb2",
|
||||
"condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts"
|
||||
}
|
||||
tdSql.error(self.diff_query_form(**stb_join)) # stb join
|
||||
tdSql.query(self.diff_query_form(**stb_join)) # stb join
|
||||
interval_sql = {
|
||||
"condition": "where ts>0 and ts < now interval(1h) fill(next)"
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue