Merge pull request #29800 from taosdata/enh/TD-26215
enh: optimize group by tag performance
This commit is contained in:
commit
60245b4e46
|
@ -128,6 +128,7 @@ typedef struct SScanLogicNode {
|
||||||
bool paraTablesSort; // for table merge scan
|
bool paraTablesSort; // for table merge scan
|
||||||
bool smallDataTsSort; // disable row id sort for table merge scan
|
bool smallDataTsSort; // disable row id sort for table merge scan
|
||||||
bool needSplit;
|
bool needSplit;
|
||||||
|
bool noPseudoRefAfterGrp; // no pseudo columns referenced ater group/partition clause
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
|
|
@ -398,16 +398,16 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
// check for tag values
|
// check for tag values
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
|
||||||
|
|
||||||
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
|
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
|
||||||
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
|
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
|
||||||
|
|
||||||
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
||||||
void* pUid = taosArrayGet(pInfo->pUidList, 0);
|
void* pUid = taosArrayGet(pInfo->pUidList, 0);
|
||||||
QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
|
||||||
pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
|
pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
|
||||||
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
||||||
pInfo->pRes->info.rows, pTaskInfo, NULL);
|
pInfo->pRes->info.rows, pTaskInfo, NULL);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
|
@ -510,6 +510,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(paraTablesSort);
|
COPY_SCALAR_FIELD(paraTablesSort);
|
||||||
COPY_SCALAR_FIELD(smallDataTsSort);
|
COPY_SCALAR_FIELD(smallDataTsSort);
|
||||||
COPY_SCALAR_FIELD(needSplit);
|
COPY_SCALAR_FIELD(needSplit);
|
||||||
|
COPY_SCALAR_FIELD(noPseudoRefAfterGrp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -406,6 +406,47 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
|
||||||
|
|
||||||
static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; }
|
static bool needScanDefaultCol(EScanType scanType) { return SCAN_TYPE_TABLE_COUNT != scanType; }
|
||||||
|
|
||||||
|
static int32_t updateScanNoPseudoRefAfterGrp(SSelectStmt* pSelect, SScanLogicNode* pScan, SRealTableNode* pRealTable) {
|
||||||
|
if (NULL == pScan->pScanPseudoCols || pScan->pScanPseudoCols->length <= 0 || NULL != pSelect->pTags || NULL != pSelect->pSubtable) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList* pList = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
if (NULL == pSelect->pPartitionByList || pSelect->pPartitionByList->length <= 0) {
|
||||||
|
if (NULL == pSelect->pGroupByList || pSelect->pGroupByList->length <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = nodesCollectColumns(pSelect, SQL_CLAUSE_GROUP_BY, pRealTable->table.tableAlias, COLLECT_COL_TYPE_TAG,
|
||||||
|
&pList);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, pRealTable->table.tableAlias, fmIsScanPseudoColumnFunc,
|
||||||
|
&pList);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && (NULL == pList || pList->length <= 0)) {
|
||||||
|
pScan->noPseudoRefAfterGrp = true;
|
||||||
|
}
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, pRealTable->table.tableAlias, COLLECT_COL_TYPE_TAG,
|
||||||
|
&pList);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_PARTITION_BY, pRealTable->table.tableAlias, fmIsScanPseudoColumnFunc,
|
||||||
|
&pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && (NULL == pList || pList->length <= 0)) {
|
||||||
|
pScan->noPseudoRefAfterGrp = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
nodesDestroyList(pList);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable,
|
static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable,
|
||||||
SLogicNode** pLogicNode) {
|
SLogicNode** pLogicNode) {
|
||||||
SScanLogicNode* pScan = NULL;
|
SScanLogicNode* pScan = NULL;
|
||||||
|
@ -437,7 +478,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
&pScan->pScanPseudoCols);
|
&pScan->pScanPseudoCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = updateScanNoPseudoRefAfterGrp(pSelect, pScan, pRealTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan);
|
||||||
|
}
|
||||||
|
|
||||||
// rewrite the expression in subsequent clauses
|
// rewrite the expression in subsequent clauses
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -93,6 +93,12 @@ typedef struct SCpdCollectTableColCxt {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
} SCpdCollectTableColCxt;
|
} SCpdCollectTableColCxt;
|
||||||
|
|
||||||
|
typedef struct SCollectColsCxt {
|
||||||
|
SHashObj* pColHash;
|
||||||
|
int32_t errCode;
|
||||||
|
} SCollectColsCxt;
|
||||||
|
|
||||||
|
|
||||||
typedef enum ECondAction {
|
typedef enum ECondAction {
|
||||||
COND_ACTION_STAY = 1,
|
COND_ACTION_STAY = 1,
|
||||||
COND_ACTION_PUSH_JOIN,
|
COND_ACTION_PUSH_JOIN,
|
||||||
|
@ -3302,6 +3308,70 @@ static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t st
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes partTagsCollectColsNodes(SNode* pNode, void* pContext) {
|
||||||
|
SCollectColsCxt* pCxt = pContext;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
if (NULL == taosHashGet(pCxt->pColHash, pCol->colName, strlen(pCol->colName))) {
|
||||||
|
pCxt->errCode = taosHashPut(pCxt->pColHash, pCol->colName, strlen(pCol->colName), NULL, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool partTagsIsScanPseudoColsInConds(SScanLogicNode* pScan) {
|
||||||
|
SCollectColsCxt cxt = {
|
||||||
|
.errCode = TSDB_CODE_SUCCESS,
|
||||||
|
.pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)};
|
||||||
|
if (NULL == cxt.pColHash) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesWalkExpr(pScan->node.pConditions, partTagsCollectColsNodes, &cxt);
|
||||||
|
if (cxt.errCode) {
|
||||||
|
taosHashCleanup(cxt.pColHash);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pNode = NULL;
|
||||||
|
FOREACH(pNode, pScan->pScanPseudoCols) {
|
||||||
|
if (taosHashGet(cxt.pColHash, ((SExprNode*)pNode)->aliasName, strlen(((SExprNode*)pNode)->aliasName))) {
|
||||||
|
taosHashCleanup(cxt.pColHash);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(cxt.pColHash);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t partTagsOptRemovePseudoCols(SScanLogicNode* pScan) {
|
||||||
|
if (!pScan->noPseudoRefAfterGrp || NULL == pScan->pScanPseudoCols || pScan->pScanPseudoCols->length <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pScan->node.pConditions && partTagsIsScanPseudoColsInConds(pScan)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pNode = NULL, *pTarget = NULL;
|
||||||
|
FOREACH(pNode, pScan->pScanPseudoCols) {
|
||||||
|
FOREACH(pTarget, pScan->node.pTargets) {
|
||||||
|
if (0 == strcmp(((SExprNode*)pNode)->aliasName, ((SColumnNode*)pTarget)->colName)) {
|
||||||
|
ERASE_NODE(pScan->node.pTargets);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesDestroyList(pScan->pScanPseudoCols);
|
||||||
|
pScan->pScanPseudoCols = NULL;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized, NULL);
|
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized, NULL);
|
||||||
if (NULL == pNode) {
|
if (NULL == pNode) {
|
||||||
|
@ -3362,6 +3432,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg);
|
code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = partTagsOptRemovePseudoCols(pScan);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = partTagsOptRebuildTbanme(pScan->pGroupTags);
|
code = partTagsOptRebuildTbanme(pScan->pGroupTags);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,11 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
|
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
|
||||||
int32_t code = 0;
|
int32_t code = schUpdateJobStatus(pJob, status);
|
||||||
SCH_ERR_JRET(schUpdateJobStatus(pJob, status));
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
SCH_ERR_JRET((param && *(int32_t*)param) ? *(int32_t*)param : code);
|
||||||
|
}
|
||||||
|
|
||||||
switch (status) {
|
switch (status) {
|
||||||
case JOB_TASK_STATUS_INIT:
|
case JOB_TASK_STATUS_INIT:
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue