fix(query): fix interp + fill interpolation after last datablock

This commit is contained in:
Ganlin Zhao 2022-08-10 14:21:31 +08:00
parent 1d90aff252
commit 249e2b12f6
2 changed files with 23 additions and 17 deletions

View File

@ -2631,6 +2631,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* pPrevReader = pReader->innerReader[0]; STsdbReader* pPrevReader = pReader->innerReader[0];
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
pPrevReader->idStr); pPrevReader->idStr);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap); resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);

View File

@ -2115,7 +2115,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
} }
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock, static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup,
SSDataBlock* pResBlock) { SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows; int32_t rows = pResBlock->info.rows;
@ -2124,10 +2124,10 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
// output the result // output the result
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); //SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) { switch (pSliceInfo->fillType) {
@ -2383,7 +2383,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
@ -2422,7 +2422,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
@ -2443,7 +2443,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
@ -2467,7 +2467,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
doKeepNextRows(pSliceInfo, pBlock, i); doKeepNextRows(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
@ -2501,7 +2501,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) { if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
@ -2532,15 +2532,16 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
} }
// check if need to interpolate after ts range }
// except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { // check if need to interpolate after last datablock
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pResBlock); // except for fill(next), fill(linear)
pSliceInfo->current = while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) {
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
if (pResBlock->info.rows >= pResBlock->info.capacity) { pSliceInfo->current =
break; taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
} if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
} }
} }