diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 722266db2c..f12db0971e 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -399,7 +399,6 @@ static void doCloseQueryFiles(SQueryFilesInfo *pVnodeFileInfo) { assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0); pVnodeFileInfo->headerFileSize = -1; - doCloseQueryFileInfoFD(pVnodeFileInfo); } @@ -458,11 +457,12 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex]; int64_t st = taosGetTimestampUs(); - - if (vnodeIsCompBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIndex)) { + + // if the corresponding data/header files are already closed, re-open them here + if (vnodeIsCompBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIndex) && + pRuntimeEnv->vnodeFileInfo.current == fileIndex) { dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d compBlock info is loaded, not reload", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pHeadeFileInfo->fileID); - return pQuery->numOfBlocks; } @@ -1057,7 +1057,7 @@ void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, in position->pos = pos; } -bool isCacheBlockValid(SQuery *pQuery, SCacheBlock *pBlock, SMeterObj *pMeterObj) { +bool isCacheBlockValid(SQuery *pQuery, SCacheBlock *pBlock, SMeterObj *pMeterObj, int32_t slot) { if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) { SMeterObj *pNewMeterObj = pBlock->pMeterObj; char * id = (pNewMeterObj != NULL) ? pNewMeterObj->meterId : NULL; @@ -1081,11 +1081,19 @@ bool isCacheBlockValid(SQuery *pQuery, SCacheBlock *pBlock, SMeterObj *pMeterObj dWarn( "QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d," "allocated but not write data yet.", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pQuery->firstSlot, + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, slot, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks); return false; } + + SCacheInfo* pCacheInfo = (SCacheInfo*) pMeterObj->pCache; + if (pCacheInfo->commitPoint == pMeterObj->pointsPerBlock && pQuery->slot == pCacheInfo->currentSlot) { + dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is committed, ignore. slot:%d first:%d, last:%d, numOfBlocks:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, slot, pQuery->firstSlot, + pQuery->currentSlot, pQuery->numOfBlocks); + return false; + } return true; } @@ -1117,7 +1125,7 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeE } // block is empty or block does not belongs to current table, return NULL value - if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) { + if (!isCacheBlockValid(pQuery, pBlock, pMeterObj, slot)) { return NULL; } @@ -1203,7 +1211,7 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeE pQuery->fileId = -1; pQuery->slot = slot; - if (!isCacheBlockValid(pQuery, pNewBlock, pMeterObj)) { + if (!isCacheBlockValid(pQuery, pNewBlock, pMeterObj, slot)) { return NULL; } @@ -1621,6 +1629,11 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) { return; } + + // no qualified results exist, abort check + if (pWindowResInfo->size == 0) { + return; + } // query completed if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || @@ -4134,13 +4147,14 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj * tw = win; int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &tw, pWindowResInfo, &blockInfo, primaryKey, searchFn); - assert(startPos > 0); + assert(startPos >= 0); pQuery->limit.offset -= 1; // set the abort info pQuery->pos = startPos; pQuery->lastKey = primaryKey[startPos]; + pWindowResInfo->prevSKey = tw.skey; win = tw; continue; } else { @@ -4176,6 +4190,8 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj * // set the abort info pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0:blockInfo.size-1; pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.keyFirst:blockInfo.keyLast; + pWindowResInfo->prevSKey = n.skey; + win = n; if (pQuery->limit.offset == 0 && IS_DISK_DATA_BLOCK(pQuery)) {