From edcad56d4b50e4480f19ebc59ea79e2e594aeb6d Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 4 Sep 2019 13:09:01 +0800 Subject: [PATCH] fix the issue #439 --- src/system/src/vnodeQueryImpl.c | 63 ++++++++++++++++-------------- src/system/src/vnodeQueryProcess.c | 27 ++++++++++--- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/src/system/src/vnodeQueryImpl.c b/src/system/src/vnodeQueryImpl.c index 82d74bb059..f2d775bd19 100644 --- a/src/system/src/vnodeQueryImpl.c +++ b/src/system/src/vnodeQueryImpl.c @@ -2694,35 +2694,17 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, void *pBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t newPos = pQuery->pos; - if (QUERY_IS_ASC_QUERY(pQuery)) { - if (newPos + pQuery->limit.offset > pBlockInfo->size) { - newPos = pBlockInfo->size - 1; - } else { - newPos += pQuery->limit.offset; - } - } else { - if (newPos < pQuery->limit.offset) { - newPos = 0; - } else { - newPos -= pQuery->limit.offset; - } - } - - TSKEY newKey = 0; - if (IS_DISK_DATA_BLOCK(pQuery)) { - newKey = getTimestampInDiskBlock(pRuntimeEnv, newPos); - } else { - newKey = getTimestampInCacheBlock(pBlock, newPos); - } - /* * The actually qualified points that can be skipped needs to be calculated if query is * done in current data block */ - if ((newKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (newKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - setQueryStatus(pQuery, QUERY_COMPLETED); + if ((pQuery->ekey <= pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || + (pQuery->ekey >= pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { + + // force load timestamp data blocks + if (IS_DISK_DATA_BLOCK(pQuery)) { + getTimestampInDiskBlock(pRuntimeEnv, 0); + } // update the pQuery->limit.offset value, and pQuery->pos value TSKEY* keys = NULL; @@ -2741,6 +2723,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInf break; } } + } else { for(i = pQuery->pos; i >= 0 && pQuery->limit.offset > 0; --i) { if (keys[i] >= pQuery->ekey) { @@ -2751,14 +2734,31 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInf } } - pQuery->pos = i; + if (((i == pBlockInfo->size || keys[i] > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) || + ((i < 0 || keys[i] < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + pQuery->pos = -1; + } else { + pQuery->pos = i; + } } else { - pQuery->skey = newKey; - pQuery->lastKey = pQuery->skey; + if (QUERY_IS_ASC_QUERY(pQuery)) { + pQuery->pos += pQuery->limit.offset; + } else { + pQuery->pos -= pQuery->limit.offset; + } + + assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1); + + if (IS_DISK_DATA_BLOCK(pQuery)) { + pQuery->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + } else { + pQuery->skey = getTimestampInCacheBlock(pBlock, pQuery->pos); + } // update the offset value - pQuery->limit.offset -= abs(newPos - pQuery->pos); - pQuery->pos = newPos; + pQuery->lastKey = pQuery->skey; + pQuery->limit.offset = 0; } } @@ -4565,6 +4565,9 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->pointsOffset = pQuery->pointsToRead; // clear all data in result buffer resetCtxOutputBuf(pRuntimeEnv); + + // clear the buffer is full flag if exists + pQuery->over &= (~QUERY_RESBUF_FULL); } else { int32_t numOfSkip = (int32_t)pQuery->limit.offset; int32_t size = pQuery->pointsRead; diff --git a/src/system/src/vnodeQueryProcess.c b/src/system/src/vnodeQueryProcess.c index fef9e5d666..b50606b933 100644 --- a/src/system/src/vnodeQueryProcess.c +++ b/src/system/src/vnodeQueryProcess.c @@ -550,7 +550,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { assert(pSids->numOfSubSet == 1 && start == 0 && end == pSids->numOfSids - 1 && pSupporter->meterIdx >= start && pSupporter->meterIdx <= end); - for (int32_t k = pSupporter->meterIdx; k <= end; ++k, ++pSupporter->meterIdx) { + while(pSupporter->meterIdx < pSupporter->numOfMeters) { + int32_t k = pSupporter->meterIdx; + if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; @@ -561,6 +563,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, start)) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; + + pSupporter->meterIdx++; continue; } @@ -573,6 +577,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter) == false) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; + + pSupporter->meterIdx++; continue; } @@ -582,6 +588,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; + + pSupporter->meterIdx++; continue; } } @@ -595,7 +603,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pQuery->pointsRead = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); - // set query completed + // the limitation of output result is reached, set the query completed if (doRevisedResultsByLimit(pQInfo)) { pSupporter->meterIdx = pSupporter->pSidSet->numOfSids; break; @@ -610,17 +618,24 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { */ pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; + pSupporter->meterIdx++; if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { - pSupporter->meterIdx++; break; } } else { - assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); - // forward query range pQuery->skey = pQuery->lastKey; - break; + + // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter + if (pQuery->pointsRead == 0) { + assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); + continue; + } else { + //buffer is full, wait for the next round to retrieve data from current meter + assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); + break; + } } } }