Merge pull request #15269 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2022-07-22 12:50:03 +08:00 committed by GitHub
commit ab2ad2cc54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 28 additions and 22 deletions

View File

@ -1107,6 +1107,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
void blockDataCleanup(SSDataBlock* pDataBlock) { void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0; pDataBlock->info.rows = 0;
pDataBlock->info.groupId = 0;
pDataBlock->info.window.ekey = 0;
pDataBlock->info.window.skey = 0;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);

View File

@ -788,11 +788,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
} }
colIndex += 1; colIndex += 1;
ASSERT(rowIndex == remain);
} else { // the specified column does not exist in file block, fill with null data } else { // the specified column does not exist in file block, fill with null data
colDataAppendNNULL(pColData, 0, remain); colDataAppendNNULL(pColData, 0, remain);
} }
ASSERT(rowIndex == remain);
i += 1; i += 1;
} }

View File

@ -869,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo); STimeWindowAggSupp* pTwAggSup, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);

View File

@ -1334,12 +1334,10 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
if (pFilterNode == NULL) { if (pFilterNode == NULL || pBlock->info.rows == 0) {
return;
}
if (pBlock->info.rows == 0) {
return; return;
} }
SFilterInfo* filter = NULL; SFilterInfo* filter = NULL;
// todo move to the initialization function // todo move to the initialization function
@ -1356,8 +1354,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
filterFreeInfo(filter); filterFreeInfo(filter);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
blockDataUpdateTsWindow(pBlock, 0);
taosMemoryFree(rowRes); taosMemoryFree(rowRes);
} }
@ -3412,6 +3408,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) { if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
pResBlock->info.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;
} }
@ -3454,17 +3451,20 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
// 1. The result in current group not reach the threshold of output result, continue // 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) { if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
pResBlock->info.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;
} }
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
pResBlock->info.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;
} }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL); assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold) { if (pResBlock->info.rows > pResultInfo->threshold) {
pResBlock->info.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;
} }
} else { } else {
@ -3484,23 +3484,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SSDataBlock* fillResult = NULL; SSDataBlock* fillResult = NULL;
while (true) { while (true) {
fillResult = doFillImpl(pOperator); fillResult = doFillImpl(pOperator);
if (fillResult != NULL) {
doFilter(pInfo->pCondition, fillResult);
}
if (fillResult == NULL) { if (fillResult == NULL) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
} }
doFilter(pInfo->pCondition, fillResult);
if (fillResult->info.rows > 0) { if (fillResult->info.rows > 0) {
break; break;
} }
} }
if (fillResult != NULL) { if (fillResult != NULL) {
size_t rows = fillResult->info.rows; pOperator->resultInfo.totalRows += fillResult->info.rows;
pOperator->resultInfo.totalRows += rows;
} }
return fillResult; return fillResult;
@ -4444,6 +4440,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
if (pHandle->vnode) { if (pHandle->vnode) {
int32_t code = int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
@ -4463,7 +4465,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
#endif #endif
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, &aggSup, pTaskInfo);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {

View File

@ -1525,7 +1525,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo) { STimeWindowAggSupp* pTwSup, SExecTaskInfo* pTaskInfo) {
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -1539,11 +1539,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pTagCond = pTagCond; pInfo->pTagCond = pTagCond;
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = *pTwSup;
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

View File

@ -2088,7 +2088,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentileMerge, .translateFunc = translateApercentileMerge,
.getEnvFunc = getApercentileFuncEnv, .getEnvFunc = getApercentileFuncEnv,
.initFunc = functionSetup, .initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunctionMerge, .processFunc = apercentileFunctionMerge,
.finalizeFunc = apercentileFinalize, .finalizeFunc = apercentileFinalize,
.invertFunc = NULL, .invertFunc = NULL,

View File

@ -2592,6 +2592,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->algo == APERCT_ALGO_TDIGEST) { if (pInfo->algo == APERCT_ALGO_TDIGEST) {
buildTDigestInfo(pInfo);
if (pInfo->pTDigest->size > 0) { if (pInfo->pTDigest->size > 0) {
pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100); pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100);
} else { // no need to free } else { // no need to free
@ -2599,6 +2600,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
buildHistogramInfo(pInfo);
if (pInfo->pHisto->numOfElems > 0) { if (pInfo->pHisto->numOfElems > 0) {
qDebug("get the final res:%d, elements:%"PRId64", entry:%d", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries); qDebug("get the final res:%d, elements:%"PRId64", entry:%d", pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries);

View File

@ -43,6 +43,7 @@ endi
if $data01 != 1 then if $data01 != 1 then
if $data01 != 10 then if $data01 != 10 then
print =============> $data01
print retention level 2 file result $data01 != 1 or 10 print retention level 2 file result $data01 != 1 or 10
return -1 return -1
endi endi