Merge branch 'refact/query_opt' of github.com:taosdata/TDengine into refact/query_opt

This commit is contained in:
Xiaoyu Wang 2022-09-21 13:57:16 +08:00
commit bc28b929a5
10 changed files with 105 additions and 39 deletions

View File

@ -31,13 +31,17 @@ enum {
FLT_OPTION_NEED_UNIQE = 4,
};
#define FILTER_RESULT_ALL_QUALIFIED 0x1
#define FILTER_RESULT_NONE_QUALIFIED 0x2
#define FILTER_RESULT_PARTIAL_QUALIFIED 0x3
typedef struct SFilterColumnParam {
int32_t numOfCols;
SArray *pDataBlock;
} SFilterColumnParam;
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* pResultStatus);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);

View File

@ -208,6 +208,7 @@ typedef struct SExprSupp {
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
SFilterInfo* pFilterInfo;
} SExprSupp;
typedef struct SOperatorInfo {
@ -916,7 +917,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, const char* idStr);

View File

@ -1115,29 +1115,52 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
}
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, int32_t status);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) {
if (pFilterNode == NULL || pBlock->info.rows == 0) {
return;
}
SFilterInfo* filter = NULL;
SFilterInfo* filter = pFilterInfo;
int64_t st = taosGetTimestampUs();
// pError("start filter");
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
int32_t code = 0;
if (filter == NULL) {
code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
}
int64_t st1 = taosGetTimestampUs();
// pError("init completed, el: %d us", st1 - st);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int64_t st2 = taosGetTimestampUs();
// pError("set data from slotid, el: %d us", st2 - st1);
int8_t* rowRes = NULL;
// todo the keep seems never to be True??
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
int32_t status = 0;
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols, &status);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
if (pFilterInfo == NULL) {
filterFreeInfo(filter);
}
int64_t st3 = taosGetTimestampUs();
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep, status);
int64_t st4 = taosGetTimestampUs();
// pError("extract result filter, el: %d us, rows:%d, status:%d", st4 - st3, pBlock->info.rows, status);
if (pColMatchInfo != NULL) {
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
@ -1155,13 +1178,34 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
taosMemoryFree(rowRes);
}
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, int32_t status) {
if (keep) {
return;
}
if (rowRes != NULL) {
int32_t totalRows = pBlock->info.rows;
int32_t totalRows = pBlock->info.rows;
if (status == FILTER_RESULT_ALL_QUALIFIED) {
SSDataBlock* px = createOneDataBlock(pBlock, true);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL || pSrc->pData == NULL) {
continue;
}
colInfoDataCleanup(pDst, pBlock->info.rows);
colDataAssign(pDst, pSrc, totalRows, NULL);
}
blockDataDestroy(px); // fix memory leak
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
} else {
SSDataBlock* px = createOneDataBlock(pBlock, true);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -1197,9 +1241,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
blockDataDestroy(px); // fix memory leak
} else {
// do nothing
pBlock->info.rows = 0;
}
}
@ -2421,7 +2462,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
@ -2815,7 +2856,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break;
}
doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo);
doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL);
if (fillResult->info.rows > 0) {
break;
}

View File

@ -311,7 +311,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL);
doFilter(pInfo->pCondition, pRes, NULL, NULL);
if (!hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);

View File

@ -386,7 +386,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break;
}
if (pJoinInfo->pCondAfterMerge != NULL) {
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL);
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;

View File

@ -294,7 +294,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
// do apply filter
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL);
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL);
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
@ -303,7 +303,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} else {
// do apply filter
if (pRes->info.rows > 0) {
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
if (pRes->info.rows == 0) {
continue;
}
@ -496,7 +496,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL);
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL);
size_t rows = pInfo->pRes->info.rows;
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
break;
@ -598,7 +598,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
}
pRes->info.rows = 1;
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
/*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

View File

@ -380,7 +380,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
@ -747,6 +747,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
}
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->currentGroupId = -1;
@ -1112,7 +1117,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return NULL;
}
doFilter(pInfo->pCondition, pResult, NULL);
doFilter(pInfo->pCondition, pResult, NULL, NULL);
if (pResult->info.rows == 0) {
continue;
}
@ -1401,7 +1406,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
return 0;
@ -2147,7 +2152,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
@ -3139,7 +3144,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;

View File

@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return NULL;
}
doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo);
doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}

View File

@ -1215,7 +1215,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1254,7 +1254,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1291,7 +1291,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBlock, NULL);
doFilter(pInfo->pCondition, pBlock, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1865,7 +1865,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1908,7 +1908,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -5041,7 +5041,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
doFilter(pMiaInfo->pCondition, pRes, NULL);
doFilter(pMiaInfo->pCondition, pRes, NULL, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break;
}

View File

@ -4026,8 +4026,9 @@ _return:
FLT_RET(code);
}
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* pResultStatus) {
if (NULL == info) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
return false;
}
@ -4051,13 +4052,27 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
taosMemoryFree(output.columnData);
taosArrayDestroy(pList);
int32_t numOfQualified = 0;
for(int32_t i = 0; i < output.numOfRows; ++i) {
if ((*p)[i] == 1) {
++numOfQualified;
}
}
if (numOfQualified == output.numOfRows) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
} else if (numOfQualified == 0) {
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
} else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
}
return false;
} else {
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols);
}
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols);
}
typedef struct SClassifyConditionCxt {
bool hasPrimaryKey;
bool hasTagIndexCol;