This commit is contained in:
hzcheng 2020-04-16 17:18:23 +08:00
parent d505956440
commit 7c5633cf1a
2 changed files with 11 additions and 6 deletions

View File

@ -924,10 +924,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Loop to write the data in the cache to files. If no data to write, just break the loop // Loop to write the data in the cache to files. If no data to write, just break the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
int nLoop = 0;
while (true) { while (true) {
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
ASSERT(rowsRead >= 0); ASSERT(rowsRead >= 0);
if (pDataCols->numOfPoints == 0) break; if (pDataCols->numOfPoints == 0) break;
nLoop++;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);

View File

@ -281,9 +281,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
// Load the SCompInfo part if neccessary // Load the SCompInfo part if neccessary
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD) && (pIdx->offset > 0)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
}
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
ASSERT(pHelper->hasOldLastBlock == false); ASSERT(pHelper->hasOldLastBlock == false);
@ -370,6 +368,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) { if (pIdx->offset > 0) {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE);
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1;
@ -377,6 +376,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
} else { } else {
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE);
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
@ -919,15 +919,18 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
} }
// Memmove if needed // Memmove if needed
int tsize = pIdx->len - (sizeof(SCompData) + sizeof(SCompCol) * blkIdx); int tsize = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx);
if (tsize > 0) { if (tsize > 0) {
memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompData) + sizeof(SCompBlock) * (blkIdx + 1)), ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo));
(void *)((char *)pHelper->pCompInfo + sizeof(SCompData) + sizeof(SCompBlock) * blkIdx), tsize); ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo));
memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)),
(void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize);
} }
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
pIdx->numOfSuperBlocks++; pIdx->numOfSuperBlocks++;
pIdx->len += sizeof(SCompBlock); pIdx->len += sizeof(SCompBlock);
ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last;