fix bug #660
This commit is contained in:
parent
d4bcce0d8f
commit
0a0852ad46
|
@ -60,8 +60,13 @@
|
||||||
|
|
||||||
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
|
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
|
||||||
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
|
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
|
||||||
|
|
||||||
#define __sync_add_and_fetch_64 __sync_add_and_fetch
|
#define __sync_add_and_fetch_64 __sync_add_and_fetch
|
||||||
#define __sync_add_and_fetch_32 __sync_add_and_fetch
|
#define __sync_add_and_fetch_32 __sync_add_and_fetch
|
||||||
|
|
||||||
|
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
|
||||||
|
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
|
||||||
|
|
||||||
int32_t __sync_val_load_32(int32_t *ptr);
|
int32_t __sync_val_load_32(int32_t *ptr);
|
||||||
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
|
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
|
||||||
|
|
||||||
|
|
|
@ -814,10 +814,10 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId || pBlock->numOfPoints <= 0) {
|
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) {
|
||||||
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d",
|
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, blockMeterObj:%p",
|
||||||
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
|
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
|
||||||
pQuery->blockId);
|
pQuery->blockId, pMeterObj, pBlock->pMeterObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,12 @@
|
||||||
|
|
||||||
#include "vnodeQueryImpl.h"
|
#include "vnodeQueryImpl.h"
|
||||||
|
|
||||||
|
#define ALL_CACHE_BLOCKS_CHECKED(q) \
|
||||||
|
((q)->slot == (q)->currentSlot && QUERY_IS_ASC_QUERY(q) || (q)->slot == (q)->firstSlot && (!QUERY_IS_ASC_QUERY(q)))
|
||||||
|
|
||||||
|
#define FORWARD_CACHE_BLOCK_CHECK_SLOT(slot, step, maxblocks) (slot) = ((slot) + (step) + (maxblocks)) % (maxblocks);
|
||||||
|
|
||||||
|
|
||||||
static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataInfo *pMeterInfo) {
|
static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataInfo *pMeterInfo) {
|
||||||
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
|
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
|
||||||
SQuery * pQuery = &pQInfo->query;
|
SQuery * pQuery = &pQInfo->query;
|
||||||
|
@ -49,6 +55,22 @@ static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataI
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The start position of the first check cache block is located before starting the loop.
|
||||||
|
* And the start position for next cache blocks needs to be decided before checking each cache block.
|
||||||
|
*/
|
||||||
|
static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, bool *firstCheckSlot) {
|
||||||
|
if (!(*firstCheckSlot)) {
|
||||||
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
pQuery->pos = 0;
|
||||||
|
} else {
|
||||||
|
pQuery->pos = pBlock->numOfPoints - 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(*firstCheckSlot) = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
|
static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
|
||||||
SQuery * pQuery = &pQInfo->query;
|
SQuery * pQuery = &pQInfo->query;
|
||||||
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
|
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
|
||||||
|
@ -147,24 +169,39 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool firstCheckSlot = true;
|
||||||
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
|
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) {
|
for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) {
|
||||||
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
|
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
|
||||||
|
|
||||||
// cache block may be flushed to disk, so it is not available, ignore it and try next
|
/*
|
||||||
if (pBlock == NULL) {
|
* 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next
|
||||||
pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
|
*
|
||||||
|
* 2. pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache
|
||||||
|
* procedure. The block has been allocated but data has not been put into yet. If the block is the last
|
||||||
|
* block(newly allocated block), abort query. Otherwise, skip it and go on.
|
||||||
|
*/
|
||||||
|
if ((pBlock == NULL) || (pBlock->numOfPoints == 0)) {
|
||||||
|
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setStartPositionForCacheBlock(pQuery, pBlock, &firstCheckSlot);
|
||||||
|
|
||||||
TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0];
|
TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0];
|
||||||
// in handling file data block, this query condition is checked during fetching candidate file blocks
|
|
||||||
|
// in handling file data block, the timestamp range validation is done during fetching candidate file blocks
|
||||||
if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* only record the key on last block */
|
// only record the key on last block
|
||||||
SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
|
SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
|
||||||
SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK);
|
SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK);
|
||||||
|
|
||||||
|
@ -176,24 +213,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
|
||||||
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL,
|
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL,
|
||||||
searchFn);
|
searchFn);
|
||||||
|
|
||||||
// todo refactor
|
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
|
||||||
if ((pQuery->slot == pQuery->currentSlot && QUERY_IS_ASC_QUERY(pQuery)) ||
|
|
||||||
(pQuery->slot == pQuery->firstSlot && !QUERY_IS_ASC_QUERY(pQuery))) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// try next cache block
|
FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks);
|
||||||
pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
|
||||||
pQuery->pos = 0;
|
|
||||||
} else { // backwards traverse encounter the cache invalid, abort scan cache.
|
|
||||||
SCacheBlock *pNextBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
|
|
||||||
if (pNextBlock == NULL) {
|
|
||||||
break; // todo fix
|
|
||||||
} else {
|
|
||||||
pQuery->pos = pNextBlock->numOfPoints - 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -614,7 +614,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode->refCount > 0) {
|
if (pNode->refCount > 0) {
|
||||||
__sync_add_and_fetch_32(&pNode->refCount, -1);
|
__sync_sub_and_fetch_32(&pNode->refCount, 1);
|
||||||
pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount);
|
pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount);
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in New Issue