From bc5396e90f7b35e2c278f0404a9299941ba6ef7b Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 14 Apr 2020 18:56:12 +0800 Subject: [PATCH] TD-100 --- src/vnode/tsdb/inc/tsdbMain.h | 7 -- src/vnode/tsdb/src/tsdbMain.c | 145 ++--------------------------- src/vnode/tsdb/src/tsdbRWHelper.c | 16 ++-- src/vnode/tsdb/src/tsdbRead.c | 18 ++-- src/vnode/tsdb/tests/tsdbTests.cpp | 22 ++--- 5 files changed, 37 insertions(+), 171 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 610f377d72..6c418d2c0a 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -401,13 +401,6 @@ typedef struct { size_t compDataSize; SDataCols *pDataCols[2]; - // ------ Perhaps no usage - SCompIdx compIdx; // SCompIdx of current table - int blockIter; // For write purpose - - // Compression buffer - void * cBuffer; - size_t cBufSize; } SRWHelper; // --------- Helper state diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 52d21d5bd1..292d2eabb9 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -801,19 +801,16 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) if (pTable == NULL || pTable->imem == NULL) continue; iters[tid] = tSkipListCreateIter(pTable->imem->pData); - if (iters[tid] == NULL) { - tsdbDestroyTableIters(iters, maxTables); - return NULL; - } + if (iters[tid] == NULL) goto _err; - if (!tSkipListIterNext(iters[tid])) { - // No data in this iterator - tSkipListDestroyIter(iters[tid]); - iters[tid] = NULL; - } + if (!tSkipListIterNext(iters[tid])) goto _err; } return iters; + + _err: + tsdbDestroyTableIters(iters, maxTables); + return NULL; } static void tsdbFreeMemTable(SMemTable *pMemTable) { @@ -832,6 +829,7 @@ static void *tsdbCommitData(void *arg) { STsdbCache *pCache = pRepo->tsdbCache; STsdbCfg * pCfg = &(pRepo->config); SDataCols * pDataCols = NULL; + SRWHelper whelper = {0}; if (pCache->imem == NULL) return NULL; // Create the iterator to read from cache @@ -842,7 +840,6 @@ static void *tsdbCommitData(void *arg) { } // Create a write helper for commit data - SRWHelper whelper; SHelperCfg hcfg = { .type = TSDB_WRITE_HELPER, .maxTables = pCfg->maxTables, @@ -989,130 +986,4 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; -} - -// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { -// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; -// SCompData *pCompData = (SCompData *)malloc(size); -// if (pCompData == NULL) return -1; - -// pCompData->delimiter = TSDB_FILE_DELIMITER; -// pCompData->uid = uid; -// pCompData->numOfCols = pCols->numOfCols; - -// *offset = lseek(pFile->fd, 0, SEEK_END); -// *len = size; - -// int toffset = size; -// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { -// SCompCol *pCompCol = pCompData->cols + iCol; -// SDataCol *pDataCol = pCols->cols + iCol; - -// pCompCol->colId = pDataCol->colId; -// pCompCol->type = pDataCol->type; -// pCompCol->offset = toffset; - -// // TODO: add compression -// pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; -// toffset += pCompCol->len; -// } - -// // Write the block -// if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; -// *len += size; -// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { -// SDataCol *pDataCol = pCols->cols + iCol; -// SCompCol *pCompCol = pCompData->cols + iCol; -// if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; -// *len += pCompCol->len; -// } - -// if (pCompData == NULL) free((void *)pCompData); -// return 0; - -// _err: -// if (pCompData == NULL) free((void *)pCompData); -// return -1; -// } - -// static int compareKeyBlock(const void *arg1, const void *arg2) { -// TSKEY key = *(TSKEY *)arg1; -// SCompBlock *pBlock = (SCompBlock *)arg2; - -// if (key < pBlock->keyFirst) { -// return -1; -// } else if (key > pBlock->keyLast) { -// return 1; -// } - -// return 0; -// } - -// int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { -// STsdbCfg * pCfg = &(pRepo->config); -// SFile * pFile = NULL; -// int numOfPointsToWrite = 0; -// int64_t offset = 0; -// int32_t len = 0; - -// memset((void *)pCompBlock, 0, sizeof(SCompBlock)); - -// if (pCompInfo == NULL) { -// // Just append the data block to .data or .l or .last file -// numOfPointsToWrite = pCols->numOfPoints; -// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file -// pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); -// } else { // Write to .last or .l file -// pCompBlock->last = 1; -// if (lFile) { -// pFile = lFile; -// } else { -// pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); -// } -// } -// tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); -// pCompBlock->offset = offset; -// pCompBlock->len = len; -// pCompBlock->algorithm = 2; // TODO : add to configuration -// pCompBlock->sversion = pCols->sversion; -// pCompBlock->numOfPoints = pCols->numOfPoints; -// pCompBlock->numOfSubBlocks = 1; -// pCompBlock->numOfCols = pCols->numOfCols; -// pCompBlock->keyFirst = dataColsKeyFirst(pCols); -// pCompBlock->keyLast = dataColsKeyLast(pCols); -// } else { -// // Need to merge the block to either the last block or the other block -// TSKEY keyFirst = dataColsKeyFirst(pCols); -// SCompBlock *pMergeBlock = NULL; - -// // Search the block to merge in -// void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, -// compareKeyBlock, TD_GE); -// if (ptr == NULL) { -// // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block -// // and merge the data -// pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); -// } else { -// pMergeBlock = (SCompBlock *)ptr; -// } - -// if (pMergeBlock->last) { -// if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { -// // Need to load the data from .last and combine data in pCols to write to .data file - -// } else { // Just append the block to .last or .l file -// if (lFile) { -// // read the block from .last file and merge with pCols, write to .l file - -// } else { -// // tsdbWriteBlockToFileImpl(); -// } -// } -// } else { // The block need to merge in .data file - -// } - -// } - -// return numOfPointsToWrite; -// } +} \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index d67fd03964..f41d045ae5 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -169,7 +169,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN)); // Clear members and state used by previous table - pHelper->blockIter = 0; + // pHelper->blockIter = 0; pHelper->state &= (TSDB_HELPER_TABLE_SET - 1); pHelper->tableInfo = *pHelperTable; @@ -339,23 +339,25 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); - SCompIdx curCompIdx = pHelper->compIdx; + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(curCompIdx.offset > 0 && curCompIdx.len > 0); + // SCompIdx curCompIdx = pHelper->compIdx; + + ASSERT(pIdx->offset > 0 && pIdx->len > 0); int fd = pHelper->files.headF.fd; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1; + if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; - adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len); - if (tread(fd, (void *)(pHelper->pCompInfo), pHelper->compIdx.len) < pHelper->compIdx.len) return -1; + adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len); + if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; // TODO: check the checksum helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); } - if (target) memcpy(target, (void *)(pHelper->pCompInfo), curCompIdx.len); + if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len); return 0; } diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index bf45c2d0af..cd63993f7d 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -302,7 +302,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo } // load all the comp offset value for all tables in this file - tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables + // tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables *numOfBlocks = 0; size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); @@ -324,7 +324,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->compSize = compIndex->len; } - tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); + // tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); SCompInfo* pCompInfo = pCheckInfo->pCompInfo; @@ -421,15 +421,15 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo pFile->fd = open(pFile->fname, O_RDONLY); } - if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { - SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; + // if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { + // SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; - pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; - pBlockLoadInfo->slot = pQueryHandle->cur.slot; - pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid; + // pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; + // pBlockLoadInfo->slot = pQueryHandle->cur.slot; + // pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid; - blockLoaded = true; - } + // blockLoaded = true; + // } taosArrayDestroy(sa); tfree(data); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 685b2cce95..28d511ae2b 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -140,12 +140,12 @@ TEST(TsdbTest, DISABLED_createRepo) { // TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, openRepo) { - tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL); - ASSERT_NE(repo, nullptr); + // tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL); + // ASSERT_NE(repo, nullptr); -// STsdbRepo *pRepo = (STsdbRepo *)repo; + // STsdbRepo *pRepo = (STsdbRepo *)repo; - SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655); + // SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655); // for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { // tsdbOpenFile(&pGroup->files[type], O_RDONLY); @@ -156,7 +156,7 @@ TEST(TsdbTest, openRepo) { // SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len); - tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo); + // tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo); // int blockIdx = 0; // SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]); @@ -165,20 +165,20 @@ TEST(TsdbTest, openRepo) { // tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); - STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); - SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); - tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); + // STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); + // SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); + // tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); // tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); - tdResetDataCols(pDataCols); + // tdResetDataCols(pDataCols); - tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData); + // tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData); // int k = 0; -// } +} TEST(TsdbTest, DISABLED_createFileGroup) { SFileGroup fGroup;