TD-100
This commit is contained in:
parent
e550e13199
commit
bc5396e90f
|
@ -401,13 +401,6 @@ typedef struct {
|
|||
size_t compDataSize;
|
||||
SDataCols *pDataCols[2];
|
||||
|
||||
// ------ Perhaps no usage
|
||||
SCompIdx compIdx; // SCompIdx of current table
|
||||
int blockIter; // For write purpose
|
||||
|
||||
// Compression buffer
|
||||
void * cBuffer;
|
||||
size_t cBufSize;
|
||||
} SRWHelper;
|
||||
|
||||
// --------- Helper state
|
||||
|
|
|
@ -801,19 +801,16 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
|
|||
if (pTable == NULL || pTable->imem == NULL) continue;
|
||||
|
||||
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
|
||||
if (iters[tid] == NULL) {
|
||||
tsdbDestroyTableIters(iters, maxTables);
|
||||
return NULL;
|
||||
}
|
||||
if (iters[tid] == NULL) goto _err;
|
||||
|
||||
if (!tSkipListIterNext(iters[tid])) {
|
||||
// No data in this iterator
|
||||
tSkipListDestroyIter(iters[tid]);
|
||||
iters[tid] = NULL;
|
||||
}
|
||||
if (!tSkipListIterNext(iters[tid])) goto _err;
|
||||
}
|
||||
|
||||
return iters;
|
||||
|
||||
_err:
|
||||
tsdbDestroyTableIters(iters, maxTables);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tsdbFreeMemTable(SMemTable *pMemTable) {
|
||||
|
@ -832,6 +829,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
STsdbCache *pCache = pRepo->tsdbCache;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
SDataCols * pDataCols = NULL;
|
||||
SRWHelper whelper = {0};
|
||||
if (pCache->imem == NULL) return NULL;
|
||||
|
||||
// Create the iterator to read from cache
|
||||
|
@ -842,7 +840,6 @@ static void *tsdbCommitData(void *arg) {
|
|||
}
|
||||
|
||||
// Create a write helper for commit data
|
||||
SRWHelper whelper;
|
||||
SHelperCfg hcfg = {
|
||||
.type = TSDB_WRITE_HELPER,
|
||||
.maxTables = pCfg->maxTables,
|
||||
|
@ -990,129 +987,3 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) {
|
||||
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols;
|
||||
// SCompData *pCompData = (SCompData *)malloc(size);
|
||||
// if (pCompData == NULL) return -1;
|
||||
|
||||
// pCompData->delimiter = TSDB_FILE_DELIMITER;
|
||||
// pCompData->uid = uid;
|
||||
// pCompData->numOfCols = pCols->numOfCols;
|
||||
|
||||
// *offset = lseek(pFile->fd, 0, SEEK_END);
|
||||
// *len = size;
|
||||
|
||||
// int toffset = size;
|
||||
// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
|
||||
// SCompCol *pCompCol = pCompData->cols + iCol;
|
||||
// SDataCol *pDataCol = pCols->cols + iCol;
|
||||
|
||||
// pCompCol->colId = pDataCol->colId;
|
||||
// pCompCol->type = pDataCol->type;
|
||||
// pCompCol->offset = toffset;
|
||||
|
||||
// // TODO: add compression
|
||||
// pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite;
|
||||
// toffset += pCompCol->len;
|
||||
// }
|
||||
|
||||
// // Write the block
|
||||
// if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err;
|
||||
// *len += size;
|
||||
// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
|
||||
// SDataCol *pDataCol = pCols->cols + iCol;
|
||||
// SCompCol *pCompCol = pCompData->cols + iCol;
|
||||
// if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err;
|
||||
// *len += pCompCol->len;
|
||||
// }
|
||||
|
||||
// if (pCompData == NULL) free((void *)pCompData);
|
||||
// return 0;
|
||||
|
||||
// _err:
|
||||
// if (pCompData == NULL) free((void *)pCompData);
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
// static int compareKeyBlock(const void *arg1, const void *arg2) {
|
||||
// TSKEY key = *(TSKEY *)arg1;
|
||||
// SCompBlock *pBlock = (SCompBlock *)arg2;
|
||||
|
||||
// if (key < pBlock->keyFirst) {
|
||||
// return -1;
|
||||
// } else if (key > pBlock->keyLast) {
|
||||
// return 1;
|
||||
// }
|
||||
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
// int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
|
||||
// STsdbCfg * pCfg = &(pRepo->config);
|
||||
// SFile * pFile = NULL;
|
||||
// int numOfPointsToWrite = 0;
|
||||
// int64_t offset = 0;
|
||||
// int32_t len = 0;
|
||||
|
||||
// memset((void *)pCompBlock, 0, sizeof(SCompBlock));
|
||||
|
||||
// if (pCompInfo == NULL) {
|
||||
// // Just append the data block to .data or .l or .last file
|
||||
// numOfPointsToWrite = pCols->numOfPoints;
|
||||
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file
|
||||
// pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]);
|
||||
// } else { // Write to .last or .l file
|
||||
// pCompBlock->last = 1;
|
||||
// if (lFile) {
|
||||
// pFile = lFile;
|
||||
// } else {
|
||||
// pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]);
|
||||
// }
|
||||
// }
|
||||
// tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid);
|
||||
// pCompBlock->offset = offset;
|
||||
// pCompBlock->len = len;
|
||||
// pCompBlock->algorithm = 2; // TODO : add to configuration
|
||||
// pCompBlock->sversion = pCols->sversion;
|
||||
// pCompBlock->numOfPoints = pCols->numOfPoints;
|
||||
// pCompBlock->numOfSubBlocks = 1;
|
||||
// pCompBlock->numOfCols = pCols->numOfCols;
|
||||
// pCompBlock->keyFirst = dataColsKeyFirst(pCols);
|
||||
// pCompBlock->keyLast = dataColsKeyLast(pCols);
|
||||
// } else {
|
||||
// // Need to merge the block to either the last block or the other block
|
||||
// TSKEY keyFirst = dataColsKeyFirst(pCols);
|
||||
// SCompBlock *pMergeBlock = NULL;
|
||||
|
||||
// // Search the block to merge in
|
||||
// void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks,
|
||||
// compareKeyBlock, TD_GE);
|
||||
// if (ptr == NULL) {
|
||||
// // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block
|
||||
// // and merge the data
|
||||
// pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1);
|
||||
// } else {
|
||||
// pMergeBlock = (SCompBlock *)ptr;
|
||||
// }
|
||||
|
||||
// if (pMergeBlock->last) {
|
||||
// if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) {
|
||||
// // Need to load the data from .last and combine data in pCols to write to .data file
|
||||
|
||||
// } else { // Just append the block to .last or .l file
|
||||
// if (lFile) {
|
||||
// // read the block from .last file and merge with pCols, write to .l file
|
||||
|
||||
// } else {
|
||||
// // tsdbWriteBlockToFileImpl();
|
||||
// }
|
||||
// }
|
||||
// } else { // The block need to merge in .data file
|
||||
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// return numOfPointsToWrite;
|
||||
// }
|
||||
|
|
|
@ -169,7 +169,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema
|
|||
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN));
|
||||
|
||||
// Clear members and state used by previous table
|
||||
pHelper->blockIter = 0;
|
||||
// pHelper->blockIter = 0;
|
||||
pHelper->state &= (TSDB_HELPER_TABLE_SET - 1);
|
||||
|
||||
pHelper->tableInfo = *pHelperTable;
|
||||
|
@ -339,23 +339,25 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
|
|||
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
|
||||
|
||||
SCompIdx curCompIdx = pHelper->compIdx;
|
||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||
|
||||
ASSERT(curCompIdx.offset > 0 && curCompIdx.len > 0);
|
||||
// SCompIdx curCompIdx = pHelper->compIdx;
|
||||
|
||||
ASSERT(pIdx->offset > 0 && pIdx->len > 0);
|
||||
|
||||
int fd = pHelper->files.headF.fd;
|
||||
|
||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||
if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1;
|
||||
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
|
||||
|
||||
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len);
|
||||
if (tread(fd, (void *)(pHelper->pCompInfo), pHelper->compIdx.len) < pHelper->compIdx.len) return -1;
|
||||
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len);
|
||||
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
|
||||
// TODO: check the checksum
|
||||
|
||||
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
|
||||
}
|
||||
|
||||
if (target) memcpy(target, (void *)(pHelper->pCompInfo), curCompIdx.len);
|
||||
if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -302,7 +302,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
}
|
||||
|
||||
// load all the comp offset value for all tables in this file
|
||||
tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables
|
||||
// tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables
|
||||
|
||||
*numOfBlocks = 0;
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
|
@ -324,7 +324,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
pCheckInfo->compSize = compIndex->len;
|
||||
}
|
||||
|
||||
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
|
||||
// tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
|
||||
|
||||
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
|
||||
|
||||
|
@ -421,15 +421,15 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
|||
pFile->fd = open(pFile->fname, O_RDONLY);
|
||||
}
|
||||
|
||||
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
|
||||
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||
// if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
|
||||
// SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||
|
||||
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
|
||||
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
|
||||
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
|
||||
// pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
|
||||
// pBlockLoadInfo->slot = pQueryHandle->cur.slot;
|
||||
// pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
|
||||
|
||||
blockLoaded = true;
|
||||
}
|
||||
// blockLoaded = true;
|
||||
// }
|
||||
|
||||
taosArrayDestroy(sa);
|
||||
tfree(data);
|
||||
|
|
|
@ -140,12 +140,12 @@ TEST(TsdbTest, DISABLED_createRepo) {
|
|||
|
||||
// TEST(TsdbTest, DISABLED_openRepo) {
|
||||
TEST(TsdbTest, openRepo) {
|
||||
tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
|
||||
ASSERT_NE(repo, nullptr);
|
||||
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
|
||||
// ASSERT_NE(repo, nullptr);
|
||||
|
||||
// STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
// STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
||||
SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655);
|
||||
// SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655);
|
||||
|
||||
// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
// tsdbOpenFile(&pGroup->files[type], O_RDONLY);
|
||||
|
@ -156,7 +156,7 @@ TEST(TsdbTest, openRepo) {
|
|||
|
||||
// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
|
||||
|
||||
tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
|
||||
// tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
|
||||
|
||||
// int blockIdx = 0;
|
||||
// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]);
|
||||
|
@ -165,20 +165,20 @@ TEST(TsdbTest, openRepo) {
|
|||
|
||||
// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
|
||||
|
||||
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
|
||||
SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10);
|
||||
tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
|
||||
// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
|
||||
// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10);
|
||||
// tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
|
||||
|
||||
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
|
||||
|
||||
tdResetDataCols(pDataCols);
|
||||
// tdResetDataCols(pDataCols);
|
||||
|
||||
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData);
|
||||
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData);
|
||||
|
||||
|
||||
// int k = 0;
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
TEST(TsdbTest, DISABLED_createFileGroup) {
|
||||
SFileGroup fGroup;
|
||||
|
|
Loading…
Reference in New Issue