fix data block not properly saved

This commit is contained in:
Ganlin Zhao 2023-05-05 15:24:15 +08:00
parent 7b01cad063
commit 5a1681d91d
1 changed files with 12 additions and 6 deletions

View File

@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo {
int64_t prevTs;
bool prevTsSet;
uint64_t groupId;
SSDataBlock* pCurrentGroupRes;
SSDataBlock* pPrevGroupRes;
SSDataBlock* pNextGroupRes;
} STimeSliceOperatorInfo;
@ -640,6 +640,11 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
}
}
static void copyPrevGroupDataBlock(SSDataBlock* pDstBlock, SSDataBlock* pSrcBlock) {
blockDataCleanup(pDstBlock);
assignOneDataBlock(pDstBlock, pSrcBlock);
}
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
pSliceInfo->current = pSliceInfo->win.skey;
pSliceInfo->prevTsSet = false;
@ -667,7 +672,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pSliceInfo->pNextGroupRes != NULL) {
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
pSliceInfo->pCurrentGroupRes = pSliceInfo->pNextGroupRes;
copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pSliceInfo->pNextGroupRes);
pSliceInfo->pNextGroupRes = NULL;
}
@ -701,13 +706,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
pSliceInfo->pCurrentGroupRes = pBlock;
copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pBlock);
}
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pCurrentGroupRes,
pSliceInfo->pCurrentGroupRes->info.rows - 1);
genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pPrevGroupRes,
pSliceInfo->pPrevGroupRes->info.rows - 1);
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (pOperator->status == OP_EXEC_DONE) {
@ -771,8 +776,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->prevTsSet = false;
pInfo->prevTs = 0;
pInfo->groupId = 0;
pInfo->pPrevGroupRes = createDataBlock();
pInfo->pNextGroupRes = NULL;
pInfo->pCurrentGroupRes = NULL;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
@ -801,6 +806,7 @@ void destroyTimeSliceOperatorInfo(void* param) {
STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pPrevGroupRes = blockDataDestroy(pInfo->pPrevGroupRes);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);