From a3464134c85b946f4816428a648385be788dfd6f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 25 Nov 2019 17:42:39 +0800 Subject: [PATCH] fix small bug --- src/system/detail/src/vnodeImport.c | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index fa14a53c58..270f02b995 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -674,6 +674,9 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int TSCKSUM checksum = 0; int pointsImported = 0; int code = TSDB_CODE_SUCCESS; + SCachePool * pPool = (SCachePool *)pVnode->pCachePool; + SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache); + TSKEY lastKeyImported = 0; TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision]; TSKEY minFileKey = fid * delta; @@ -758,6 +761,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } importHandle.oldNumOfBlocks = 0; importHandle.driftOffset += sizeof(SCompInfo); + lastKeyImported = lastKey; for (int rowsWritten = 0; rowsWritten < rows;) { int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */); @@ -874,6 +878,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int // TODO : Convert into while here vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, rowsLeft, data, 0); pointsImported++; + lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); assert(importHandle.last == 0); @@ -1032,6 +1037,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset); pointsImported++; + lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); payloadIter++; } @@ -1099,6 +1105,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) { // block end vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset); pointsImported++; + lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); payloadIter++; rowOffset++; } else { @@ -1112,6 +1119,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int blockIter.pos)) { vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset); pointsImported++; + lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); payloadIter++; rowOffset++; } else { @@ -1157,6 +1165,28 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int pImport->importedRows += pointsImported; + pthread_mutex_lock(&(pPool->vmutex)); + if (pInfo->numOfBlocks > 0) { + int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; + TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0])); + + // data may be in commited cache, cache shall be released + if (lastKeyImported > firstKeyInCache) { + while (slot != pInfo->commitSlot) { + SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; + vnodeFreeCacheBlock(pCacheBlock); + slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; + } + + if (pInfo->commitPoint == pObj->pointsPerBlock) { + if (pInfo->cacheBlocks[pInfo->commitSlot]->pMeterObj == pObj) { + vnodeFreeCacheBlock(pInfo->cacheBlocks[pInfo->commitSlot]); + } + } + } + } + pthread_mutex_unlock(&(pPool->vmutex)); + // TODO: free the allocated memory tfree(buffer); tfree(cbuffer);