refactor: do some internal refactor.
This commit is contained in:
parent
ada2eb232e
commit
7522c2edf3
|
@ -788,11 +788,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||
}
|
||||
colIndex += 1;
|
||||
ASSERT(rowIndex == remain);
|
||||
} else { // the specified column does not exist in file block, fill with null data
|
||||
colDataAppendNNULL(pColData, 0, remain);
|
||||
}
|
||||
|
||||
ASSERT(rowIndex == remain);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -869,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
|
|||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
STimeWindowAggSupp* pTwAggSup, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
|
|
@ -3414,6 +3414,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
|
@ -3456,17 +3457,20 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
// 1. The result in current group not reach the threshold of output result, continue
|
||||
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else {
|
||||
|
@ -3486,23 +3490,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* fillResult = NULL;
|
||||
while (true) {
|
||||
fillResult = doFillImpl(pOperator);
|
||||
if (fillResult != NULL) {
|
||||
doFilter(pInfo->pCondition, fillResult);
|
||||
}
|
||||
|
||||
if (fillResult == NULL) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, fillResult);
|
||||
if (fillResult->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fillResult != NULL) {
|
||||
size_t rows = fillResult->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
pOperator->resultInfo.totalRows += fillResult->info.rows;
|
||||
}
|
||||
|
||||
return fillResult;
|
||||
|
@ -4444,6 +4444,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pTableScanNode->watermark,
|
||||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
|
||||
if (pHandle->vnode) {
|
||||
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
|
@ -4454,7 +4460,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, &aggSup, pTaskInfo);
|
||||
return pOperator;
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
|
|
|
@ -1525,7 +1525,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
STimeWindowAggSupp* pTwSup, SExecTaskInfo* pTaskInfo) {
|
||||
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
|
@ -1539,11 +1539,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
|
||||
pInfo->pTagCond = pTagCond;
|
||||
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pTableScanNode->watermark,
|
||||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
pInfo->twAggSup = *pTwSup;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
|
|
@ -2088,7 +2088,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateApercentileMerge,
|
||||
.getEnvFunc = getApercentileFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.initFunc = apercentileFunctionSetup,
|
||||
.processFunc = apercentileFunctionMerge,
|
||||
.finalizeFunc = apercentileFinalize,
|
||||
.invertFunc = NULL,
|
||||
|
|
|
@ -43,6 +43,7 @@ endi
|
|||
|
||||
if $data01 != 1 then
|
||||
if $data01 != 10 then
|
||||
print =============> $data01
|
||||
print retention level 2 file result $data01 != 1 or 10
|
||||
return -1
|
||||
endi
|
||||
|
|
Loading…
Reference in New Issue