fix a commit bug

This commit is contained in:
Hongze Cheng 2020-06-08 07:27:35 +00:00
parent 9d6d8a06be
commit ddb82b1dfc
2 changed files with 16 additions and 15 deletions

View File

@ -764,8 +764,8 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast, bool isSuperBlock) { bool isLast, bool isSuperBlock) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
rowsToWrite <= pHelper->config.maxRowsPerFileBlock); ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);
SCompData *pCompData = (SCompData *)(pHelper->pBuffer); SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0; int64_t offset = 0;
@ -905,7 +905,8 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows); rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd) < 0) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
goto _err; goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
@ -936,21 +937,21 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// Key must overlap with the block // Key must overlap with the block
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast); ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
TSKEY keyLimit = TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;
(blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
// rows1: number of rows must merge in this block // rows1: number of rows must merge in this block
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
// rows2: max nuber of rows the block can have more // rows2: max number of rows the block can have more
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows; int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
// rows3: number of rows between this block and the next block // rows3: number of rows between this block and the next block
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
ASSERT(rows3 >= rows1); ASSERT(rows3 >= rows1);
if ((rows2 >= rows1) && if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
(( blockAtIdx(pHelper, blkIdx)->last) || ((!blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd < 0)))) {
rowsWritten = rows1; rowsWritten = rows1;
bool isLast = false; bool isLast = false;
SFile *pFile = NULL; SFile *pFile = NULL;
@ -964,7 +965,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { // Load-Merge-Write } else { // Load-Merge-Write
// Load // Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;

View File

@ -45,7 +45,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
int tdCreateKVStore(char *fname) { int tdCreateKVStore(char *fname) {
char *tname = strdup(fname); char *tname = strdup(fname);
if (tname == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY;
int fd = open(fname, O_RDWR | O_CREAT, 0755); int fd = open(fname, O_RDWR | O_CREAT, 0755);
if (fd < 0) { if (fd < 0) {
@ -247,14 +247,14 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
pStore->appH = appH; pStore->appH = appH;
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
if (pStore->map == NULL) { if (pStore->map == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err; goto _err;
} }
return pStore; return pStore;
_err: _err:
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
tdFreeKVStore(pStore); tdFreeKVStore(pStore);
return NULL; return NULL;
} }
@ -273,7 +273,7 @@ static char *tdGetKVStoreSnapshotFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1; size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1;
char * fname = malloc(size); char * fname = malloc(size);
if (fname == NULL) { if (fname == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL; return NULL;
} }
sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX); sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX);
@ -284,7 +284,7 @@ static char *tdGetKVStoreNewFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1; size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1;
char * fname = malloc(size); char * fname = malloc(size);
if (fname == NULL) { if (fname == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL; return NULL;
} }
sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX); sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX);