Merge pull request #15295 from taosdata/feature/3.0_debug_wxy

fix: plan problem when functions that requires a timeline is used directly in a super table
This commit is contained in:
Xiaoyu Wang 2022-07-22 18:50:27 +08:00 committed by GitHub
commit b97c440fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 77 additions and 52 deletions

View File

@ -111,6 +111,7 @@ typedef struct SAggLogicNode {
SNodeList* pGroupKeys;
SNodeList* pAggFuncs;
bool hasLastRow;
bool hasTimeLineFunc;
} SAggLogicNode;
typedef struct SProjectLogicNode {

View File

@ -521,7 +521,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
// NOTE: the last parameter is the primary timestamp column
// todo: refactor this
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
}
@ -4440,10 +4440,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
if (pHandle->vnode) {

View File

@ -2771,6 +2771,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableEndIdx = pInfo->tableEndIndex;
STableListInfo* tableListInfo = pInfo->tableListInfo;
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
pInfo->dataReaders, GET_TASKID(pTaskInfo));
@ -2943,7 +2944,13 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
const STableKeyInfo* info1 = p1;
const STableKeyInfo* info2 = p2;
return info1->groupId - info2->groupId;
if (info1->groupId < info2->groupId) {
return -1;
} else if (info1->groupId > info2->groupId) {
return 1;
} else {
return 0;
}
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
@ -2985,7 +2992,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pColMatchInfo = pColList;
pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
pInfo->pSortInfo = generateSortByTsInfo(pInfo->pColMatchInfo, pInfo->cond.order);

View File

@ -2242,7 +2242,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2253,7 +2253,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_cache_last_row",
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2263,7 +2263,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "first",
.type = FUNCTION_TYPE_FIRST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2277,7 +2277,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_first_partial",
.type = FUNCTION_TYPE_FIRST_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLastPartial,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2288,7 +2288,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_first_merge",
.type = FUNCTION_TYPE_FIRST_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLastMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2299,7 +2299,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "last",
.type = FUNCTION_TYPE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2313,7 +2313,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_last_partial",
.type = FUNCTION_TYPE_LAST_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLastPartial,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
@ -2324,7 +2324,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_last_merge",
.type = FUNCTION_TYPE_LAST_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLastMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,

View File

@ -2465,8 +2465,8 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult
}
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
@ -2502,7 +2502,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
}
qDebug("add %d elements into histogram, total:%d, numOfEntry:%d, %p", numOfElems, pInfo->pHisto->numOfElems,
pInfo->pHisto->numOfEntries, pInfo->pHisto);
pInfo->pHisto->numOfEntries, pInfo->pHisto);
}
SET_VAL(pResInfo, numOfElems, 1);
@ -2542,18 +2542,17 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
qDebug("merge histo, total:%"PRId64", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries, pHisto);
qDebug("merge histo, total:%" PRId64 ", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries, pHisto);
} else {
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
qDebug("input histogram, elem:%"PRId64", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries,
pInput->pHisto);
qDebug("input histogram, elem:%" PRId64 ", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries,
pInput->pHisto);
SHistogramInfo* pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo));
qDebug("merge histo, total:%"PRId64", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries,
pHisto);
qDebug("merge histo, total:%" PRId64 ", entry:%d, %p", pHisto->numOfElems, pHisto->numOfEntries, pHisto);
tHistogramDestroy(&pRes);
}
}
@ -2580,7 +2579,8 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
}
if (pInfo->algo != APERCT_ALGO_TDIGEST) {
qDebug("after merge, total:%d, numOfEntry:%d, %p", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto);
qDebug("after merge, total:%d, numOfEntry:%d, %p", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries,
pInfo->pHisto);
}
SET_VAL(pResInfo, 1, 1);
@ -2602,7 +2602,8 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
} else {
buildHistogramInfo(pInfo);
if (pInfo->pHisto->numOfElems > 0) {
qDebug("get the final res:%d, elements:%"PRId64", entry:%d", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries);
qDebug("get the final res:%d, elements:%" PRId64 ", entry:%d", pInfo->pHisto->numOfElems,
pInfo->pHisto->numOfEntries);
double ratio[] = {pInfo->percent};
double* res = tHistogramUniform(pInfo->pHisto, ratio, 1);
@ -4665,10 +4666,8 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t numOfElems = 0;
@ -4704,11 +4703,6 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
}
}
// TODO: remove this after pTsOutput is handled
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems++;
}
@ -5205,8 +5199,8 @@ bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->isNull = false;
pInfo->p.key = INT64_MIN;
pInfo->win = TSWINDOW_INITIALIZER;
pInfo->p.key = INT64_MIN;
pInfo->win = TSWINDOW_INITIALIZER;
return true;
}

View File

@ -1237,7 +1237,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType);
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasLastRowFunc ? true : fmIsTimelineFunc(pFunc->funcId);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
}
}

View File

@ -478,8 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
}
pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pAgg->node.requireDataOrder = pAgg->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
@ -928,7 +929,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions
SNodeList* pGroupKeys = NULL;
SNode* pProjection = NULL;
SNode* pProjection = NULL;
FOREACH(pProjection, pSelect->pProjectionList) {
code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pProjection));
if (TSDB_CODE_SUCCESS != code) {

View File

@ -904,14 +904,6 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
if (NULL != pScan->pGroupTags) {
return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
}
return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
}
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanCols) {
@ -922,11 +914,12 @@ static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
return NULL;
}
static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
bool groupSort) {
SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, false);
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubplan->pChildren,
@ -937,12 +930,24 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
if (SCAN_TYPE_TABLE_MERGE == pScan->scanType) {
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true);
}
if (NULL != pScan->pGroupTags) {
return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
}
return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
}
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pChild = NULL;
FOREACH(pChild, pJoin->node.pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
code = stbSplSplitScanNodeForJoin(pCxt, pSubplan, (SScanLogicNode*)pChild);
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false);
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
} else {

View File

@ -124,7 +124,7 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode*
}
static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) {
if (SCAN_TYPE_TABLE != pScan->scanType || SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
return TSDB_CODE_SUCCESS;
}
// The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK
@ -161,7 +161,9 @@ static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel req
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pAgg->node.resultDataOrder = requirement;
pAgg->node.requireDataOrder = requirement;
if (pAgg->hasTimeLineFunc) {
pAgg->node.requireDataOrder = requirement < DATA_ORDER_LEVEL_IN_GROUP ? DATA_ORDER_LEVEL_IN_GROUP : requirement;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -139,6 +139,10 @@ TEST_F(PlanBasicTest, timeLineFunc) {
run("SELECT CSUM(c1) FROM t1");
run("SELECT CSUM(c1) FROM st1");
run("SELECT TWA(c1) FROM t1");
run("SELECT TWA(c1) FROM st1");
}
TEST_F(PlanBasicTest, multiResFunc) {

View File

@ -58,7 +58,19 @@ TEST_F(PlanPartitionByTest, withInterval) {
TEST_F(PlanPartitionByTest, withGroupBy) {
useDb("root", "test");
run("select count(*) from t1 partition by c1 group by c2");
run("SELECT COUNT(*) FROM t1 PARTITION BY c1 GROUP BY c2");
run("SELECT TBNAME, c1 FROM st1 PARTITION BY TBNAME GROUP BY c1");
}
TEST_F(PlanPartitionByTest, withTimeLineFunc) {
useDb("root", "test");
run("SELECT TWA(c1) FROM st1 PARTITION BY c1");
}
TEST_F(PlanPartitionByTest, withSlimit) {
useDb("root", "test");
run("SELECT CSUM(c1) FROM st1 PARTITION BY TBNAME SLIMIT 1");
}

View File

@ -28,7 +28,7 @@ from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql)
def csum_query_form(self, col="c1", alias="", table_expr="t1", condition=""):