fix(query): fix error in fill.

This commit is contained in:
Haojun Liao 2023-06-01 22:45:03 +08:00
parent 1036259a7b
commit 403617976c
1 changed files with 19 additions and 21 deletions

View File

@ -61,24 +61,24 @@ typedef struct SFillOperatorInfo {
SExprSupp noFillExprSupp; SExprSupp noFillExprSupp;
} SFillOperatorInfo; } SFillOperatorInfo;
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock); static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order);
static void destroyFillOperatorInfo(void* param); static void destroyFillOperatorInfo(void* param);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { SResultInfo* pResultInfo, int32_t order) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
SSDataBlock* pResBlock = pInfo->pFinalRes; SSDataBlock* pResBlock = pInfo->pFinalRes;
int32_t order = TSDB_ORDER_ASC; // int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag, false); // getTableScanInfo(pOperator, &order, &scanFlag, false);
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag); doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
revisedFillStartKey(pInfo, pInfo->existNewGroupBlock); revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order);
int64_t ts = (order == TSDB_ORDER_ASC)? pInfo->existNewGroupBlock->info.window.ekey:pInfo->existNewGroupBlock->info.window.skey; int64_t ts = (order == TSDB_ORDER_ASC)? pInfo->existNewGroupBlock->info.window.ekey:pInfo->existNewGroupBlock->info.window.skey;
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
@ -93,7 +93,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
} }
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { SResultInfo* pResultInfo, int32_t order) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
@ -103,7 +103,7 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
// handle the cached new group data block // handle the cached new group data block
if (pInfo->existNewGroupBlock) { if (pInfo->existNewGroupBlock) {
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
} }
} }
@ -123,9 +123,7 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int
} }
// todo refactor: decide the start key according to the query time range. // todo refactor: decide the start key according to the query time range.
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock) { static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) {
int32_t order = pInfo->pFillInfo->order;
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
int64_t skey = pBlock->info.window.skey; int64_t skey = pBlock->info.window.skey;
if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the
@ -137,7 +135,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock) {
while(1) { while(1) {
int64_t prev = taosTimeAdd(t, -pInterval->sliding, pInterval->slidingUnit, pInterval->precision); int64_t prev = taosTimeAdd(t, -pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (prev < pInfo->pFillInfo->start) { if (prev <= pInfo->pFillInfo->start) {
t = prev; t = prev;
break; break;
} }
@ -158,7 +156,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock) {
while(1) { while(1) {
int64_t prev = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); int64_t prev = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (prev < pInfo->pFillInfo->start) { if (prev >= pInfo->pFillInfo->start) {
t = prev; t = prev;
break; break;
} }
@ -184,12 +182,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag, false); getTableScanInfo(pOperator, &order, &scanFlag, false);
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > 0) {
pResBlock->info.id.groupId = pInfo->curGroupId;
return pResBlock;
}
SOperatorInfo* pDownstream = pOperator->pDownstream[0]; SOperatorInfo* pDownstream = pOperator->pDownstream[0];
// the scan order may be different from the output result order for agg interval operator. // the scan order may be different from the output result order for agg interval operator.
@ -197,6 +189,12 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder; order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder;
} }
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
if (pResBlock->info.rows > 0) {
pResBlock->info.id.groupId = pInfo->curGroupId;
return pResBlock;
}
while (1) { while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -218,7 +216,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) { if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
if (pInfo->curGroupId == 0) { if (pInfo->curGroupId == 0) {
revisedFillStartKey(pInfo, pBlock); revisedFillStartKey(pInfo, pBlock, order);
} }
pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block
@ -249,7 +247,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
return pResBlock; return pResBlock;
} }
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
pResBlock->info.id.groupId = pInfo->curGroupId; pResBlock->info.id.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;
@ -257,7 +255,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
blockDataCleanup(pResBlock); blockDataCleanup(pResBlock);
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
if (pResBlock->info.rows > pResultInfo->threshold) { if (pResBlock->info.rows > pResultInfo->threshold) {
pResBlock->info.id.groupId = pInfo->curGroupId; pResBlock->info.id.groupId = pInfo->curGroupId;
return pResBlock; return pResBlock;