fix: split scan columns from cache last scan
This commit is contained in:
parent
38196f37b9
commit
6352b28b4c
|
@ -59,7 +59,7 @@ extern "C" {
|
||||||
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
|
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
|
||||||
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
|
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
|
||||||
#define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
|
#define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
|
||||||
#define EXPLAIN_MERGE_FORMAT "SortMerge"
|
#define EXPLAIN_MERGE_FORMAT "Merge"
|
||||||
#define EXPLAIN_MERGE_KEYS_FORMAT "Merge Key: "
|
#define EXPLAIN_MERGE_KEYS_FORMAT "Merge Key: "
|
||||||
#define EXPLAIN_IGNORE_GROUPID_FORMAT "Ignore Group Id: %s"
|
#define EXPLAIN_IGNORE_GROUPID_FORMAT "Ignore Group Id: %s"
|
||||||
#define EXPLAIN_PARTITION_KETS_FORMAT "Partition Key: "
|
#define EXPLAIN_PARTITION_KETS_FORMAT "Partition Key: "
|
||||||
|
|
|
@ -325,6 +325,7 @@ SSDataBlock* doColsMerge(SOperatorInfo* pOperator) {
|
||||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
|
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
|
||||||
|
int32_t nullBlkNum = 0;
|
||||||
|
|
||||||
qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));
|
qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
@ -333,11 +334,19 @@ SSDataBlock* doColsMerge(SOperatorInfo* pOperator) {
|
||||||
if (pBlock && pBlock->info.rows > 1) {
|
if (pBlock && pBlock->info.rows > 1) {
|
||||||
qError("more than 1 row returned from downstream, rows:%" PRId64, pBlock->info.rows);
|
qError("more than 1 row returned from downstream, rows:%" PRId64, pBlock->info.rows);
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
} else if (NULL == pBlock) {
|
||||||
|
nullBlkNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock);
|
copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
|
||||||
|
if (2 == nullBlkNum) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->binfo.pRes->info.rows = 1;
|
pInfo->binfo.pRes->info.rows = 1;
|
||||||
|
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
|
|
@ -2590,6 +2590,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
typedef struct SLastRowScanOptSetColDataTypeCxt {
|
typedef struct SLastRowScanOptSetColDataTypeCxt {
|
||||||
bool doAgg;
|
bool doAgg;
|
||||||
SNodeList* pLastCols;
|
SNodeList* pLastCols;
|
||||||
|
SNodeList* pOtherCols;
|
||||||
} SLastRowScanOptSetColDataTypeCxt;
|
} SLastRowScanOptSetColDataTypeCxt;
|
||||||
|
|
||||||
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
||||||
|
@ -2632,6 +2633,33 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pList1, SNodeList* pList2) {
|
||||||
|
SNode* pTarget = NULL;
|
||||||
|
WHERE_EACH(pTarget, pTargets) {
|
||||||
|
bool found = false;
|
||||||
|
SNode* pCol = NULL;
|
||||||
|
FOREACH(pCol, pList1) {
|
||||||
|
if (nodesEqualNode(pCol, pTarget)) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) {
|
||||||
|
FOREACH(pCol, pList2) {
|
||||||
|
if (nodesEqualNode(pCol, pTarget)) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) {
|
||||||
|
ERASE_NODE(pTargets);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
WHERE_NEXT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
|
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
|
||||||
|
|
||||||
|
@ -2639,7 +2667,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL};
|
SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL, .pOtherCols = NULL};
|
||||||
SNode* pNode = NULL;
|
SNode* pNode = NULL;
|
||||||
SColumnNode* pPKTsCol = NULL;
|
SColumnNode* pPKTsCol = NULL;
|
||||||
SColumnNode* pNonPKCol = NULL;
|
SColumnNode* pNonPKCol = NULL;
|
||||||
|
@ -2660,14 +2688,18 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
|
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
|
||||||
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
||||||
}
|
}
|
||||||
} else if (FUNCTION_TYPE_SELECT_VALUE == funcType) {
|
} else {
|
||||||
pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
nodesListMakeAppend(&cxt.pOtherCols, pNode);
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
|
||||||
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (FUNCTION_TYPE_SELECT_VALUE == funcType) {
|
||||||
pPKTsCol = pCol;
|
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||||
} else {
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
pNonPKCol = pCol;
|
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
pPKTsCol = pCol;
|
||||||
|
} else {
|
||||||
|
pNonPKCol = pCol;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2681,6 +2713,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
|
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
|
||||||
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
|
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
|
||||||
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
|
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
|
||||||
|
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols);
|
||||||
if (pPKTsCol && pScan->node.pTargets->length == 1) {
|
if (pPKTsCol && pScan->node.pTargets->length == 1) {
|
||||||
// when select last(ts),ts from ..., we add another ts to targets
|
// when select last(ts),ts from ..., we add another ts to targets
|
||||||
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
|
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
|
||||||
|
|
Loading…
Reference in New Issue