fix some bug
This commit is contained in:
parent
dfc7c836fa
commit
57e93db149
|
@ -193,7 +193,7 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
|
|||
if (offset == 0) {
|
||||
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
|
||||
memcpy(POINTER_SHIFT(row, toffset), &tvalue, TYPE_BYTES[type]);
|
||||
memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]);
|
||||
} else {
|
||||
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
|
||||
}
|
||||
|
@ -227,7 +227,6 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
|||
|
||||
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
|
||||
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
|
||||
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows);
|
||||
void dataColSetOffset(SDataCol *pCol, int nEle);
|
||||
|
||||
bool isNEleNull(SDataCol *pCol, int nEle);
|
||||
|
@ -235,28 +234,20 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
|
|||
|
||||
// Get the data pointer from a column-wised data
|
||||
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) {
|
||||
switch (pCol->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
||||
break;
|
||||
|
||||
default:
|
||||
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
|
||||
break;
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
||||
} else {
|
||||
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
|
||||
ASSERT(rows > 0);
|
||||
|
||||
switch (pDataCol->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1));
|
||||
break;
|
||||
default:
|
||||
return TYPE_BYTES[pDataCol->type] * rows;
|
||||
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
|
||||
return pDataCol->dataOff[rows - 1] + varDataTLen(tdGetColDataOfRow(pDataCol, rows - 1));
|
||||
} else {
|
||||
return TYPE_BYTES[pDataCol->type] * rows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,11 +268,11 @@ typedef struct {
|
|||
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)]
|
||||
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
|
||||
#define dataColsTKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, 0)
|
||||
#define dataColsKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, 0)
|
||||
#define dataColsKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, 0)
|
||||
#define dataColsTKeyLast(pCols) \
|
||||
(((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, (pCols)->numOfRows - 1))
|
||||
#define dataColsKeyLast(pCols) \
|
||||
(((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
|
||||
(((pCols)->numOfRows == 0) ? TSDB_DATA_TIMESTAMP_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
|
||||
|
||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
||||
void tdResetDataCols(SDataCols *pCols);
|
||||
|
@ -289,7 +280,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
|||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||
void tdFreeDataCols(SDataCols *pCols);
|
||||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
||||
|
||||
// ----------------- K-V data row structure
|
||||
|
|
|
@ -205,7 +205,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
|
|||
pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
|
||||
|
||||
pDataCol->len = 0;
|
||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
|
||||
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
|
||||
pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
|
||||
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
|
||||
|
@ -218,60 +218,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
|
|||
}
|
||||
}
|
||||
|
||||
// value from timestamp should be TKEY here instead of TSKEY
|
||||
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
|
||||
ASSERT(pCol != NULL && value != NULL);
|
||||
|
||||
switch (pCol->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
// set offset
|
||||
pCol->dataOff[numOfRows] = pCol->len;
|
||||
// Copy data
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
|
||||
// Update the length
|
||||
pCol->len += varDataTLen(value);
|
||||
break;
|
||||
default:
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
||||
pCol->len += pCol->bytes;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
|
||||
int pointsLeft = numOfRows - pointsToPop;
|
||||
|
||||
ASSERT(pointsLeft > 0);
|
||||
|
||||
if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
ASSERT(pCol->len > 0);
|
||||
VarDataOffsetT toffset = pCol->dataOff[pointsToPop];
|
||||
pCol->len = pCol->len - toffset;
|
||||
ASSERT(pCol->len > 0);
|
||||
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
|
||||
dataColSetOffset(pCol, pointsLeft);
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
// set offset
|
||||
pCol->dataOff[numOfRows] = pCol->len;
|
||||
// Copy data
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
|
||||
// Update the length
|
||||
pCol->len += varDataTLen(value);
|
||||
} else {
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
|
||||
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
||||
pCol->len += pCol->bytes;
|
||||
}
|
||||
}
|
||||
|
||||
bool isNEleNull(SDataCol *pCol, int nEle) {
|
||||
switch (pCol->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
for (int i = 0; i < nEle; i++) {
|
||||
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
|
||||
}
|
||||
return true;
|
||||
default:
|
||||
for (int i = 0; i < nEle; i++) {
|
||||
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
|
||||
}
|
||||
return true;
|
||||
for (int i = 0; i < nEle; i++) {
|
||||
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void dataColSetNullAt(SDataCol *pCol, int index) {
|
||||
|
@ -393,7 +362,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize;
|
||||
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
|
||||
|
||||
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
|
||||
ASSERT(pDataCols->cols[i].dataOff != NULL);
|
||||
pRet->cols[i].dataOff =
|
||||
(int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf)));
|
||||
|
@ -403,7 +372,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
pRet->cols[i].len = pDataCols->cols[i].len;
|
||||
if (pDataCols->cols[i].len > 0) {
|
||||
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
|
||||
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
|
||||
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
|
||||
}
|
||||
}
|
||||
|
@ -463,21 +432,6 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
|
|||
pCols->numOfRows++;
|
||||
}
|
||||
|
||||
// Pop pointsToPop points from the SDataCols
|
||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
|
||||
int pointsLeft = pCols->numOfRows - pointsToPop;
|
||||
if (pointsLeft <= 0) {
|
||||
tdResetDataCols(pCols);
|
||||
return;
|
||||
}
|
||||
|
||||
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
|
||||
SDataCol *pCol = pCols->cols + iCol;
|
||||
dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
|
||||
}
|
||||
pCols->numOfRows = pointsLeft;
|
||||
}
|
||||
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
||||
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
||||
|
@ -549,9 +503,9 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
|||
target->maxPoints);
|
||||
}
|
||||
}
|
||||
target->numOfRows++;
|
||||
}
|
||||
|
||||
target->numOfRows++;
|
||||
(*iter2)++;
|
||||
if (key1 == key2) (*iter1)++;
|
||||
}
|
||||
|
|
|
@ -105,8 +105,14 @@ int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
|||
if (tSkipListPut(pTableData->pData, pRow) == NULL) {
|
||||
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||
} else {
|
||||
// TODO: may need to refact here
|
||||
int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
|
||||
if (isRowDelete) {
|
||||
if (TABLE_LASTKEY(pTable) == key) {
|
||||
// TODO: need to update table last key here (may from file)
|
||||
}
|
||||
} else {
|
||||
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
|
||||
}
|
||||
if ((!isRowDelete) && (TABLE_LASTKEY(pTable) < key)) TABLE_LASTKEY(pTable) = key;
|
||||
|
||||
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
|
||||
|
@ -301,7 +307,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
|||
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
|
||||
* 4. operations in pCols not exceeds its max capacity if pCols is given
|
||||
*
|
||||
* The function try to move as mush as possible.
|
||||
* The function tries to procceed AS MUSH AS POSSIBLE.
|
||||
*/
|
||||
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
|
||||
|
@ -321,12 +327,13 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL || dataRowKey(row) > maxKey) {
|
||||
rowKey = INT64_MAX;
|
||||
isRowDel = false;
|
||||
} else {
|
||||
rowKey = dataRowKey(row);
|
||||
isRowDel = dataRowDeleted(row);
|
||||
}
|
||||
|
||||
if (nFilterKeys == 0 || filterIter >= nFilterKeys) {
|
||||
if (filterIter >= nFilterKeys) {
|
||||
fKey = INT64_MAX;
|
||||
} else {
|
||||
fKey = tdGetKey(filterKeys[filterIter]);
|
||||
|
@ -362,6 +369,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL || dataRowKey(row) > maxKey) {
|
||||
rowKey = INT64_MAX;
|
||||
isRowDel = false;
|
||||
} else {
|
||||
rowKey = dataRowKey(row);
|
||||
isRowDel = dataRowDeleted(row);
|
||||
|
@ -391,6 +399,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL || dataRowKey(row) > maxKey) {
|
||||
rowKey = INT64_MAX;
|
||||
isRowDel = false;
|
||||
} else {
|
||||
rowKey = dataRowKey(row);
|
||||
isRowDel = dataRowDeleted(row);
|
||||
|
|
|
@ -1507,6 +1507,8 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
if (pDataCols->numOfRows + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
|
||||
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
|
||||
pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, pCompBlock->keyFirst);
|
||||
pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, pCompBlock->keyLast);
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, pMergeInfo) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||
|
@ -1553,6 +1555,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
SDataCols * pDataCols0 = pHelper->pDataCols[0];
|
||||
SMergeInfo mergeInfo = {0};
|
||||
SMergeInfo *pMergeInfo = &mergeInfo;
|
||||
SCompBlock oBlock = {0};
|
||||
|
||||
SSkipListIterator slIter = {0};
|
||||
|
||||
|
@ -1562,14 +1565,14 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
||||
ASSERT(pCompBlock != NULL);
|
||||
int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock));
|
||||
oBlock = *pCompBlock;
|
||||
|
||||
ASSERT((!TSDB_IS_LAST_BLOCK(pCompBlock)) || (tblkIdx == pIdx->numOfBlocks - 1));
|
||||
ASSERT((!TSDB_IS_LAST_BLOCK(&oBlock)) || (tblkIdx == pIdx->numOfBlocks - 1));
|
||||
|
||||
if ((!TSDB_IS_LAST_BLOCK(pCompBlock)) && keyFirst < pCompBlock->keyFirst) {
|
||||
// Loop to write data until pCompBlock->keyFirst-1
|
||||
if ((!TSDB_IS_LAST_BLOCK(&oBlock)) && keyFirst < pCompBlock->keyFirst) {
|
||||
while (true) {
|
||||
tdResetDataCols(pDataCols);
|
||||
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, pCompBlock->keyLast - 1, defaultRowsInBlock, pDataCols, NULL, 0,
|
||||
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, oBlock.keyFirst, defaultRowsInBlock, pDataCols, NULL, 0,
|
||||
pCfg->update, pMergeInfo);
|
||||
ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows);
|
||||
if (pDataCols->numOfRows == 0) break;
|
||||
|
@ -1582,7 +1585,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
|
||||
} else {
|
||||
int16_t colId = 0;
|
||||
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
|
||||
if (tsdbLoadBlockDataCols(pHelper, &oBlock, NULL, &colId, 1) < 0) return -1;
|
||||
|
||||
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (blockAtIdx(pHelper, tblkIdx + 1)->keyFirst - 1);
|
||||
|
||||
|
@ -1595,19 +1598,19 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
ASSERT(pMergeInfo->rowsDeleteFailed >= 0);
|
||||
*(pCommitIter->pIter) = slIter;
|
||||
tblkIdx++;
|
||||
} else if (pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) {
|
||||
} else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) {
|
||||
// Delete the block and do some stuff
|
||||
ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN);
|
||||
if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1;
|
||||
*pCommitIter->pIter = slIter;
|
||||
if (pCompBlock->last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||
} else if (tsdbCheckAddSubBlockCond(pHelper, pCompBlock, pMergeInfo, pDataCols->maxPoints)) {
|
||||
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||
} else if (tsdbCheckAddSubBlockCond(pHelper, &oBlock, pMergeInfo, pDataCols->maxPoints)) {
|
||||
// Append as a sub-block of the searched block
|
||||
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, INT_MAX, pDataCols, pDataCols0->cols[0].pData,
|
||||
pDataCols0->numOfRows, pCfg->update, pMergeInfo);
|
||||
ASSERT(memcmp(pCommitIter->pIter, &slIter, sizeof(slIter)) == 0);
|
||||
if (tsdbWriteBlockToFile(pHelper, pCompBlock->last ? helperLastF(pHelper) : helperDataF(pHelper), pDataCols,
|
||||
&compBlock, pCompBlock->last, false) < 0) {
|
||||
if (tsdbWriteBlockToFile(pHelper, oBlock.last ? helperLastF(pHelper) : helperDataF(pHelper), pDataCols,
|
||||
&compBlock, oBlock.last, false) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, pMergeInfo) < 0) {
|
||||
|
@ -1616,7 +1619,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
tblkIdx++;
|
||||
} else {
|
||||
// load the block data, merge with the memory data
|
||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||
if (tsdbLoadBlockData(pHelper, &oBlock, NULL) < 0) return -1;
|
||||
int round = 0;
|
||||
int dIter = 0;
|
||||
while (true) {
|
||||
|
@ -1631,7 +1634,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
}
|
||||
|
||||
if (round == 0) {
|
||||
if (pCompBlock->last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||
|
|
|
@ -28,9 +28,9 @@ extern "C" {
|
|||
#define SKIP_LIST_RECORD_PERFORMANCE 0
|
||||
|
||||
// For key property setting
|
||||
#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists
|
||||
#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key
|
||||
#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert
|
||||
#define SL_ALLOW_DUP_KEY (uint8_t)0x0 // Allow duplicate key exists (for tag index usage)
|
||||
#define SL_DISCARD_DUP_KEY (uint8_t)0x1 // Discard duplicate key (for data update=0 case)
|
||||
#define SL_UPDATE_DUP_KEY (uint8_t)0x2 // Update duplicate key by remove/insert (for data update=1 case)
|
||||
// For thread safety setting
|
||||
#define SL_THREAD_SAFE (uint8_t)0x4
|
||||
|
||||
|
|
Loading…
Reference in New Issue