save index instead of ts
This commit is contained in:
parent
fe2629d061
commit
e74c0ac987
|
@ -45,7 +45,7 @@ typedef struct STimeSliceOperatorInfo {
|
||||||
SGroupKeys* pPrevGroupKey;
|
SGroupKeys* pPrevGroupKey;
|
||||||
SSDataBlock* pNextGroupRes;
|
SSDataBlock* pNextGroupRes;
|
||||||
SSDataBlock* pRemainRes; // save block unfinished processing
|
SSDataBlock* pRemainRes; // save block unfinished processing
|
||||||
int64_t remainTs; // the remaining timestamp in the block to be processed
|
int32_t remainIndex; // the remaining index in the block to be processed
|
||||||
} STimeSliceOperatorInfo;
|
} STimeSliceOperatorInfo;
|
||||||
|
|
||||||
static void destroyTimeSliceOperatorInfo(void* param);
|
static void destroyTimeSliceOperatorInfo(void* param);
|
||||||
|
@ -669,8 +669,13 @@ static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBl
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||||
if (curIndex < pBlock->info.rows - 1) {
|
if (curIndex < pBlock->info.rows - 1) {
|
||||||
pSliceInfo->pRemainRes = pBlock;
|
pSliceInfo->pRemainRes = pBlock;
|
||||||
pSliceInfo->remainTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
|
pSliceInfo->remainIndex = curIndex + 1;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all data in remaining block processed
|
||||||
|
pSliceInfo->pRemainRes = NULL;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
|
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
|
||||||
|
@ -679,14 +684,10 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
|
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
|
||||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
|
||||||
|
|
||||||
// check if need to resume from the position of last unfinished block
|
int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
|
||||||
if (pSliceInfo->pRemainRes != NULL && ts < pSliceInfo->remainTs &&
|
for (; i < pBlock->info.rows; ++i) {
|
||||||
pSliceInfo->current <= pSliceInfo->remainTs) {
|
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check for duplicate timestamps
|
// check for duplicate timestamps
|
||||||
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
|
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
|
||||||
|
@ -696,13 +697,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
|
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (checkWindowBoundReached(pSliceInfo)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
|
||||||
saveBlockStatus(pSliceInfo, pBlock, i);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ts == pSliceInfo->current) {
|
if (ts == pSliceInfo->current) {
|
||||||
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
|
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
|
||||||
|
@ -984,7 +978,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->pPrevGroupKey = NULL;
|
pInfo->pPrevGroupKey = NULL;
|
||||||
pInfo->pNextGroupRes = NULL;
|
pInfo->pNextGroupRes = NULL;
|
||||||
pInfo->pRemainRes = NULL;
|
pInfo->pRemainRes = NULL;
|
||||||
pInfo->remainTs = 0;
|
pInfo->remainIndex = 0;
|
||||||
|
pOperator->resultInfo.threshold = 1;
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
||||||
|
|
Loading…
Reference in New Issue