refactor(query): opt perf.
This commit is contained in:
parent
b567a1ebc3
commit
ed0b92bb33
|
@ -917,7 +917,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||
|
||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo);
|
||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||
SSDataBlock* pBlock, const char* idStr);
|
||||
|
||||
|
|
|
@ -1117,18 +1117,22 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
|||
|
||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, int32_t status);
|
||||
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) {
|
||||
if (pFilterNode == NULL || pBlock->info.rows == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
SFilterInfo* filter = NULL;
|
||||
SFilterInfo* filter = pFilterInfo;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
pError("start filter")
|
||||
pError("start filter");
|
||||
|
||||
// todo move to the initialization function
|
||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||
int32_t code = 0;
|
||||
if (filter == NULL) {
|
||||
code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||
}
|
||||
|
||||
int64_t st1 = taosGetTimestampUs();
|
||||
pError("init completed, el: %d us", st1-st);
|
||||
|
@ -1145,7 +1149,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
|
|||
// todo the keep seems never to be True??
|
||||
int32_t status = 0;
|
||||
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols, &status);
|
||||
|
||||
if (pFilterInfo == NULL) {
|
||||
filterFreeInfo(filter);
|
||||
}
|
||||
|
||||
int64_t st3 = taosGetTimestampUs();
|
||||
pError("do filter, el: %d us", st3-st2);
|
||||
|
@ -2456,7 +2463,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
||||
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);
|
||||
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL);
|
||||
|
||||
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
|
@ -2850,7 +2857,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo);
|
||||
doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL);
|
||||
if (fillResult->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -311,7 +311,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pRes, NULL, NULL);
|
||||
|
||||
if (!hasRemainResults(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
|
|
|
@ -386,7 +386,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
if (pJoinInfo->pCondAfterMerge != NULL) {
|
||||
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
|
||||
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL);
|
||||
}
|
||||
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
||||
break;
|
||||
|
|
|
@ -294,7 +294,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
// do apply filter
|
||||
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL);
|
||||
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL);
|
||||
|
||||
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
|
||||
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
|
||||
|
@ -303,7 +303,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
} else {
|
||||
// do apply filter
|
||||
if (pRes->info.rows > 0) {
|
||||
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
|
||||
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
|
||||
if (pRes->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
@ -496,7 +496,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL);
|
||||
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL);
|
||||
size_t rows = pInfo->pRes->info.rows;
|
||||
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
|
||||
break;
|
||||
|
@ -598,7 +598,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pRes->info.rows = 1;
|
||||
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
|
||||
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
|
||||
|
||||
/*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);
|
||||
|
||||
|
|
|
@ -380,7 +380,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
|
||||
if (pTableScanInfo->pFilterNode != NULL) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pTableScanInfo->readRecorder.filterTime += el;
|
||||
|
@ -747,6 +747,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
|
||||
if (pInfo->pFilterNode != NULL) {
|
||||
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
}
|
||||
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->currentGroupId = -1;
|
||||
|
@ -1112,7 +1117,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
|||
return NULL;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pResult, NULL);
|
||||
doFilter(pInfo->pCondition, pResult, NULL, NULL);
|
||||
if (pResult->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
@ -1401,7 +1406,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
}
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
return 0;
|
||||
|
@ -2147,7 +2152,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
|||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
|
||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
|
@ -3139,7 +3144,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
|
||||
if (pTableScanInfo->pFilterNode != NULL) {
|
||||
int64_t st = taosGetTimestampMs();
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL);
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pTableScanInfo->readRecorder.filterTime += el;
|
||||
|
|
|
@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo);
|
||||
doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL);
|
||||
if (blockDataGetNumOfRows(pBlock) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -1215,7 +1215,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
|
||||
|
||||
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
|
@ -1254,7 +1254,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
|
||||
|
||||
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
|
@ -1291,7 +1291,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBlock, NULL);
|
||||
doFilter(pInfo->pCondition, pBlock, NULL, NULL);
|
||||
|
||||
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
|
@ -1865,7 +1865,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
|
||||
|
||||
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
|
@ -1908,7 +1908,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
|
||||
|
||||
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
|
@ -5041,7 +5041,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true);
|
||||
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
|
||||
|
||||
doFilter(pMiaInfo->pCondition, pRes, NULL);
|
||||
doFilter(pMiaInfo->pCondition, pRes, NULL, NULL);
|
||||
if (pRes->info.rows >= pOperator->resultInfo.capacity) {
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue