refactor commit process
This commit is contained in:
parent
9e4fc91151
commit
be3915293e
|
@ -96,6 +96,11 @@ typedef struct {
|
|||
} STsdbBufPool;
|
||||
|
||||
// ------------------ tsdbMemTable.c
|
||||
typedef struct {
|
||||
STable * pTable;
|
||||
SSkipListIterator *pIter;
|
||||
} SCommitIter;
|
||||
|
||||
typedef struct {
|
||||
uint64_t uid;
|
||||
TSKEY keyFirst;
|
||||
|
@ -377,6 +382,24 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
|||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||
TSKEY* filterKeys, int nFilterKeys);
|
||||
|
||||
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||
if (pIter == NULL) return NULL;
|
||||
|
||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) return NULL;
|
||||
|
||||
return SL_GET_NODE_DATA(node);
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||
SDataRow row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL) return -1;
|
||||
|
||||
return dataRowKey(row);
|
||||
}
|
||||
|
||||
// ------------------ tsdbFile.c
|
||||
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||
|
@ -421,25 +444,36 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
|||
#define helperType(h) (h)->type
|
||||
#define helperRepo(h) (h)->pRepo
|
||||
#define helperState(h) (h)->state
|
||||
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
|
||||
|
||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
void tsdbDestroyHelper(SRWHelper* pHelper);
|
||||
void tsdbResetHelper(SRWHelper* pHelper);
|
||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||
int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols);
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
|
||||
int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
void tsdbDestroyHelper(SRWHelper* pHelper);
|
||||
void tsdbResetHelper(SRWHelper* pHelper);
|
||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
|
||||
int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||
|
||||
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
|
||||
if (*(TSKEY*)key1 > *(TSKEY*)key2) {
|
||||
return 1;
|
||||
} else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
#define REPO_ID(r) (r)->config.tsdbId
|
||||
|
|
|
@ -18,11 +18,6 @@
|
|||
|
||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||
|
||||
typedef struct {
|
||||
STable * pTable;
|
||||
SSkipListIterator *pIter;
|
||||
} SCommitIter;
|
||||
|
||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
||||
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||
|
@ -34,14 +29,11 @@ static char * tsdbGetTsTupleKey(const void *data);
|
|||
static void * tsdbCommitData(void *arg);
|
||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey,
|
||||
int maxRowsToRead, SDataCols *pCols);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
|
@ -248,6 +240,66 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||
TSKEY *filterKeys, int nFilterKeys) {
|
||||
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
int numOfRows = 0;
|
||||
TSKEY keyNext = 0;
|
||||
int filterIter = 0;
|
||||
|
||||
if (nFilterKeys != 0) { // for filter purpose
|
||||
ASSERT(filterKeys != NULL);
|
||||
keyNext = tsdbNextIterKey(pIter);
|
||||
if (keyNext < 0 || keyNext > maxKey) return numOfRows;
|
||||
void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE);
|
||||
filterIter = (ptr == NULL) ? nFilterKeys : (POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY));
|
||||
}
|
||||
|
||||
do {
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
|
||||
SDataRow row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL) break;
|
||||
|
||||
keyNext = dataRowKey(row);
|
||||
if (keyNext < 0 || keyNext > maxKey) break;
|
||||
|
||||
bool keyFiltered = false;
|
||||
if (nFilterKeys != 0) {
|
||||
while (true) {
|
||||
if (filterIter >= nFilterKeys) break;
|
||||
if (keyNext == filterKeys[filterIter]) {
|
||||
keyFiltered = true;
|
||||
filterIter++;
|
||||
break;
|
||||
} else if (keyNext < filterKeys[filterIter]) {
|
||||
break;
|
||||
} else {
|
||||
filterIter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!keyFiltered) {
|
||||
if (pCols) {
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
}
|
||||
numOfRows++;
|
||||
}
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo != NULL);
|
||||
|
@ -475,19 +527,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
|
|||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
|
||||
}
|
||||
|
||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter) {
|
||||
if (pIter == NULL) return -1;
|
||||
|
||||
SSkipListNode *node = tSkipListIterGet(pIter->pIter);
|
||||
if (node == NULL) return -1;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
return dataRowKey(row);
|
||||
}
|
||||
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||
for (int i = 0; i < nIters; i++) {
|
||||
TSKEY nextKey = tsdbNextIterKey(iters + i);
|
||||
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
|
||||
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
|
||||
}
|
||||
return 0;
|
||||
|
@ -500,7 +542,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
|
|||
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||
char * dataDir = NULL;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = NULL;
|
||||
|
@ -545,33 +586,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
if (pIter->pIter != NULL) {
|
||||
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
|
||||
|
||||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||
int nLoop = 0;
|
||||
while (true) {
|
||||
int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols);
|
||||
ASSERT(rowsRead >= 0);
|
||||
if (pDataCols->numOfRows == 0) break;
|
||||
nLoop++;
|
||||
|
||||
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
|
||||
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
|
||||
|
||||
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
||||
ASSERT(rowsWritten != 0);
|
||||
if (rowsWritten < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
ASSERT(rowsWritten <= pDataCols->numOfRows);
|
||||
|
||||
tdPopDataColsPoints(pDataCols, rowsWritten);
|
||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
|
||||
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(pDataCols->numOfRows == 0);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
|
@ -666,35 +687,4 @@ static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
|||
}
|
||||
|
||||
free(iters);
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||
ASSERT(maxRowsToRead > 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
|
||||
int numOfRows = 0;
|
||||
|
||||
do {
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
|
||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) break;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
// TODO: deal with the error here
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
numOfRows++;
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
return numOfRows;
|
||||
}
|
|
@ -23,18 +23,16 @@
|
|||
|
||||
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
||||
#define TSDB_KEY_COL_OFFSET 0
|
||||
#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock))
|
||||
|
||||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
||||
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock,
|
||||
bool isLast, bool isSuperBlock);
|
||||
static int compareKeyBlock(const void *arg1, const void *arg2);
|
||||
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
|
||||
static int compTSKEY(const void *key1, const void *key2);
|
||||
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
|
||||
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
||||
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
|
||||
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
||||
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
|
||||
static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
|
||||
static int tsdbInitHelperFile(SRWHelper *pHelper);
|
||||
static void tsdbDestroyHelperFile(SRWHelper *pHelper);
|
||||
|
@ -234,76 +232,190 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
|||
ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Write part of of points from pDataCols to file
|
||||
*
|
||||
* @return: number of points written to file successfully
|
||||
* -1 for failure
|
||||
*/
|
||||
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||
int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
|
||||
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
||||
ASSERT(pDataCols->numOfRows > 0);
|
||||
|
||||
SCompBlock compBlock;
|
||||
int rowsToWrite = 0;
|
||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||
SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]);
|
||||
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
||||
int blkIdx = 0;
|
||||
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||
SCompBlock compBlock = {0};
|
||||
|
||||
STsdbCfg *pCfg = &pHelper->pRepo->config;
|
||||
SSkipListIterator sIter = {0};
|
||||
|
||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
|
||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose
|
||||
ASSERT(TABLE_TID(pCommitIter->pTable) == pHelper->tableInfo.tid);
|
||||
if (TABLE_UID(pCommitIter->pTable) != pIdx->uid) memset((void *)pIdx, 0, sizeof(*pIdx));
|
||||
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
|
||||
|
||||
// Load the SCompInfo part if neccessary
|
||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
|
||||
if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
|
||||
while (true) {
|
||||
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
|
||||
if (keyFirst < 0 || keyFirst > maxKey) break; // iter over
|
||||
|
||||
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
|
||||
ASSERT(pHelper->hasOldLastBlock == false);
|
||||
rowsToWrite = pDataCols->numOfRows;
|
||||
SFile *pWFile = NULL;
|
||||
bool isLast = false;
|
||||
if (pIdx->offset <= 0 || keyFirst > pIdx->maxKey) {
|
||||
if (pIdx->hasLast) {
|
||||
ASSERT(pIdx->offset > 0);
|
||||
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
|
||||
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
|
||||
tdResetDataCols(pDataCols);
|
||||
int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey,
|
||||
defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, NULL, 0);
|
||||
ASSERT((rowsRead > 0) && (rowsRead == pDataCols->numOfRows));
|
||||
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
|
||||
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0)
|
||||
return -1;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks-1, rowsRead) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
|
||||
|
||||
if (rowsToWrite >= pCfg->minRowsPerFileBlock) {
|
||||
pWFile = &(pHelper->files.dataF);
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + rowsRead);
|
||||
SFile *pFile = NULL;
|
||||
bool isLast = false;
|
||||
if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) {
|
||||
pFile = &(pHelper->files.dataF);
|
||||
} else {
|
||||
isLast = true;
|
||||
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||
}
|
||||
|
||||
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], &compBlock,
|
||||
isLast, true) < 0)
|
||||
return -1;
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
|
||||
}
|
||||
} else { // last block is not in .last file
|
||||
tdResetDataCols(pDataCols);
|
||||
int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
|
||||
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
|
||||
|
||||
SFile *pFile = NULL;
|
||||
bool isLast = false;
|
||||
if (rowsRead >= pCfg->minRowsPerFileBlock) {
|
||||
pFile = &(pHelper->files.dataF);
|
||||
} else {
|
||||
isLast = true;
|
||||
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||
}
|
||||
|
||||
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, &compBlock, isLast, true) < 0) return -1;
|
||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
|
||||
}
|
||||
blkIdx = pIdx->numOfBlocks;
|
||||
} else {
|
||||
isLast = true;
|
||||
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||
}
|
||||
SCompBlock *pCompBlock = taosbsearch(&keyFirst, (void *)blockAtIdx(pHelper, blkIdx), pIdx->numOfBlocks - blkIdx,
|
||||
sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
||||
ASSERT(pCompBlock != NULL);
|
||||
|
||||
if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
|
||||
if (pCompBlock->last) {
|
||||
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
|
||||
|
||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
|
||||
} else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
|
||||
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), pIdx->numOfBlocks,
|
||||
sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
||||
int16_t colId = 0;
|
||||
sIter = *(pCommitIter->pIter);
|
||||
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
|
||||
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
|
||||
int rows2 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, maxKey, rows1, NULL,
|
||||
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
|
||||
if (rows2 == 0) { // all data are filtered
|
||||
*pCommitIter->pIter = sIter;
|
||||
} else {
|
||||
if (rows2 + rows1 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS &&
|
||||
!TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||
tdResetDataCols(pDataCols);
|
||||
int rows = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
|
||||
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
|
||||
ASSERT(rows == rows2 && pDataCols->numOfRows == rows);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0)
|
||||
return -1;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rows) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||
// TODO: merge pHelper->pDataCols[0] with pCommitIter->pIter
|
||||
int round = 0;
|
||||
// int iter1 = 0;
|
||||
while (true) {
|
||||
tdResetDataCols(pDataCols);
|
||||
int rowsRead = 0;
|
||||
// tsdbTwoLeveIterMerge(pHelper->pDataCols[0], &iter1, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols);
|
||||
if (rowsRead == 0) break;
|
||||
SFile *pFile = NULL;
|
||||
bool isLast = false;
|
||||
if (rowsRead >= pCfg->minRowsPerFileBlock) {
|
||||
pFile = &(pHelper->files.dataF);
|
||||
} else {
|
||||
isLast = true;
|
||||
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||
}
|
||||
|
||||
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
|
||||
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, &compBlock, isLast, true) < 0) return -1;
|
||||
if (round == 0) {
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks-1) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks-1) < 0) return -1;
|
||||
}
|
||||
round++;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
|
||||
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : pCompBlock[1].keyFirst - 1;
|
||||
if (keyFirst < pCompBlock->keyFirst) {
|
||||
while (true) {
|
||||
tdResetDataCols(pDataCols);
|
||||
int rowsRead = tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0);
|
||||
if (rowsRead == 0) break;
|
||||
|
||||
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
|
||||
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
|
||||
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
||||
if (rowsToWrite < 0) goto _err;
|
||||
} else { // Has key overlap
|
||||
ASSERT(rowsRead == pDataCols->numOfRows);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0)
|
||||
return -1;
|
||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||
tblkIdx++;
|
||||
}
|
||||
} else {
|
||||
ASSERT(keyFirst <= pCompBlock->keyLast);
|
||||
int16_t colId = 0;
|
||||
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
|
||||
|
||||
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
|
||||
// Key overlap with the block, must merge with the block
|
||||
sIter = *(pCommitIter->pIter);
|
||||
int rows1 = pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows;
|
||||
int rows2 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, pCompBlock->keyLast, INT_MAX /*TODO*/, NULL,
|
||||
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
|
||||
int rows3 = 0;
|
||||
if (rows2 == 0) { // All filtered out
|
||||
*(pCommitIter->pIter) = sIter;
|
||||
} else {
|
||||
rows3 = tsdbLoadDataFromCache(pCommitIter->pTable, &sIter, keyLimit, INT_MAX /* TODO*/, NULL, NULL, 0) + rows2;
|
||||
ASSERT(rows3 >= rows2);
|
||||
|
||||
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
||||
if (rowsToWrite < 0) goto _err;
|
||||
} else { // Save as a super block in the middle
|
||||
rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst - 1);
|
||||
ASSERT(rowsToWrite > 0);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0)
|
||||
goto _err;
|
||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
|
||||
if (rows1 >= rows2) {
|
||||
int rows = (rows1 >= rows3) ? rows3 : rows2;
|
||||
tdResetDataCols(pDataCols);
|
||||
int rowsRead =
|
||||
tsdbLoadDataFromCache(pCommitIter->pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
|
||||
pHelper->pDataCols[0]->cols[0].pData, pHelper->pDataCols[0]->numOfRows);
|
||||
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0)
|
||||
return -1;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
|
||||
int round = 0;
|
||||
while (true) {
|
||||
// TODO
|
||||
round++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rowsToWrite;
|
||||
|
||||
_err:
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
||||
|
@ -321,8 +433,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
|||
if (pCompBlock->numOfSubBlocks > 1) {
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
|
||||
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0)
|
||||
return -1;
|
||||
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
||||
|
@ -497,11 +608,29 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
|||
|
||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||
if (pIdx->offset > 0) {
|
||||
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
|
||||
ASSERT(pIdx->uid == pHelper->tableInfo.uid);
|
||||
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
|
||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname,
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
|
||||
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
|
||||
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) return -1;
|
||||
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
||||
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||
pHelper->files.headF.fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
|
||||
tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo),
|
||||
pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid);
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pIdx->uid == pHelper->pCompInfo->uid);
|
||||
}
|
||||
|
||||
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
|
||||
|
@ -629,13 +758,14 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
||||
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
|
||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock,
|
||||
bool isLast, bool isSuperBlock) {
|
||||
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
||||
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
|
||||
int64_t offset = 0;
|
||||
int rowsToWrite = pDataCols->numOfRows;
|
||||
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
|
||||
|
||||
offset = lseek(pFile->fd, 0, SEEK_END);
|
||||
|
@ -772,136 +902,6 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
||||
// TODO: set pHelper->hasOldBlock
|
||||
int rowsWritten = 0;
|
||||
SCompBlock compBlock = {0};
|
||||
STsdbCfg * pCfg = &pHelper->pRepo->config;
|
||||
|
||||
ASSERT(pDataCols->numOfRows > 0);
|
||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||
|
||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||
ASSERT(blkIdx < pIdx->numOfBlocks);
|
||||
|
||||
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
|
||||
ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);
|
||||
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
|
||||
|
||||
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
|
||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks - 1);
|
||||
int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
|
||||
|
||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
|
||||
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
||||
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pCfg->minRowsPerFileBlock) &&
|
||||
(pHelper->files.nLastF.fd) < 0) {
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
|
||||
goto _err;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else {
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
|
||||
// Merge
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
|
||||
// Write
|
||||
SFile *pWFile = NULL;
|
||||
bool isLast = false;
|
||||
if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) {
|
||||
pWFile = &(pHelper->files.dataF);
|
||||
} else {
|
||||
isLast = true;
|
||||
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||
}
|
||||
if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock,
|
||||
isLast, true) < 0)
|
||||
goto _err;
|
||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
|
||||
}
|
||||
|
||||
ASSERT(pHelper->hasOldLastBlock);
|
||||
pHelper->hasOldLastBlock = false;
|
||||
} else {
|
||||
// Key must overlap with the block
|
||||
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
|
||||
|
||||
TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;
|
||||
|
||||
// rows1: number of rows must merge in this block
|
||||
int rows1 =
|
||||
tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
|
||||
// rows2: max number of rows the block can have more
|
||||
int rows2 = pCfg->maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
|
||||
// rows3: number of rows between this block and the next block
|
||||
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
|
||||
|
||||
ASSERT(rows3 >= rows1);
|
||||
|
||||
if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
||||
((!blockAtIdx(pHelper, blkIdx)->last) ||
|
||||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock) &&
|
||||
(pHelper->files.nLastF.fd < 0)))) {
|
||||
rowsWritten = rows1;
|
||||
bool isLast = false;
|
||||
SFile *pFile = NULL;
|
||||
|
||||
if (blockAtIdx(pHelper, blkIdx)->last) {
|
||||
isLast = true;
|
||||
pFile = &(pHelper->files.lastF);
|
||||
} else {
|
||||
pFile = &(pHelper->files.dataF);
|
||||
}
|
||||
|
||||
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
|
||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else { // Load-Merge-Write
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
|
||||
|
||||
rowsWritten = rows3;
|
||||
|
||||
int iter1 = 0; // iter over pHelper->pDataCols[0]
|
||||
int iter2 = 0; // iter over pDataCols
|
||||
int round = 0;
|
||||
// tdResetDataCols(pHelper->pDataCols[1]);
|
||||
while (true) {
|
||||
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
|
||||
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
|
||||
pDataCols, &iter2, rowsWritten, pCfg->maxRowsPerFileBlock * 4 / 5);
|
||||
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
|
||||
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
||||
goto _err;
|
||||
if (round == 0) {
|
||||
tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx);
|
||||
} else {
|
||||
tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
|
||||
}
|
||||
round++;
|
||||
blkIdx++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rowsWritten;
|
||||
|
||||
_err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int compTSKEY(const void *key1, const void *key2) {
|
||||
if (*(TSKEY *)key1 > *(TSKEY *)key2) {
|
||||
return 1;
|
||||
} else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
|
||||
if (tsizeof((void *)pHelper->pCompInfo) <= esize) {
|
||||
size_t tsize = esize + sizeof(SCompBlock) * 16;
|
||||
|
@ -1079,30 +1079,6 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
|||
return 0;
|
||||
}
|
||||
|
||||
// Get the number of rows in range [minKey, maxKey]
|
||||
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
|
||||
if (pDataCols->numOfRows == 0) return 0;
|
||||
|
||||
ASSERT(minKey <= maxKey);
|
||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||
TSKEY keyLast = dataColsKeyLast(pDataCols);
|
||||
ASSERT(keyFirst <= keyLast);
|
||||
|
||||
if (minKey > keyLast || maxKey < keyFirst) return 0;
|
||||
|
||||
void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
|
||||
compTSKEY, TD_GE);
|
||||
ASSERT(ptr1 != NULL);
|
||||
|
||||
void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
|
||||
compTSKEY, TD_LE);
|
||||
ASSERT(ptr2 != NULL);
|
||||
|
||||
if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;
|
||||
|
||||
return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1;
|
||||
}
|
||||
|
||||
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
||||
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
||||
pHelper->files.fid = -1;
|
||||
|
@ -1292,6 +1268,8 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
|||
// If only load timestamp column, no need to load SCompData part
|
||||
if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
|
||||
|
||||
pDataCols->numOfRows = pCompBlock->numOfRows;
|
||||
|
||||
int dcol = 0;
|
||||
int ccol = 0;
|
||||
for (int i = 0; i < numOfColIds; i++) {
|
||||
|
|
Loading…
Reference in New Issue