commit
a7b8c71ae3
|
@ -172,6 +172,7 @@ int vnodeFreeCacheBlock(SCacheBlock *pCacheBlock) {
|
|||
SCachePool *pPool = (SCachePool *)vnodeList[pObj->vnode].pCachePool;
|
||||
if (pCacheBlock->notFree) {
|
||||
pPool->notFreeSlots--;
|
||||
pInfo->unCommittedBlocks--;
|
||||
dTrace("vid:%d sid:%d id:%s, cache block is not free, slot:%d, index:%d notFreeSlots:%d",
|
||||
pObj->vnode, pObj->sid, pObj->meterId, pCacheBlock->slot, pCacheBlock->index, pPool->notFreeSlots);
|
||||
}
|
||||
|
|
|
@ -479,7 +479,7 @@ int vnodeImportToFile(SImportInfo *pImport) {
|
|||
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
|
||||
}
|
||||
|
||||
// last slot, the uncommitted slots shall be shifted
|
||||
// last slot, the uncommitted slots shall be shifted, a cache block may have empty rows
|
||||
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
|
||||
int points = pCacheBlock->numOfPoints - pInfo->commitPoint;
|
||||
if (points > 0) {
|
||||
|
@ -568,7 +568,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
|
|||
}
|
||||
}
|
||||
|
||||
// copy the overwritten data into buffer
|
||||
// copy the overwritten data into buffer, merge cache blocks
|
||||
tpoints = rows;
|
||||
pos = pImport->pos;
|
||||
slot = pImport->slot;
|
||||
|
@ -603,6 +603,19 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
|
|||
pos = 0;
|
||||
tpoints -= points;
|
||||
|
||||
if (tpoints == 0) {
|
||||
// free the rest of cache blocks, since cache blocks are merged
|
||||
int currentSlot = slot;
|
||||
while (slot != pInfo->currentSlot) {
|
||||
slot = (slot + 1) % pInfo->maxBlocks;
|
||||
pCacheBlock = pInfo->cacheBlocks[slot];
|
||||
vnodeFreeCacheBlock(pCacheBlock);
|
||||
}
|
||||
|
||||
pInfo->currentSlot = currentSlot;
|
||||
slot = currentSlot; // make sure to exit from the while loop
|
||||
}
|
||||
|
||||
if (slot == pInfo->currentSlot) break;
|
||||
slot = (slot + 1) % pInfo->maxBlocks;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue