fix lastBlock remain ts not saved
This commit is contained in:
parent
b4c2085ddc
commit
7ee05df7c6
|
@ -45,6 +45,7 @@ typedef struct STimeSliceOperatorInfo {
|
||||||
SGroupKeys* pPrevGroupKey;
|
SGroupKeys* pPrevGroupKey;
|
||||||
SSDataBlock* pNextGroupRes;
|
SSDataBlock* pNextGroupRes;
|
||||||
SSDataBlock* pRemainRes; // save block unfinished processing
|
SSDataBlock* pRemainRes; // save block unfinished processing
|
||||||
|
int64_t remainTs; // the remaining timestamp in the block to be processed
|
||||||
} STimeSliceOperatorInfo;
|
} STimeSliceOperatorInfo;
|
||||||
|
|
||||||
static void destroyTimeSliceOperatorInfo(void* param);
|
static void destroyTimeSliceOperatorInfo(void* param);
|
||||||
|
@ -642,11 +643,9 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t threshold) {
|
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) {
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
|
|
||||||
if (pResBlock->info.rows > threshold) {
|
if (pResBlock->info.rows > threshold) {
|
||||||
pSliceInfo->pRemainRes = pBlock;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -661,6 +660,16 @@ static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) {
|
||||||
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
|
|
||||||
|
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||||
|
if (curIndex < pBlock->info.rows - 1) {
|
||||||
|
pSliceInfo->pRemainRes = pBlock;
|
||||||
|
pSliceInfo->remainTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
|
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
|
||||||
SExecTaskInfo* pTaskInfo, bool ignoreNull) {
|
SExecTaskInfo* pTaskInfo, bool ignoreNull) {
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
|
@ -670,6 +679,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
||||||
|
|
||||||
|
// check if need to resume from the position of last unfinished block
|
||||||
|
if (pSliceInfo->pRemainRes != NULL && ts < pSliceInfo->remainTs &&
|
||||||
|
pSliceInfo->current <= pSliceInfo->remainTs) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// check for duplicate timestamps
|
// check for duplicate timestamps
|
||||||
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
|
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
|
||||||
|
@ -681,7 +696,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
if (checkWindowBoundReached(pSliceInfo)) {
|
if (checkWindowBoundReached(pSliceInfo)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
|
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||||
|
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -697,7 +713,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
if (checkWindowBoundReached(pSliceInfo)) {
|
if (checkWindowBoundReached(pSliceInfo)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
|
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||||
|
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (ts < pSliceInfo->current) {
|
} else if (ts < pSliceInfo->current) {
|
||||||
|
@ -723,7 +740,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
if (checkWindowBoundReached(pSliceInfo)) {
|
if (checkWindowBoundReached(pSliceInfo)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
|
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||||
|
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -759,7 +777,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
if (checkWindowBoundReached(pSliceInfo)) {
|
if (checkWindowBoundReached(pSliceInfo)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
|
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||||
|
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -854,7 +873,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pSliceInfo->pNextGroupRes != NULL) {
|
if (pSliceInfo->pNextGroupRes != NULL) {
|
||||||
doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
|
doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
|
||||||
if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) {
|
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||||
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
goto _finished;
|
goto _finished;
|
||||||
}
|
}
|
||||||
|
@ -879,7 +898,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
doHandleTimeslice(pOperator, pBlock);
|
doHandleTimeslice(pOperator, pBlock);
|
||||||
if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) {
|
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||||
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
goto _finished;
|
goto _finished;
|
||||||
}
|
}
|
||||||
|
@ -943,6 +962,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
|
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
|
||||||
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
pOperator->resultInfo.threshold = 1;
|
||||||
|
|
||||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||||
pInfo->pLinearInfo = NULL;
|
pInfo->pLinearInfo = NULL;
|
||||||
|
@ -956,6 +976,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->pPrevGroupKey = NULL;
|
pInfo->pPrevGroupKey = NULL;
|
||||||
pInfo->pNextGroupRes = NULL;
|
pInfo->pNextGroupRes = NULL;
|
||||||
pInfo->pRemainRes = NULL;
|
pInfo->pRemainRes = NULL;
|
||||||
|
pInfo->remainTs = 0;
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
||||||
|
|
Loading…
Reference in New Issue