diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 575f7ee8f4..251d4079e3 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -808,18 +808,19 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * } } -void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, - SFillInfo *pFillInfo) { +void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { // discard following dataset in the same group and reset the interpolation information STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - int16_t prec = tinfo.precision; - int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; - int64_t revisedSTime = - taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); - taosResetFillInfo(pFillInfo, revisedSTime); + if (pFillInfo != NULL) { + int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; + int64_t revisedSTime = + taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision); + + taosResetFillInfo(pFillInfo, revisedSTime); + } pLocalReducer->discard = true; pLocalReducer->discardData->num = 0; @@ -915,13 +916,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) { /* impose the limitation of output rows on the final result */ int32_t prevSize = pFinalDataPage->num; - int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; - - assert(overFlow < pRes->numOfRows); + int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; + assert(overflow < pRes->numOfRows); pRes->numOfClauseTotal = pQueryInfo->limit.limit; - pRes->numOfRows -= overFlow; - pFinalDataPage->num -= overFlow; + pRes->numOfRows -= overflow; + pFinalDataPage->num -= overflow; tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); @@ -988,13 +988,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo if (pRes->numOfRows > 0) { if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) { - int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; - pRes->numOfRows -= overFlow; + int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit; + pRes->numOfRows -= overflow; assert(pRes->numOfRows >= 0); pRes->numOfClauseTotal = pQueryInfo->limit.limit; - pFinalDataPage->num -= overFlow; + pFinalDataPage->num -= overflow; /* set remain data to be discarded, and reset the interpolation information */ savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index bd61117a85..e148bcb58e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3030,11 +3030,13 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo * // order has change already! int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (!QUERY_IS_ASC_QUERY(pQuery)) { - assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step); - } else { - assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step); - } + + // TODO validate the assertion +// if (!QUERY_IS_ASC_QUERY(pQuery)) { +// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step); +// } else { +// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step); +// } pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step; @@ -3113,7 +3115,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order); + SWITCH_ORDER(pRuntimeEnv->pCtx[i].order); } } @@ -4384,10 +4386,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); - TSKEY nextKey = blockInfo.window.skey; if (!isIntervalQuery(pQuery)) { - setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey); + int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; + setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step); } else { // interval query + TSKEY nextKey = blockInfo.window.skey; setIntervalQueryRange(pQInfo, nextKey); int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 115a32567c..d3d890f361 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -545,9 +545,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; - if (pCheckInfo->iter != NULL) { + if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - SDataRow row = SL_GET_NODE_DATA(node); + + SDataRow row = SL_GET_NODE_DATA(node); k1 = dataRowKey(row); if (k1 == binfo.window.skey) { @@ -561,9 +562,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock } } - if (pCheckInfo->iiter != NULL) { + if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - SDataRow row = SL_GET_NODE_DATA(node); + + SDataRow row = SL_GET_NODE_DATA(node); k2 = dataRowKey(row); if (k2 == binfo.window.skey) { @@ -583,6 +585,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); } else { pQueryHandle->realNumOfRows = binfo.rows; + + cur->rows = binfo.rows; + cur->win = binfo.window; + cur->mixBlock = false; + cur->blockCompleted = true; + cur->lastKey = binfo.window.ekey + (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1); } } } else { //desc order @@ -914,7 +922,10 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order); - + if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it + tSkipListIterNext(pCheckInfo->iter); + } + int32_t start = -1; if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { int32_t remain = end - pos + 1;