Merge pull request #13973 from taosdata/szhou/feature/project-elimation
feature: eliminate projection optimization
This commit is contained in:
commit
e05a6ca7fa
|
@ -114,7 +114,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
||||
|
||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
|
||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn);
|
||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
|
||||
|
||||
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
||||
|
|
|
@ -75,12 +75,18 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
|||
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||
// recorded in the first segment, next to the struct header
|
||||
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pHandle->pSchema->pSlots) {
|
||||
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
|
||||
if (pSlotDesc->output) {
|
||||
++numOfCols;
|
||||
}
|
||||
}
|
||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
|
||||
pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols);
|
||||
pEntry->numOfRows = pInput->pData->info.rows;
|
||||
pEntry->numOfCols = pInput->pData->info.numOfCols;
|
||||
pEntry->numOfCols = numOfCols;
|
||||
pEntry->dataLen = 0;
|
||||
|
||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||
|
|
|
@ -235,7 +235,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
terrno = code;
|
||||
return code;
|
||||
} else {
|
||||
qDebug("sucess to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid);
|
||||
qDebug("success to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid);
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
||||
|
@ -319,7 +319,14 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
continue;
|
||||
}
|
||||
|
||||
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
|
||||
SColMatchInfo* info = NULL;
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
|
||||
info = taosArrayGet(pList, j);
|
||||
if (info->targetSlotId == pNode->slotId) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pNode->output) {
|
||||
(*numOfOutputCols) += 1;
|
||||
} else {
|
||||
|
@ -578,14 +585,15 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
}
|
||||
|
||||
// NOTE: sources columns are more than the destination SSDatablock columns.
|
||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) {
|
||||
// doFilter in table scan needs every column even its output is false
|
||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
|
||||
size_t numOfSrcCols = taosArrayGetSize(pCols);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
|
||||
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j);
|
||||
if (!pmInfo->output) {
|
||||
if (!outputEveryColumn && !pmInfo->output) {
|
||||
j++;
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -2086,7 +2086,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
|
|||
|
||||
// data from mnode
|
||||
pRes->info.rows = numOfRows;
|
||||
relocateColumnData(pRes, pColList, pBlock->pDataBlock);
|
||||
relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
taosMemoryFree(pBlock);
|
||||
// blockDataDestroy(pBlock);
|
||||
|
|
|
@ -259,7 +259,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
return terrno;
|
||||
}
|
||||
|
||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
|
||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
||||
|
||||
// currently only the tbname pseudo column
|
||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||
|
@ -1505,7 +1505,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
p->info.rows = numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
@ -1597,7 +1597,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
|||
getPerfDbMeta(&pSysDbTableMeta, &size);
|
||||
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||
pInfo->pRes->info.rows = p->info.rows;
|
||||
blockDataDestroy(p);
|
||||
|
||||
|
@ -2079,7 +2079,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
return terrno;
|
||||
}
|
||||
|
||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
|
||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
||||
|
||||
// currently only the tbname pseudo column
|
||||
if (pTableScanInfo->numOfPseudoExpr > 0) {
|
||||
|
|
|
@ -972,6 +972,11 @@ void releaseUdfFuncHandle(char* udfName) {
|
|||
}
|
||||
|
||||
int32_t cleanUpUdfs() {
|
||||
int8_t initialized = atomic_load_8(&gUdfdProxy.initialized);
|
||||
if (!initialized) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
|
||||
int32_t i = 0;
|
||||
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
|
||||
|
|
|
@ -1082,13 +1082,89 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
|
||||
//TODO: enable this optimization after new mechanising that map projection and targets of project node
|
||||
if (NULL != pNode->pParent) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode;
|
||||
if (-1 != pProjectNode->limit || -1 != pProjectNode->slimit || -1 != pProjectNode->offset || -1 != pProjectNode->soffset) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SHashObj* pProjColNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
SNode* pProjection;
|
||||
FOREACH(pProjection, pProjectNode->pProjections) {
|
||||
SExprNode* pExprNode = (SExprNode*)pProjection;
|
||||
if (QUERY_NODE_COLUMN != nodeType(pExprNode)) {
|
||||
taosHashCleanup(pProjColNameHash);
|
||||
return false;
|
||||
}
|
||||
|
||||
char* projColumnName = ((SColumnNode*)pProjection)->colName;
|
||||
int32_t* pExist = taosHashGet(pProjColNameHash, projColumnName, strlen(projColumnName));
|
||||
if (NULL != pExist) {
|
||||
taosHashCleanup(pProjColNameHash);
|
||||
return false;
|
||||
} else {
|
||||
int32_t exist = 1;
|
||||
taosHashPut(pProjColNameHash, projColumnName, strlen(projColumnName), &exist, sizeof(exist));
|
||||
}
|
||||
}
|
||||
taosHashCleanup(pProjColNameHash);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SProjectLogicNode* pProjectNode) {
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0);
|
||||
SNodeList* pNewChildTargets = nodesMakeList();
|
||||
|
||||
SNode* pProjection = NULL;
|
||||
FOREACH(pProjection, pProjectNode->pProjections) {
|
||||
SNode* pChildTarget = NULL;
|
||||
FOREACH(pChildTarget, pChild->pTargets) {
|
||||
if (strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName) == 0) {
|
||||
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
nodesDestroyList(pChild->pTargets);
|
||||
pChild->pTargets = pNewChildTargets;
|
||||
|
||||
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
|
||||
nodesDestroyNode((SNode*)pProjectNode);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t eliminateProjOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SProjectLogicNode* pProjectNode =
|
||||
(SProjectLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, eliminateProjOptMayBeOptimized);
|
||||
|
||||
if (NULL == pProjectNode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return eliminateProjOptimizeImpl(pCxt, pLogicSubplan, pProjectNode);
|
||||
}
|
||||
|
||||
// clang-format off
|
||||
static const SOptimizeRule optimizeRuleSet[] = {
|
||||
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
|
||||
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
|
||||
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
|
||||
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
|
||||
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}
|
||||
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize},
|
||||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList) {
|
|||
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
|
||||
if (NULL == pOld->pParent) {
|
||||
pSubplan->pNode = (SLogicNode*)pNew;
|
||||
pNew->pParent = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -52,3 +52,13 @@ TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
|
|||
|
||||
run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTARTTS DESC");
|
||||
}
|
||||
|
||||
TEST_F(PlanOptimizeTest, eliminateProjection) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT c1, sum(c3) FROM t1 GROUP BY c1");
|
||||
run("SELECT c1 FROM t1");
|
||||
run("SELECT * FROM st1");
|
||||
run("SELECT c1 FROM st1s3");
|
||||
//run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
|
||||
}
|
||||
|
|
|
@ -533,7 +533,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
|||
atomic_store_8(&ctx->taskType, taskType);
|
||||
atomic_store_8(&ctx->explain, explain);
|
||||
|
||||
/*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
|
||||
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
|
||||
|
||||
code = qStringToSubplan(qwMsg->msg, &plan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
Loading…
Reference in New Issue