commit
b03c73c9cc
|
@ -96,6 +96,11 @@ typedef struct {
|
||||||
} STsdbBufPool;
|
} STsdbBufPool;
|
||||||
|
|
||||||
// ------------------ tsdbMemTable.c
|
// ------------------ tsdbMemTable.c
|
||||||
|
typedef struct {
|
||||||
|
STable * pTable;
|
||||||
|
SSkipListIterator *pIter;
|
||||||
|
} SCommitIter;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
TSKEY keyFirst;
|
TSKEY keyFirst;
|
||||||
|
@ -206,10 +211,10 @@ typedef struct {
|
||||||
int64_t offset : 63;
|
int64_t offset : 63;
|
||||||
int32_t algorithm : 8;
|
int32_t algorithm : 8;
|
||||||
int32_t numOfRows : 24;
|
int32_t numOfRows : 24;
|
||||||
int32_t sversion;
|
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols
|
||||||
int16_t numOfSubBlocks;
|
int16_t numOfSubBlocks;
|
||||||
int16_t numOfCols;
|
int16_t numOfCols; // not including timestamp column
|
||||||
TSKEY keyFirst;
|
TSKEY keyFirst;
|
||||||
TSKEY keyLast;
|
TSKEY keyLast;
|
||||||
} SCompBlock;
|
} SCompBlock;
|
||||||
|
@ -377,6 +382,24 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||||
|
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||||
|
TSKEY* filterKeys, int nFilterKeys);
|
||||||
|
|
||||||
|
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||||
|
if (pIter == NULL) return NULL;
|
||||||
|
|
||||||
|
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||||
|
if (node == NULL) return NULL;
|
||||||
|
|
||||||
|
return SL_GET_NODE_DATA(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||||
|
SDataRow row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL) return -1;
|
||||||
|
|
||||||
|
return dataRowKey(row);
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------ tsdbFile.c
|
// ------------------ tsdbFile.c
|
||||||
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||||
|
@ -421,6 +444,7 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
||||||
#define helperType(h) (h)->type
|
#define helperType(h) (h)->type
|
||||||
#define helperRepo(h) (h)->pRepo
|
#define helperRepo(h) (h)->pRepo
|
||||||
#define helperState(h) (h)->state
|
#define helperState(h) (h)->state
|
||||||
|
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
|
||||||
|
|
||||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||||
|
@ -429,7 +453,7 @@ void tsdbResetHelper(SRWHelper* pHelper);
|
||||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||||
int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols);
|
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
|
||||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||||
|
@ -441,6 +465,16 @@ int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInf
|
||||||
int numOfColIds);
|
int numOfColIds);
|
||||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||||
|
|
||||||
|
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
|
||||||
|
if (*(TSKEY*)key1 > *(TSKEY*)key2) {
|
||||||
|
return 1;
|
||||||
|
} else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------ tsdbMain.c
|
// ------------------ tsdbMain.c
|
||||||
#define REPO_ID(r) (r)->config.tsdbId
|
#define REPO_ID(r) (r)->config.tsdbId
|
||||||
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
||||||
|
|
|
@ -18,11 +18,6 @@
|
||||||
|
|
||||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
STable * pTable;
|
|
||||||
SSkipListIterator *pIter;
|
|
||||||
} SCommitIter;
|
|
||||||
|
|
||||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
||||||
|
|
||||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||||
|
@ -34,14 +29,11 @@ static char * tsdbGetTsTupleKey(const void *data);
|
||||||
static void * tsdbCommitData(void *arg);
|
static void * tsdbCommitData(void *arg);
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter);
|
|
||||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo);
|
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables);
|
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey,
|
|
||||||
int maxRowsToRead, SDataCols *pCols);
|
|
||||||
|
|
||||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||||
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
|
@ -252,6 +244,66 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||||
|
TSKEY *filterKeys, int nFilterKeys) {
|
||||||
|
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
|
||||||
|
if (pIter == NULL) return 0;
|
||||||
|
STSchema *pSchema = NULL;
|
||||||
|
int numOfRows = 0;
|
||||||
|
TSKEY keyNext = 0;
|
||||||
|
int filterIter = 0;
|
||||||
|
|
||||||
|
if (nFilterKeys != 0) { // for filter purpose
|
||||||
|
ASSERT(filterKeys != NULL);
|
||||||
|
keyNext = tsdbNextIterKey(pIter);
|
||||||
|
if (keyNext < 0 || keyNext > maxKey) return numOfRows;
|
||||||
|
void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE);
|
||||||
|
filterIter = (ptr == NULL) ? nFilterKeys : (POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY));
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
if (numOfRows >= maxRowsToRead) break;
|
||||||
|
|
||||||
|
SDataRow row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL) break;
|
||||||
|
|
||||||
|
keyNext = dataRowKey(row);
|
||||||
|
if (keyNext < 0 || keyNext > maxKey) break;
|
||||||
|
|
||||||
|
bool keyFiltered = false;
|
||||||
|
if (nFilterKeys != 0) {
|
||||||
|
while (true) {
|
||||||
|
if (filterIter >= nFilterKeys) break;
|
||||||
|
if (keyNext == filterKeys[filterIter]) {
|
||||||
|
keyFiltered = true;
|
||||||
|
filterIter++;
|
||||||
|
break;
|
||||||
|
} else if (keyNext < filterKeys[filterIter]) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
filterIter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!keyFiltered) {
|
||||||
|
if (pCols) {
|
||||||
|
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||||
|
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
|
||||||
|
if (pSchema == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||||
|
}
|
||||||
|
numOfRows++;
|
||||||
|
}
|
||||||
|
} while (tSkipListIterNext(pIter));
|
||||||
|
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------- LOCAL FUNCTIONS ----------------
|
// ---------------- LOCAL FUNCTIONS ----------------
|
||||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
||||||
ASSERT(pRepo != NULL);
|
ASSERT(pRepo != NULL);
|
||||||
|
@ -378,7 +430,7 @@ static void *tsdbCommitData(void *arg) {
|
||||||
|
|
||||||
// Create the iterator to read from cache
|
// Create the iterator to read from cache
|
||||||
if (pMem->numOfRows > 0) {
|
if (pMem->numOfRows > 0) {
|
||||||
iters = tsdbCreateTableIters(pRepo);
|
iters = tsdbCreateCommitIters(pRepo);
|
||||||
if (iters == NULL) {
|
if (iters == NULL) {
|
||||||
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -418,7 +470,7 @@ static void *tsdbCommitData(void *arg) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tdFreeDataCols(pDataCols);
|
tdFreeDataCols(pDataCols);
|
||||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||||
tsdbDestroyHelper(&whelper);
|
tsdbDestroyHelper(&whelper);
|
||||||
tsdbEndCommit(pRepo);
|
tsdbEndCommit(pRepo);
|
||||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||||
|
@ -479,19 +531,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
|
||||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
|
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter) {
|
|
||||||
if (pIter == NULL) return -1;
|
|
||||||
|
|
||||||
SSkipListNode *node = tSkipListIterGet(pIter->pIter);
|
|
||||||
if (node == NULL) return -1;
|
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
|
||||||
return dataRowKey(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||||
for (int i = 0; i < nIters; i++) {
|
for (int i = 0; i < nIters; i++) {
|
||||||
TSKEY nextKey = tsdbNextIterKey(iters + i);
|
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
|
||||||
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
|
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -504,7 +546,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
|
||||||
|
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||||
char * dataDir = NULL;
|
char * dataDir = NULL;
|
||||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
SFileGroup *pGroup = NULL;
|
SFileGroup *pGroup = NULL;
|
||||||
|
@ -549,33 +590,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
if (pIter->pIter != NULL) {
|
if (pIter->pIter != NULL) {
|
||||||
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
|
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
|
||||||
|
|
||||||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
|
||||||
int nLoop = 0;
|
|
||||||
while (true) {
|
|
||||||
int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols);
|
|
||||||
ASSERT(rowsRead >= 0);
|
|
||||||
if (pDataCols->numOfRows == 0) break;
|
|
||||||
nLoop++;
|
|
||||||
|
|
||||||
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
|
|
||||||
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
|
|
||||||
|
|
||||||
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
|
||||||
ASSERT(rowsWritten != 0);
|
|
||||||
if (rowsWritten < 0) {
|
|
||||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||||
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
ASSERT(rowsWritten <= pDataCols->numOfRows);
|
|
||||||
|
|
||||||
tdPopDataColsPoints(pDataCols, rowsWritten);
|
|
||||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pDataCols->numOfRows == 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||||
|
@ -615,7 +636,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
|
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
||||||
STsdbCfg * pCfg = &(pRepo->config);
|
STsdbCfg * pCfg = &(pRepo->config);
|
||||||
SMemTable *pMem = pRepo->imem;
|
SMemTable *pMem = pRepo->imem;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
@ -645,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tSkipListIterNext(iters[i].pIter)) {
|
tSkipListIterNext(iters[i].pIter);
|
||||||
terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return iters;
|
return iters;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
|
||||||
if (iters == NULL) return;
|
if (iters == NULL) return;
|
||||||
|
|
||||||
for (int i = 1; i < maxTables; i++) {
|
for (int i = 1; i < maxTables; i++) {
|
||||||
|
@ -671,34 +689,3 @@ static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
||||||
|
|
||||||
free(iters);
|
free(iters);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
|
||||||
ASSERT(maxRowsToRead > 0);
|
|
||||||
if (pIter == NULL) return 0;
|
|
||||||
STSchema *pSchema = NULL;
|
|
||||||
|
|
||||||
int numOfRows = 0;
|
|
||||||
|
|
||||||
do {
|
|
||||||
if (numOfRows >= maxRowsToRead) break;
|
|
||||||
|
|
||||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
|
||||||
if (node == NULL) break;
|
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
|
||||||
if (dataRowKey(row) > maxKey) break;
|
|
||||||
|
|
||||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
|
||||||
pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row));
|
|
||||||
if (pSchema == NULL) {
|
|
||||||
// TODO: deal with the error here
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
|
||||||
numOfRows++;
|
|
||||||
} while (tSkipListIterNext(pIter));
|
|
||||||
|
|
||||||
return numOfRows;
|
|
||||||
}
|
|
|
@ -727,7 +727,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
|
||||||
|
|
||||||
T_REF_INC(pTable);
|
T_REF_INC(pTable);
|
||||||
|
|
||||||
tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||||
TABLE_UID(pTable));
|
TABLE_UID(pTable));
|
||||||
|
|
||||||
return pTable;
|
return pTable;
|
||||||
|
@ -740,7 +740,7 @@ _err:
|
||||||
static void tsdbFreeTable(STable *pTable) {
|
static void tsdbFreeTable(STable *pTable) {
|
||||||
if (pTable) {
|
if (pTable) {
|
||||||
if (pTable->name != NULL)
|
if (pTable->name != NULL)
|
||||||
tsdbDebug("table %s tid %d uid %" PRIu64 " is destroyed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
tsdbTrace("table %s tid %d uid %" PRIu64 " is freed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||||
TABLE_UID(pTable));
|
TABLE_UID(pTable));
|
||||||
tfree(TABLE_NAME(pTable));
|
tfree(TABLE_NAME(pTable));
|
||||||
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
||||||
|
|
|
@ -22,18 +22,17 @@
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
|
|
||||||
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
||||||
|
#define TSDB_KEY_COL_OFFSET 0
|
||||||
|
#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock))
|
||||||
|
|
||||||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
||||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock,
|
||||||
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
|
bool isLast, bool isSuperBlock);
|
||||||
static int compareKeyBlock(const void *arg1, const void *arg2);
|
static int compareKeyBlock(const void *arg1, const void *arg2);
|
||||||
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
|
|
||||||
static int compTSKEY(const void *key1, const void *key2);
|
|
||||||
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
|
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
|
||||||
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
||||||
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
|
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
|
||||||
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
|
||||||
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
|
|
||||||
static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
|
static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
|
||||||
static int tsdbInitHelperFile(SRWHelper *pHelper);
|
static int tsdbInitHelperFile(SRWHelper *pHelper);
|
||||||
static void tsdbDestroyHelperFile(SRWHelper *pHelper);
|
static void tsdbDestroyHelperFile(SRWHelper *pHelper);
|
||||||
|
@ -52,9 +51,15 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
||||||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
|
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
|
||||||
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx);
|
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx);
|
||||||
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
|
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
|
||||||
|
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey);
|
||||||
static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
|
static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
|
||||||
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
|
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
|
||||||
SDataCol *pDataCol);
|
SDataCol *pDataCol);
|
||||||
|
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock);
|
||||||
|
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
|
||||||
|
int *blkIdx);
|
||||||
|
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
||||||
|
TSKEY maxKey, int maxRows);
|
||||||
|
|
||||||
// ---------------------- INTERNAL FUNCTIONS ----------------------
|
// ---------------------- INTERNAL FUNCTIONS ----------------------
|
||||||
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
||||||
|
@ -225,84 +230,41 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
||||||
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid;
|
||||||
if (pIdx->offset > 0 && pIdx->hasLast) {
|
if (pIdx->offset > 0) {
|
||||||
pHelper->hasOldLastBlock = true;
|
if (pIdx->uid != TABLE_UID(pTable)) {
|
||||||
|
memset((void *)pIdx, 0, sizeof(SCompIdx));
|
||||||
|
} else {
|
||||||
|
if (pIdx->hasLast) pHelper->hasOldLastBlock = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
|
helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
|
||||||
ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
|
ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
|
||||||
* Write part of of points from pDataCols to file
|
|
||||||
*
|
|
||||||
* @return: number of points written to file successfully
|
|
||||||
* -1 for failure
|
|
||||||
*/
|
|
||||||
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
|
||||||
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
||||||
ASSERT(pDataCols->numOfRows > 0);
|
|
||||||
|
|
||||||
SCompBlock compBlock;
|
SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]);
|
||||||
int rowsToWrite = 0;
|
int blkIdx = 0;
|
||||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
|
||||||
|
|
||||||
STsdbCfg *pCfg = &pHelper->pRepo->config;
|
ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable));
|
||||||
|
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
|
||||||
|
|
||||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
|
while (true) {
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose
|
ASSERT(blkIdx <= pIdx->numOfBlocks);
|
||||||
|
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
|
||||||
|
if (keyFirst < 0 || keyFirst > maxKey) break; // iter over
|
||||||
|
|
||||||
// Load the SCompInfo part if neccessary
|
if (pIdx->len <= 0 || keyFirst > pIdx->maxKey) {
|
||||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
|
if (tsdbProcessAppendCommit(pHelper, pCommitIter, pDataCols, maxKey) < 0) return -1;
|
||||||
if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
|
blkIdx = pIdx->numOfBlocks;
|
||||||
|
|
||||||
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
|
|
||||||
ASSERT(pHelper->hasOldLastBlock == false);
|
|
||||||
rowsToWrite = pDataCols->numOfRows;
|
|
||||||
SFile *pWFile = NULL;
|
|
||||||
bool isLast = false;
|
|
||||||
|
|
||||||
if (rowsToWrite >= pCfg->minRowsPerFileBlock) {
|
|
||||||
pWFile = &(pHelper->files.dataF);
|
|
||||||
} else {
|
} else {
|
||||||
isLast = true;
|
if (tsdbProcessMergeCommit(pHelper, pCommitIter, pDataCols, maxKey, &blkIdx) < 0) return -1;
|
||||||
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
|
|
||||||
|
|
||||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
|
|
||||||
} else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
|
|
||||||
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), pIdx->numOfBlocks,
|
|
||||||
sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
|
||||||
|
|
||||||
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
|
|
||||||
|
|
||||||
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
|
|
||||||
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
|
|
||||||
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
|
||||||
if (rowsToWrite < 0) goto _err;
|
|
||||||
} else { // Has key overlap
|
|
||||||
|
|
||||||
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
|
|
||||||
// Key overlap with the block, must merge with the block
|
|
||||||
|
|
||||||
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
|
||||||
if (rowsToWrite < 0) goto _err;
|
|
||||||
} else { // Save as a super block in the middle
|
|
||||||
rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst - 1);
|
|
||||||
ASSERT(rowsToWrite > 0);
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0)
|
|
||||||
goto _err;
|
|
||||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rowsToWrite;
|
return 0;
|
||||||
|
|
||||||
_err:
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
||||||
|
@ -310,30 +272,43 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
||||||
|
|
||||||
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
||||||
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
SCompBlock compBlock;
|
SCompBlock compBlock = {0};
|
||||||
if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
|
if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) {
|
||||||
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
|
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
|
||||||
|
|
||||||
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1;
|
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
|
||||||
ASSERT(pCompBlock->last);
|
ASSERT(pCompBlock->last);
|
||||||
|
|
||||||
if (pCompBlock->numOfSubBlocks > 1) {
|
if (pCompBlock->numOfSubBlocks > 1) {
|
||||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
|
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||||
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
|
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
||||||
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
|
if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) {
|
||||||
pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
|
||||||
if (pCompBlock->offset < 0) return -1;
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len)
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
|
||||||
|
if (pCompBlock->offset < 0) {
|
||||||
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname,
|
||||||
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) {
|
||||||
|
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
|
||||||
|
pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pHelper->hasOldLastBlock = false;
|
pHelper->hasOldLastBlock = false;
|
||||||
}
|
}
|
||||||
|
@ -365,10 +340,12 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (pIdx->len > 0) {
|
||||||
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
|
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
|
||||||
pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
|
pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
|
||||||
pHelper->pCompInfo->checksum = 0;
|
pHelper->pCompInfo->checksum = 0;
|
||||||
ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
|
ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) &&
|
||||||
|
(pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
|
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
|
||||||
offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
||||||
if (offset < 0) {
|
if (offset < 0) {
|
||||||
|
@ -388,6 +365,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -397,7 +375,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
||||||
|
|
||||||
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
||||||
off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
||||||
if (offset < 0) return -1;
|
if (offset < 0) {
|
||||||
|
tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
|
||||||
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SFile *pFile = &(pHelper->files.nHeadF);
|
SFile *pFile = &(pHelper->files.nHeadF);
|
||||||
pFile->info.offset = offset;
|
pFile->info.offset = offset;
|
||||||
|
@ -409,6 +392,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
||||||
int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
|
int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
|
||||||
if (tsizeof(pHelper->pBuffer) - drift < 128) {
|
if (tsizeof(pHelper->pBuffer) - drift < 128) {
|
||||||
pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
|
pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
|
||||||
|
if (pHelper->pBuffer == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
buf = POINTER_SHIFT(pHelper->pBuffer, drift);
|
buf = POINTER_SHIFT(pHelper->pBuffer, drift);
|
||||||
taosEncodeVariantU32(&buf, i);
|
taosEncodeVariantU32(&buf, i);
|
||||||
|
@ -419,7 +406,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
||||||
int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
|
int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
|
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
|
||||||
|
|
||||||
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1;
|
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) {
|
||||||
|
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize,
|
||||||
|
pHelper->files.nHeadF.fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
pFile->info.len = tsize;
|
pFile->info.len = tsize;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -496,11 +488,29 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||||
|
|
||||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||||
if (pIdx->offset > 0) {
|
if (pIdx->offset > 0) {
|
||||||
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
|
ASSERT(pIdx->uid == pHelper->tableInfo.uid);
|
||||||
|
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname,
|
||||||
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
|
pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
|
||||||
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
|
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) return -1;
|
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||||
|
pHelper->files.headF.fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
|
||||||
|
tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo),
|
||||||
|
pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid);
|
||||||
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pIdx->uid == pHelper->pCompInfo->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
|
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
|
||||||
|
@ -628,13 +638,14 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
|
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock,
|
||||||
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
|
bool isLast, bool isSuperBlock) {
|
||||||
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
||||||
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
|
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
|
int rowsToWrite = pDataCols->numOfRows;
|
||||||
|
|
||||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||||
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
|
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
|
||||||
|
|
||||||
offset = lseek(pFile->fd, 0, SEEK_END);
|
offset = lseek(pFile->fd, 0, SEEK_END);
|
||||||
|
@ -646,7 +657,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
}
|
}
|
||||||
|
|
||||||
int nColsNotAllNull = 0;
|
int nColsNotAllNull = 0;
|
||||||
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
|
||||||
SDataCol *pDataCol = pDataCols->cols + ncol;
|
SDataCol *pDataCol = pDataCols->cols + ncol;
|
||||||
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
|
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
|
||||||
|
|
||||||
|
@ -658,7 +669,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
|
|
||||||
pCompCol->colId = pDataCol->colId;
|
pCompCol->colId = pDataCol->colId;
|
||||||
pCompCol->type = pDataCol->type;
|
pCompCol->type = pDataCol->type;
|
||||||
if (tDataTypeDesc[pDataCol->type].getStatisFunc && ncol != 0) {
|
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
|
||||||
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
|
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
|
||||||
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
|
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
|
||||||
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
|
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
|
||||||
|
@ -666,24 +677,24 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
nColsNotAllNull++;
|
nColsNotAllNull++;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols);
|
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
|
||||||
|
|
||||||
// Compress the data if neccessary
|
// Compress the data if neccessary
|
||||||
int tcol = 0;
|
int tcol = 0;
|
||||||
int32_t toffset = 0;
|
int32_t toffset = 0;
|
||||||
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
|
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
|
||||||
int32_t lsize = tsize;
|
int32_t lsize = tsize;
|
||||||
|
int32_t keyLen = 0;
|
||||||
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
||||||
if (tcol >= nColsNotAllNull) break;
|
if (tcol >= nColsNotAllNull) break;
|
||||||
|
|
||||||
SDataCol *pDataCol = pDataCols->cols + ncol;
|
SDataCol *pDataCol = pDataCols->cols + ncol;
|
||||||
SCompCol *pCompCol = pCompData->cols + tcol;
|
SCompCol *pCompCol = pCompData->cols + tcol;
|
||||||
|
|
||||||
if (pDataCol->colId != pCompCol->colId) continue;
|
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue;
|
||||||
void *tptr = (void *)((char *)pCompData + lsize);
|
void *tptr = POINTER_SHIFT(pCompData, lsize);
|
||||||
|
|
||||||
pCompCol->offset = toffset;
|
|
||||||
|
|
||||||
|
int32_t flen = 0; // final length
|
||||||
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
|
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
|
||||||
|
|
||||||
if (pCfg->compression) {
|
if (pCfg->compression) {
|
||||||
|
@ -695,22 +706,29 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
||||||
tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
|
tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
|
||||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
|
||||||
} else {
|
} else {
|
||||||
pCompCol->len = tlen;
|
flen = tlen;
|
||||||
memcpy(tptr, pDataCol->pData, pCompCol->len);
|
memcpy(tptr, pDataCol->pData, flen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add checksum
|
// Add checksum
|
||||||
ASSERT(pCompCol->len > 0);
|
ASSERT(flen > 0);
|
||||||
pCompCol->len += sizeof(TSCKSUM);
|
flen += sizeof(TSCKSUM);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len);
|
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
|
||||||
|
|
||||||
toffset += pCompCol->len;
|
if (ncol != 0) {
|
||||||
lsize += pCompCol->len;
|
pCompCol->offset = toffset;
|
||||||
|
pCompCol->len = flen;
|
||||||
tcol++;
|
tcol++;
|
||||||
|
} else {
|
||||||
|
keyLen = flen;
|
||||||
|
}
|
||||||
|
|
||||||
|
toffset += flen;
|
||||||
|
lsize += flen;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCompData->delimiter = TSDB_FILE_DELIMITER;
|
pCompData->delimiter = TSDB_FILE_DELIMITER;
|
||||||
|
@ -732,14 +750,14 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
||||||
pCompBlock->offset = offset;
|
pCompBlock->offset = offset;
|
||||||
pCompBlock->algorithm = pCfg->compression;
|
pCompBlock->algorithm = pCfg->compression;
|
||||||
pCompBlock->numOfRows = rowsToWrite;
|
pCompBlock->numOfRows = rowsToWrite;
|
||||||
pCompBlock->sversion = pHelper->tableInfo.sversion;
|
pCompBlock->len = lsize;
|
||||||
pCompBlock->len = (int32_t)lsize;
|
pCompBlock->keyLen = keyLen;
|
||||||
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
|
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
|
||||||
pCompBlock->numOfCols = nColsNotAllNull;
|
pCompBlock->numOfCols = nColsNotAllNull;
|
||||||
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
||||||
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
|
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
|
||||||
|
|
||||||
tsdbTrace("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
|
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
|
||||||
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
|
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
|
||||||
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset),
|
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset),
|
||||||
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
|
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
|
||||||
|
@ -764,136 +782,6 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
|
||||||
// TODO: set pHelper->hasOldBlock
|
|
||||||
int rowsWritten = 0;
|
|
||||||
SCompBlock compBlock = {0};
|
|
||||||
STsdbCfg * pCfg = &pHelper->pRepo->config;
|
|
||||||
|
|
||||||
ASSERT(pDataCols->numOfRows > 0);
|
|
||||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
|
||||||
ASSERT(blkIdx < pIdx->numOfBlocks);
|
|
||||||
|
|
||||||
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
|
||||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
|
|
||||||
ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);
|
|
||||||
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
|
|
||||||
|
|
||||||
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
|
|
||||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks - 1);
|
|
||||||
int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
|
|
||||||
|
|
||||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
|
|
||||||
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
|
||||||
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pCfg->minRowsPerFileBlock) &&
|
|
||||||
(pHelper->files.nLastF.fd) < 0) {
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
|
|
||||||
goto _err;
|
|
||||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
|
||||||
} else {
|
|
||||||
// Load
|
|
||||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
|
||||||
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
|
|
||||||
// Merge
|
|
||||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
|
|
||||||
// Write
|
|
||||||
SFile *pWFile = NULL;
|
|
||||||
bool isLast = false;
|
|
||||||
if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) {
|
|
||||||
pWFile = &(pHelper->files.dataF);
|
|
||||||
} else {
|
|
||||||
isLast = true;
|
|
||||||
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
|
||||||
}
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock,
|
|
||||||
isLast, true) < 0)
|
|
||||||
goto _err;
|
|
||||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pHelper->hasOldLastBlock);
|
|
||||||
pHelper->hasOldLastBlock = false;
|
|
||||||
} else {
|
|
||||||
// Key must overlap with the block
|
|
||||||
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
|
|
||||||
|
|
||||||
TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;
|
|
||||||
|
|
||||||
// rows1: number of rows must merge in this block
|
|
||||||
int rows1 =
|
|
||||||
tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
|
|
||||||
// rows2: max number of rows the block can have more
|
|
||||||
int rows2 = pCfg->maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
|
|
||||||
// rows3: number of rows between this block and the next block
|
|
||||||
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
|
|
||||||
|
|
||||||
ASSERT(rows3 >= rows1);
|
|
||||||
|
|
||||||
if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
|
|
||||||
((!blockAtIdx(pHelper, blkIdx)->last) ||
|
|
||||||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock) &&
|
|
||||||
(pHelper->files.nLastF.fd < 0)))) {
|
|
||||||
rowsWritten = rows1;
|
|
||||||
bool isLast = false;
|
|
||||||
SFile *pFile = NULL;
|
|
||||||
|
|
||||||
if (blockAtIdx(pHelper, blkIdx)->last) {
|
|
||||||
isLast = true;
|
|
||||||
pFile = &(pHelper->files.lastF);
|
|
||||||
} else {
|
|
||||||
pFile = &(pHelper->files.dataF);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
|
|
||||||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
|
||||||
} else { // Load-Merge-Write
|
|
||||||
// Load
|
|
||||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
|
||||||
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
|
|
||||||
|
|
||||||
rowsWritten = rows3;
|
|
||||||
|
|
||||||
int iter1 = 0; // iter over pHelper->pDataCols[0]
|
|
||||||
int iter2 = 0; // iter over pDataCols
|
|
||||||
int round = 0;
|
|
||||||
// tdResetDataCols(pHelper->pDataCols[1]);
|
|
||||||
while (true) {
|
|
||||||
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
|
|
||||||
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
|
|
||||||
pDataCols, &iter2, rowsWritten, pCfg->maxRowsPerFileBlock * 4 / 5);
|
|
||||||
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
|
|
||||||
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
|
||||||
goto _err;
|
|
||||||
if (round == 0) {
|
|
||||||
tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx);
|
|
||||||
} else {
|
|
||||||
tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
|
|
||||||
}
|
|
||||||
round++;
|
|
||||||
blkIdx++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rowsWritten;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int compTSKEY(const void *key1, const void *key2) {
|
|
||||||
if (*(TSKEY *)key1 > *(TSKEY *)key2) {
|
|
||||||
return 1;
|
|
||||||
} else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
|
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
|
||||||
if (tsizeof((void *)pHelper->pCompInfo) <= esize) {
|
if (tsizeof((void *)pHelper->pCompInfo) <= esize) {
|
||||||
size_t tsize = esize + sizeof(SCompBlock) * 16;
|
size_t tsize = esize + sizeof(SCompBlock) * 16;
|
||||||
|
@ -911,7 +799,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
ASSERT(pCompBlock->numOfSubBlocks == 1);
|
ASSERT(pCompBlock->numOfSubBlocks == 1);
|
||||||
|
|
||||||
// Adjust memory if no more room
|
// Adjust memory if no more room
|
||||||
if (pIdx->len == 0) pIdx->len = sizeof(SCompData) + sizeof(TSCKSUM);
|
if (pIdx->len == 0) pIdx->len = sizeof(SCompInfo) + sizeof(TSCKSUM);
|
||||||
if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err;
|
if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err;
|
||||||
|
|
||||||
// Change the offset
|
// Change the offset
|
||||||
|
@ -925,22 +813,22 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
if (tsize > 0) {
|
if (tsize > 0) {
|
||||||
ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo));
|
ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo));
|
||||||
ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo));
|
ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo));
|
||||||
memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)),
|
memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)),
|
||||||
(void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize);
|
POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize);
|
||||||
}
|
}
|
||||||
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
|
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
|
||||||
|
|
||||||
pIdx->numOfBlocks++;
|
pIdx->numOfBlocks++;
|
||||||
pIdx->len += sizeof(SCompBlock);
|
pIdx->len += sizeof(SCompBlock);
|
||||||
ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));
|
ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));
|
||||||
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
|
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
|
||||||
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
|
pIdx->hasLast = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
|
||||||
|
|
||||||
if (pIdx->numOfBlocks > 1) {
|
if (pIdx->numOfBlocks > 1) {
|
||||||
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
|
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
|
tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
|
||||||
blkIdx);
|
blkIdx);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1048,8 +936,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
if (pSCompBlock->numOfSubBlocks > 1) {
|
if (pSCompBlock->numOfSubBlocks > 1) {
|
||||||
size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
|
size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
|
||||||
if (tsize > 0) {
|
if (tsize > 0) {
|
||||||
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset),
|
memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset),
|
||||||
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
|
POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
|
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
|
||||||
|
@ -1062,8 +950,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
|
|
||||||
*pSCompBlock = *pCompBlock;
|
*pSCompBlock = *pCompBlock;
|
||||||
|
|
||||||
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
|
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
|
||||||
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
|
pIdx->hasLast = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
|
||||||
|
|
||||||
tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
|
tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
|
||||||
blkIdx);
|
blkIdx);
|
||||||
|
@ -1071,30 +959,6 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the number of rows in range [minKey, maxKey]
|
|
||||||
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
|
|
||||||
if (pDataCols->numOfRows == 0) return 0;
|
|
||||||
|
|
||||||
ASSERT(minKey <= maxKey);
|
|
||||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
|
||||||
TSKEY keyLast = dataColsKeyLast(pDataCols);
|
|
||||||
ASSERT(keyFirst <= keyLast);
|
|
||||||
|
|
||||||
if (minKey > keyLast || maxKey < keyFirst) return 0;
|
|
||||||
|
|
||||||
void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
|
|
||||||
compTSKEY, TD_GE);
|
|
||||||
ASSERT(ptr1 != NULL);
|
|
||||||
|
|
||||||
void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
|
|
||||||
compTSKEY, TD_LE);
|
|
||||||
ASSERT(ptr2 != NULL);
|
|
||||||
|
|
||||||
if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;
|
|
||||||
|
|
||||||
return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
||||||
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
||||||
pHelper->files.fid = -1;
|
pHelper->files.fid = -1;
|
||||||
|
@ -1250,7 +1114,8 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lseek(pFile->fd, pCompCol->offset, SEEK_SET) < 0) {
|
int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset;
|
||||||
|
if (lseek(pFile->fd, offset, SEEK_SET) < 0) {
|
||||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1276,10 +1141,15 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
|
||||||
|
|
||||||
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
|
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
|
||||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||||
|
ASSERT(colIds[0] == 0);
|
||||||
|
|
||||||
SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
||||||
|
SCompCol compCol = {0};
|
||||||
|
|
||||||
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
|
// If only load timestamp column, no need to load SCompData part
|
||||||
|
if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
|
||||||
|
|
||||||
|
pDataCols->numOfRows = pCompBlock->numOfRows;
|
||||||
|
|
||||||
int dcol = 0;
|
int dcol = 0;
|
||||||
int ccol = 0;
|
int ccol = 0;
|
||||||
|
@ -1298,6 +1168,13 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
||||||
|
|
||||||
ASSERT(pDataCol->colId == colId);
|
ASSERT(pDataCol->colId == colId);
|
||||||
|
|
||||||
|
if (colId == 0) { // load the key row
|
||||||
|
compCol.colId = colId;
|
||||||
|
compCol.len = pCompBlock->keyLen;
|
||||||
|
compCol.type = pDataCol->type;
|
||||||
|
compCol.offset = TSDB_KEY_COL_OFFSET;
|
||||||
|
pCompCol = &compCol;
|
||||||
|
} else { // load non-key rows
|
||||||
while (ccol < pCompBlock->numOfCols) {
|
while (ccol < pCompBlock->numOfCols) {
|
||||||
pCompCol = &pHelper->pCompData->cols[ccol];
|
pCompCol = &pHelper->pCompData->cols[ccol];
|
||||||
if (pCompCol->colId >= colId) break;
|
if (pCompCol->colId >= colId) break;
|
||||||
|
@ -1311,10 +1188,11 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pCompCol->colId == pDataCol->colId);
|
ASSERT(pCompCol->colId == pDataCol->colId);
|
||||||
|
}
|
||||||
|
|
||||||
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
|
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
|
||||||
dcol++;
|
dcol++;
|
||||||
ccol++;
|
if (colId != 0) ccol++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1362,8 +1240,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
pDataCols->numOfRows = pCompBlock->numOfRows;
|
pDataCols->numOfRows = pCompBlock->numOfRows;
|
||||||
|
|
||||||
// Recover the data
|
// Recover the data
|
||||||
int ccol = 0;
|
int ccol = 0; // loop iter for SCompCol object
|
||||||
int dcol = 0;
|
int dcol = 0; // loop iter for SDataCols object
|
||||||
while (dcol < pDataCols->numOfCols) {
|
while (dcol < pDataCols->numOfCols) {
|
||||||
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
||||||
if (ccol >= pCompData->numOfCols) {
|
if (ccol >= pCompData->numOfCols) {
|
||||||
|
@ -1373,12 +1251,23 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCompCol *pCompCol = &(pCompData->cols[ccol]);
|
int16_t tcolId = 0;
|
||||||
|
int32_t toffset = TSDB_KEY_COL_OFFSET;
|
||||||
|
int32_t tlen = pCompBlock->keyLen;
|
||||||
|
|
||||||
if (pCompCol->colId == pDataCol->colId) {
|
if (dcol != 0) {
|
||||||
|
SCompCol *pCompCol = &(pCompData->cols[ccol]);
|
||||||
|
tcolId = pCompCol->colId;
|
||||||
|
toffset = pCompCol->offset;
|
||||||
|
tlen = pCompCol->len;
|
||||||
|
} else {
|
||||||
|
ASSERT(pDataCol->colId == tcolId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tcolId == pDataCol->colId) {
|
||||||
if (pCompBlock->algorithm == TWO_STAGE_COMP) {
|
if (pCompBlock->algorithm == TWO_STAGE_COMP) {
|
||||||
int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
|
||||||
if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
|
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
|
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
|
||||||
}
|
}
|
||||||
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
|
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
|
||||||
|
@ -1387,16 +1276,16 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
|
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + toffset, tlen, pCompBlock->algorithm,
|
||||||
pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
|
pCompBlock->numOfRows, pDataCols->maxPoints, pHelper->compBuffer,
|
||||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
|
tsizeof(pHelper->compBuffer)) < 0) {
|
||||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
|
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
||||||
pCompCol->colId, (int64_t)pCompCol->offset);
|
REPO_ID(pHelper->pRepo), pFile->fname, tcolId, (int64_t)pCompBlock->offset, toffset);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
if (dcol != 0) ccol++;
|
||||||
dcol++;
|
dcol++;
|
||||||
ccol++;
|
} else if (tcolId < pDataCol->colId) {
|
||||||
} else if (pCompCol->colId < pDataCol->colId) {
|
|
||||||
ccol++;
|
ccol++;
|
||||||
} else {
|
} else {
|
||||||
// Set current column as NULL and forward
|
// Set current column as NULL and forward
|
||||||
|
@ -1442,3 +1331,250 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
|
||||||
|
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
||||||
|
STable * pTable = pCommitIter->pTable;
|
||||||
|
SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable);
|
||||||
|
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
|
||||||
|
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||||
|
SCompBlock compBlock = {0};
|
||||||
|
|
||||||
|
ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey);
|
||||||
|
if (pIdx->hasLast) { // append to with last block
|
||||||
|
ASSERT(pIdx->len > 0);
|
||||||
|
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
|
||||||
|
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
|
||||||
|
tdResetDataCols(pDataCols);
|
||||||
|
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
|
||||||
|
pDataCols, NULL, 0);
|
||||||
|
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
|
||||||
|
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
|
||||||
|
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||||
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
|
||||||
|
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
|
||||||
|
} else {
|
||||||
|
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||||
|
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
|
||||||
|
|
||||||
|
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
|
||||||
|
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows);
|
||||||
|
|
||||||
|
if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1;
|
||||||
|
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||||
|
} else {
|
||||||
|
ASSERT(!pHelper->hasOldLastBlock);
|
||||||
|
tdResetDataCols(pDataCols);
|
||||||
|
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
|
||||||
|
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
|
||||||
|
|
||||||
|
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
|
||||||
|
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
|
||||||
|
int *blkIdx) {
|
||||||
|
STsdbCfg * pCfg = &(pHelper->pRepo->config);
|
||||||
|
STable * pTable = pCommitIter->pTable;
|
||||||
|
SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable);
|
||||||
|
SCompBlock compBlock = {0};
|
||||||
|
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
|
||||||
|
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||||
|
SDataCols *pDataCols0 = pHelper->pDataCols[0];
|
||||||
|
|
||||||
|
SSkipListIterator slIter = {0};
|
||||||
|
|
||||||
|
ASSERT(keyFirst <= pIdx->maxKey);
|
||||||
|
|
||||||
|
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
|
||||||
|
pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE);
|
||||||
|
ASSERT(pCompBlock != NULL);
|
||||||
|
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
|
||||||
|
|
||||||
|
if (pCompBlock->last) {
|
||||||
|
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1);
|
||||||
|
int16_t colId = 0;
|
||||||
|
slIter = *(pCommitIter->pIter);
|
||||||
|
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
|
||||||
|
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
|
||||||
|
|
||||||
|
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
|
||||||
|
int rows2 =
|
||||||
|
tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
||||||
|
if (rows2 == 0) { // all data filtered out
|
||||||
|
*(pCommitIter->pIter) = slIter;
|
||||||
|
} else {
|
||||||
|
if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS &&
|
||||||
|
!TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||||
|
tdResetDataCols(pDataCols);
|
||||||
|
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
|
||||||
|
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
||||||
|
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
|
||||||
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
|
||||||
|
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
||||||
|
tblkIdx++;
|
||||||
|
} else {
|
||||||
|
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||||
|
int round = 0;
|
||||||
|
int dIter = 0;
|
||||||
|
while (true) {
|
||||||
|
tdResetDataCols(pDataCols);
|
||||||
|
int rowsRead =
|
||||||
|
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
|
||||||
|
if (rowsRead == 0) break;
|
||||||
|
|
||||||
|
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
|
||||||
|
if (round == 0) {
|
||||||
|
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
|
} else {
|
||||||
|
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tblkIdx++;
|
||||||
|
round++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
|
||||||
|
TSKEY blkKeyFirst = pCompBlock->keyFirst;
|
||||||
|
TSKEY blkKeyLast = pCompBlock->keyLast;
|
||||||
|
|
||||||
|
if (keyFirst < blkKeyFirst) {
|
||||||
|
while (true) {
|
||||||
|
tdResetDataCols(pDataCols);
|
||||||
|
int rowsRead =
|
||||||
|
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
|
||||||
|
if (rowsRead == 0) break;
|
||||||
|
|
||||||
|
ASSERT(rowsRead == pDataCols->numOfRows);
|
||||||
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1;
|
||||||
|
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
|
tblkIdx++;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(keyFirst <= blkKeyLast);
|
||||||
|
int16_t colId = 0;
|
||||||
|
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
|
||||||
|
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
|
||||||
|
|
||||||
|
slIter = *(pCommitIter->pIter);
|
||||||
|
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
|
||||||
|
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
|
||||||
|
pDataCols0->numOfRows);
|
||||||
|
|
||||||
|
if (rows2 == 0) { // all filtered out
|
||||||
|
*(pCommitIter->pIter) = slIter;
|
||||||
|
} else {
|
||||||
|
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
|
||||||
|
ASSERT(rows3 >= rows2);
|
||||||
|
|
||||||
|
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
|
||||||
|
int rows = (rows1 >= rows3) ? rows3 : rows2;
|
||||||
|
tdResetDataCols(pDataCols);
|
||||||
|
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
|
||||||
|
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
||||||
|
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
|
||||||
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0)
|
||||||
|
return -1;
|
||||||
|
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
||||||
|
tblkIdx++;
|
||||||
|
} else {
|
||||||
|
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||||
|
int round = 0;
|
||||||
|
int dIter = 0;
|
||||||
|
while (true) {
|
||||||
|
int rowsRead =
|
||||||
|
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
|
||||||
|
if (rowsRead == 0) break;
|
||||||
|
|
||||||
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0)
|
||||||
|
return -1;
|
||||||
|
if (round == 0) {
|
||||||
|
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
|
} else {
|
||||||
|
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
round++;
|
||||||
|
tblkIdx++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*blkIdx = tblkIdx;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
||||||
|
TSKEY maxKey, int maxRows) {
|
||||||
|
int numOfRows = 0;
|
||||||
|
TSKEY key1 = INT64_MAX;
|
||||||
|
TSKEY key2 = INT64_MAX;
|
||||||
|
STSchema *pSchema = NULL;
|
||||||
|
|
||||||
|
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
|
||||||
|
tdResetDataCols(pTarget);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
|
||||||
|
SDataRow row = tsdbNextIterRow(pCommitIter->pIter);
|
||||||
|
key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row);
|
||||||
|
|
||||||
|
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
|
||||||
|
|
||||||
|
if (key1 <= key2) {
|
||||||
|
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||||
|
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
||||||
|
pTarget->maxPoints);
|
||||||
|
}
|
||||||
|
pTarget->numOfRows++;
|
||||||
|
(*iter)++;
|
||||||
|
if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
|
||||||
|
} else {
|
||||||
|
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||||
|
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
|
||||||
|
ASSERT(pSchema != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
tdAppendDataRowToDataCol(row, pSchema, pTarget);
|
||||||
|
tSkipListIterNext(pCommitIter->pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfRows++;
|
||||||
|
if (numOfRows >= maxRows) break;
|
||||||
|
ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints);
|
||||||
|
}
|
||||||
|
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) {
|
||||||
|
STsdbCfg *pCfg = &(pHelper->pRepo->config);
|
||||||
|
SFile * pFile = NULL;
|
||||||
|
bool isLast = false;
|
||||||
|
|
||||||
|
ASSERT(pDataCols->numOfRows > 0);
|
||||||
|
|
||||||
|
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
|
||||||
|
pFile = &(pHelper->files.dataF);
|
||||||
|
} else {
|
||||||
|
isLast = true;
|
||||||
|
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pFile->fd > 0);
|
||||||
|
|
||||||
|
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue