fix small bug

This commit is contained in:
Hongze Cheng 2019-11-25 17:42:39 +08:00
parent 7d1da2127f
commit a3464134c8
1 changed files with 30 additions and 0 deletions

View File

@ -674,6 +674,9 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
TSCKSUM checksum = 0;
int pointsImported = 0;
int code = TSDB_CODE_SUCCESS;
SCachePool * pPool = (SCachePool *)pVnode->pCachePool;
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
TSKEY lastKeyImported = 0;
TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
TSKEY minFileKey = fid * delta;
@ -758,6 +761,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
}
importHandle.oldNumOfBlocks = 0;
importHandle.driftOffset += sizeof(SCompInfo);
lastKeyImported = lastKey;
for (int rowsWritten = 0; rowsWritten < rows;) {
int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */);
@ -874,6 +878,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
// TODO : Convert into while here
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, rowsLeft, data, 0);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
assert(importHandle.last == 0);
@ -1032,6 +1037,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
}
@ -1099,6 +1105,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
} else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) { // block end
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
rowOffset++;
} else {
@ -1112,6 +1119,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
blockIter.pos)) {
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
rowOffset++;
} else {
@ -1157,6 +1165,28 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
pImport->importedRows += pointsImported;
pthread_mutex_lock(&(pPool->vmutex));
if (pInfo->numOfBlocks > 0) {
int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
// data may be in commited cache, cache shall be released
if (lastKeyImported > firstKeyInCache) {
while (slot != pInfo->commitSlot) {
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
vnodeFreeCacheBlock(pCacheBlock);
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
}
if (pInfo->commitPoint == pObj->pointsPerBlock) {
if (pInfo->cacheBlocks[pInfo->commitSlot]->pMeterObj == pObj) {
vnodeFreeCacheBlock(pInfo->cacheBlocks[pInfo->commitSlot]);
}
}
}
}
pthread_mutex_unlock(&(pPool->vmutex));
// TODO: free the allocated memory
tfree(buffer);
tfree(cbuffer);