Merge pull request #20964 from taosdata/enh/3.0_planner_optimize
feat: last queries with tags output can be read from cache
This commit is contained in:
commit
7cf9919ab0
|
@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
p->ts = pCol->ts;
|
p->ts = pCol->ts;
|
||||||
p->colVal = pCol->colVal;
|
p->colVal = pCol->colVal;
|
||||||
singleTableLastTs = pCol->ts;
|
singleTableLastTs = pCol->ts;
|
||||||
|
|
||||||
// only set value for last row query
|
|
||||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
|
||||||
if (taosArrayGetSize(pTableUidList) == 0) {
|
|
||||||
taosArrayPush(pTableUidList, &pKeyInfo->uid);
|
|
||||||
} else {
|
|
||||||
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SLastCol* p = taosArrayGet(pLastCols, slotId);
|
SLastCol* p = taosArrayGet(pLastCols, slotId);
|
||||||
|
@ -417,6 +408,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pTableUidList) == 0) {
|
||||||
|
taosArrayPush(pTableUidList, &pKeyInfo->uid);
|
||||||
|
} else {
|
||||||
|
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbCacheRelease(lruCache, h);
|
tsdbCacheRelease(lruCache, h);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param);
|
||||||
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
||||||
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
|
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
|
||||||
|
|
||||||
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
||||||
|
|
||||||
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
|
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tableListDestroy(pTableListInfo);
|
tableListDestroy(pTableListInfo);
|
||||||
|
@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
|
|
||||||
uint64_t suid = tableListGetSuid(pTableListInfo);
|
uint64_t suid = tableListGetSuid(pTableListInfo);
|
||||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
|
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
|
||||||
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
|
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
|
||||||
|
pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
|
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
|
@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
destroyCacheScanOperator(pInfo);
|
destroyCacheScanOperator(pInfo);
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
|
@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCacheRowsScanInfo* pInfo = pOperator->info;
|
SCacheRowsScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STableListInfo* pTableList = pInfo->pTableList;
|
STableListInfo* pTableList = pInfo->pTableList;
|
||||||
|
|
||||||
uint64_t suid = tableListGetSuid(pTableList);
|
uint64_t suid = tableListGetSuid(pTableList);
|
||||||
int32_t size = tableListGetSize(pTableList);
|
int32_t size = tableListGetSize(pTableList);
|
||||||
|
@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
pRes->info.rows = 1;
|
pRes->info.rows = 1;
|
||||||
|
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
|
||||||
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pList = NULL;
|
STableKeyInfo* pList = NULL;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
|
||||||
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
|
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
|
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
|
||||||
|
|
||||||
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
||||||
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
|
||||||
|
|
||||||
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
||||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
|
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
||||||
GET_TASKID(pTaskInfo), NULL);
|
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
||||||
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
|
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
|
|
@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
|
||||||
*pNotOptimize = false;
|
*pNotOptimize = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (nodeType(pNode)) {
|
switch (nodeType(pNode)) {
|
||||||
case QUERY_NODE_LOGIC_PLAN_SCAN: {
|
case QUERY_NODE_LOGIC_PLAN_SCAN: {
|
||||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||||
|
@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt {
|
||||||
bool hasCol;
|
bool hasCol;
|
||||||
} SLastRowScanOptLastParaCkCxt;
|
} SLastRowScanOptLastParaCkCxt;
|
||||||
|
|
||||||
static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
|
static EDealRes lastRowScanOptLastParaIsTagImpl(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
SLastRowScanOptLastParaCkCxt* pCxt = pContext;
|
SLastRowScanOptLastParaCkCxt* pCxt = pContext;
|
||||||
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
|
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
|
||||||
|
@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool lastRowScanOptLastParaCheck(SNode* pExpr) {
|
static bool lastRowScanOptLastParaIsTag(SNode* pExpr) {
|
||||||
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
|
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
|
||||||
nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt);
|
nodesWalkExpr(pExpr, lastRowScanOptLastParaIsTagImpl, &cxt);
|
||||||
return !cxt.hasTag && cxt.hasCol;
|
return cxt.hasTag && !cxt.hasCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
|
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
|
||||||
|
@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
|
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
|
||||||
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
|
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
|
||||||
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
|
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
|
||||||
if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) {
|
if (hasSelectFunc || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
hasLastFunc = true;
|
hasLastFunc = true;
|
||||||
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
|
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) {
|
||||||
if (hasLastFunc) {
|
if (hasLastFunc) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
hasSelectFunc = true;
|
hasSelectFunc = true;
|
||||||
|
} else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
|
||||||
|
if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
|
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2237,7 +2241,7 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols) {
|
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, bool erase) {
|
||||||
SNode* pTarget = NULL;
|
SNode* pTarget = NULL;
|
||||||
WHERE_EACH(pTarget, pTargets) {
|
WHERE_EACH(pTarget, pTargets) {
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
@ -2249,7 +2253,7 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!found) {
|
if (!found && erase) {
|
||||||
ERASE_NODE(pTargets);
|
ERASE_NODE(pTargets);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2290,9 +2294,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
pScan->igLastNull = pAgg->hasLast ? true : false;
|
pScan->igLastNull = pAgg->hasLast ? true : false;
|
||||||
if (NULL != cxt.pLastCols) {
|
if (NULL != cxt.pLastCols) {
|
||||||
cxt.doAgg = false;
|
cxt.doAgg = false;
|
||||||
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols);
|
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
|
||||||
NODES_DESTORY_LIST(pScan->pScanPseudoCols);
|
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
|
||||||
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols);
|
|
||||||
nodesClearList(cxt.pLastCols);
|
nodesClearList(cxt.pLastCols);
|
||||||
}
|
}
|
||||||
pAgg->hasLastRow = false;
|
pAgg->hasLastRow = false;
|
||||||
|
|
Loading…
Reference in New Issue