fix the issue #439

This commit is contained in:
slguan 2019-09-04 13:09:01 +08:00
parent 920edd131d
commit edcad56d4b
2 changed files with 54 additions and 36 deletions

View File

@ -2694,35 +2694,17 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, void *pBlock) { static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, void *pBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t newPos = pQuery->pos;
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (newPos + pQuery->limit.offset > pBlockInfo->size) {
newPos = pBlockInfo->size - 1;
} else {
newPos += pQuery->limit.offset;
}
} else {
if (newPos < pQuery->limit.offset) {
newPos = 0;
} else {
newPos -= pQuery->limit.offset;
}
}
TSKEY newKey = 0;
if (IS_DISK_DATA_BLOCK(pQuery)) {
newKey = getTimestampInDiskBlock(pRuntimeEnv, newPos);
} else {
newKey = getTimestampInCacheBlock(pBlock, newPos);
}
/* /*
* The actually qualified points that can be skipped needs to be calculated if query is * The actually qualified points that can be skipped needs to be calculated if query is
* done in current data block * done in current data block
*/ */
if ((newKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || if ((pQuery->ekey <= pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
(newKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { (pQuery->ekey >= pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
// force load timestamp data blocks
if (IS_DISK_DATA_BLOCK(pQuery)) {
getTimestampInDiskBlock(pRuntimeEnv, 0);
}
// update the pQuery->limit.offset value, and pQuery->pos value // update the pQuery->limit.offset value, and pQuery->pos value
TSKEY* keys = NULL; TSKEY* keys = NULL;
@ -2741,6 +2723,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInf
break; break;
} }
} }
} else { } else {
for(i = pQuery->pos; i >= 0 && pQuery->limit.offset > 0; --i) { for(i = pQuery->pos; i >= 0 && pQuery->limit.offset > 0; --i) {
if (keys[i] >= pQuery->ekey) { if (keys[i] >= pQuery->ekey) {
@ -2751,14 +2734,31 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInf
} }
} }
pQuery->pos = i; if (((i == pBlockInfo->size || keys[i] > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((i < 0 || keys[i] < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
pQuery->pos = -1;
} else {
pQuery->pos = i;
}
} else { } else {
pQuery->skey = newKey; if (QUERY_IS_ASC_QUERY(pQuery)) {
pQuery->lastKey = pQuery->skey; pQuery->pos += pQuery->limit.offset;
} else {
pQuery->pos -= pQuery->limit.offset;
}
assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1);
if (IS_DISK_DATA_BLOCK(pQuery)) {
pQuery->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
} else {
pQuery->skey = getTimestampInCacheBlock(pBlock, pQuery->pos);
}
// update the offset value // update the offset value
pQuery->limit.offset -= abs(newPos - pQuery->pos); pQuery->lastKey = pQuery->skey;
pQuery->pos = newPos; pQuery->limit.offset = 0;
} }
} }
@ -4565,6 +4565,9 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->pointsOffset = pQuery->pointsToRead; // clear all data in result buffer pQuery->pointsOffset = pQuery->pointsToRead; // clear all data in result buffer
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
// clear the buffer is full flag if exists
pQuery->over &= (~QUERY_RESBUF_FULL);
} else { } else {
int32_t numOfSkip = (int32_t)pQuery->limit.offset; int32_t numOfSkip = (int32_t)pQuery->limit.offset;
int32_t size = pQuery->pointsRead; int32_t size = pQuery->pointsRead;

View File

@ -550,7 +550,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
assert(pSids->numOfSubSet == 1 && start == 0 && end == pSids->numOfSids - 1 && pSupporter->meterIdx >= start && assert(pSids->numOfSubSet == 1 && start == 0 && end == pSids->numOfSids - 1 && pSupporter->meterIdx >= start &&
pSupporter->meterIdx <= end); pSupporter->meterIdx <= end);
for (int32_t k = pSupporter->meterIdx; k <= end; ++k, ++pSupporter->meterIdx) { while(pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx;
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
@ -561,6 +563,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, start)) { if (!multimeterMultioutputHelper(pQInfo, &dataInDisk, &dataInCache, k, start)) {
pQuery->skey = pSupporter->rawSKey; pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey; pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
continue; continue;
} }
@ -573,6 +577,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter) == false) { if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter) == false) {
pQuery->skey = pSupporter->rawSKey; pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey; pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
continue; continue;
} }
@ -582,6 +588,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
pQuery->skey = pSupporter->rawSKey; pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey; pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
continue; continue;
} }
} }
@ -595,7 +603,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pQuery->pointsRead = getNumOfResult(pRuntimeEnv); pQuery->pointsRead = getNumOfResult(pRuntimeEnv);
doSkipResults(pRuntimeEnv); doSkipResults(pRuntimeEnv);
// set query completed // the limitation of output result is reached, set the query completed
if (doRevisedResultsByLimit(pQInfo)) { if (doRevisedResultsByLimit(pQInfo)) {
pSupporter->meterIdx = pSupporter->pSidSet->numOfSids; pSupporter->meterIdx = pSupporter->pSidSet->numOfSids;
break; break;
@ -610,17 +618,24 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
*/ */
pQuery->skey = pSupporter->rawSKey; pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey; pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++;
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
pSupporter->meterIdx++;
break; break;
} }
} else { } else {
assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
// forward query range // forward query range
pQuery->skey = pQuery->lastKey; pQuery->skey = pQuery->lastKey;
break;
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pQuery->pointsRead == 0) {
assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
continue;
} else {
//buffer is full, wait for the next round to retrieve data from current meter
assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
break;
}
} }
} }
} }