From 7032405e95f63a1b0575a312b6a606506102ffc4 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 27 Mar 2020 20:19:34 +0800 Subject: [PATCH 1/9] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 16 ++- src/vnode/tsdb/src/tsdbFile.c | 44 ++++++--- src/vnode/tsdb/src/tsdbMain.c | 180 ++++++++++++++++++++-------------- 3 files changed, 152 insertions(+), 88 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 22563275cd..c761f8520e 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -25,6 +25,8 @@ extern "C" { #endif +#define TSDB_FILE_HEAD_SIZE 512 + #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) @@ -69,6 +71,7 @@ typedef struct { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); +int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbOpenFile(SFile *pFile, int oflag); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); @@ -104,6 +107,9 @@ typedef struct { TSKEY keyLast; } SCompBlock; +#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1) +#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) + typedef struct { int32_t delimiter; // For recovery usage int32_t checksum; // TODO: decide if checksum logic in this file or make it one API @@ -111,8 +117,7 @@ typedef struct { SCompBlock blocks[]; } SCompInfo; -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); +#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) // TODO: take pre-calculation into account typedef struct { @@ -129,6 +134,13 @@ typedef struct { int64_t uid; // For recovery usage SCompCol cols[]; } SCompData; +int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast); + +int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); +int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); +int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); +int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); +// TODO: need an API to merge all sub-block data into one int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 7d0bdbd845..f22274531d 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -24,7 +24,6 @@ #include "tsdbFile.h" -#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F const char *tsdbFileSuffix[] = { @@ -35,8 +34,7 @@ const char *tsdbFileSuffix[] = { static int compFGroupKey(const void *key, const void *fgroup); static int compFGroup(const void *arg1, const void *arg2); -static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); +static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); @@ -71,10 +69,10 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (tsdbSearchFGroup(pFileH, fid) == NULL) { + if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { + if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) { // TODO: deal with the ERROR here, remove those creaed file return -1; } @@ -105,6 +103,10 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast) { + // TODO + return 0; +} int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); @@ -127,6 +129,22 @@ int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { return 0; } +int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { + // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); + + if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; + size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; + if (read(pFile->fd, buf, size) < 0) return -1; + + return 0; +} + +int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { + if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; + if (read(pFile->fd, buf, pCol->len) < 0) return -1; + return 0; +} + static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write SDataCols * pCols, // Data column buffer int numOfPointsToWrie, // Number of points to write to the file @@ -229,10 +247,10 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return 0; } -static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) { - if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1; +static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname) { + if (dataDir == NULL || fname == NULL) return -1; - sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]); + sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); return 0; } @@ -264,12 +282,12 @@ static int tsdbCloseFile(SFile *pFile) { return ret; } -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { +int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) { memset((void *)pFile, 0, sizeof(SFile)); - pFile->type = type; pFile->fd = -1; - tsdbGetFileName(dataDir, fileId, type, pFile->fname); + tsdbGetFileName(dataDir, fileId, suffix, pFile->fname); + if (access(pFile->fname, F_OK) == 0) { // File already exists return -1; @@ -280,7 +298,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - if (type == TSDB_FILE_TYPE_HEAD) { + if (writeHeader) { if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) { tsdbCloseFile(pFile); return -1; @@ -292,7 +310,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - tsdbCloseFile(pFile); + if (toClose) tsdbCloseFile(pFile); return 0; } diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 8e433ecb5b..3945ffabda 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -8,6 +8,7 @@ #include #include #include +#include #include // #include "taosdef.h" @@ -45,6 +46,7 @@ #define TSDB_CFG_FILE_NAME "CONFIG" #define TSDB_DATA_DIR_NAME "data" #define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7 +#define TSDB_MAX_LAST_FILE_SIZE (1024 * 1024 * 10) // 10M enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; @@ -775,7 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max tdAppendDataRowToDataCol(row, pCols); numOfRows++; - if (numOfRows > maxRowsToRead) break; + if (numOfRows >= maxRowsToRead) break; } while (tSkipListIterNext(pIter)); return numOfRows; @@ -842,7 +844,10 @@ static void *tsdbCommitData(void *arg) { int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); for (int fid = sfid; fid <= efid; fid++) { - tsdbCommitToFile(pRepo, fid, iters, pCols); + if (tsdbCommitToFile(pRepo, fid, iters, pCols) < 0) { + // TODO: deal with the error here + // assert(0); + } } tdFreeDataCols(pCols); @@ -867,7 +872,8 @@ static void *tsdbCommitData(void *arg) { } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { - int flag = 0; + int hasDataToCommit = 0; + int isNewLastFile = 0; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; @@ -887,97 +893,125 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; int isLoadCompBlocks = 0; + char dataDir[128] = "\0"; if (pIter == NULL) continue; tdInitDataCols(pCols, pTable->schema); int numOfWrites = 0; - // while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) { - // break; - // if (!flag) { - // // There are data to commit to this file, we need to create/open it for read/write. - // // At the meantime, we set the flag to prevent further create/open operations - // if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - // // Open files for commit - // pGroup = tsdbOpenFilesForCommit(pFileH, fid); - // if (pGroup == NULL) { - // // TODO: deal with the ERROR here - // } - // // TODO: open .h file and if neccessary, open .l file - // {} - // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - // if (pIndices == NULL) { - // // TODO: deal with the ERROR - // } - // // load the SCompIdx part - // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose + // Loop to read columns from cache + while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) { + if (!hasDataToCommit) { + // There are data to commit to this fileId, we need to create/open it for read/write. + // At the meantime, we set the flag to prevent further create/open operations + tsdbGetDataDirName(pRepo, dataDir); - // // TODO: sendfile those not need changed table content - // for (int ttid = 0; ttid < tid; ttid++) { - // // SCompIdx *pIdx = &pIndices[ttid]; - // // if (pIdx->len > 0) { - // // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR); - // // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); - // // } - // } - // flag = 1; - // } + if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { + // TODO: deal with the ERROR here + } + // Open files for commit + pGroup = tsdbOpenFilesForCommit(pFileH, fid); + if (pGroup == NULL) { + // TODO: deal with the ERROR here + } + // TODO: open .h file and if neccessary, open .l file + tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0); + if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + // TODO: make it not to write the last file every time + tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); + isNewLastFile = 1; + } - // SCompIdx *pIdx = &pIndices[tid]; + // load the SCompIdx part + pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + if (pIndices == NULL) { // TODO: deal with the ERROR + } + if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here + } - // /* The first time to write to the table, need to decide - // * if it is neccessary to load the SComplock part. If it - // * is needed, just load it, or, just use sendfile and - // * append it. - // */ - // if (numOfWrites == 0 && pIdx->offset > 0) { - // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); - // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { - // // TODO: deal with the ERROR here - // } - // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; - // } else { - // // TODO: sendfile the prefix part - // } - // } + // sendfile those not need to changed table content + lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, + SEEK_SET); + lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + for (int ttid = 0; ttid < tid; ttid++) { + SCompIdx * tIdx= &pIndices[ttid]; + if (tIdx->len <= 0) continue; + if (isNewLastFile && tIdx->hasLast) { + // TODO: Need to load the SCompBlock part and copy to new last file + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len); + if (pCompInfo == NULL) { /* TODO */} + if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */} + SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1); + int numOfSubBlocks = pLastBlock->numOfSubBlocks; + assert(pLastBlock->last); + if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */} + { + if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks); + tIdx->checksum = 0; + } + write(tFile.fd, (void *)pCompInfo, tIdx->len); + tFile.size += tIdx->len; + } else { + sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len); + tFile.size += tIdx->len; + } + } - // // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { - // // // TODO: deal with the ERROR here - // // } + hasDataToCommit = 1; + } - // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); + SCompIdx *pIdx = &pIndices[tid]; + + /* The first time to write to the table, need to decide + * if it is neccessary to load the SComplock part. If it + * is needed, just load it, or, just use sendfile and + * append it. + */ + if (numOfWrites == 0 && pIdx->offset > 0) { + if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { + // TODO: deal with the ERROR here + } + if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; + } else { + // TODO: sendfile the prefix part + } + } + + // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { + // // TODO: deal with the ERROR here + // } + + // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); - // // if (1 /* the SCompBlock part is not loaded*/) { - // // // Append to .data file generate a SCompBlock and record it - // // } else { - // // } + // if (1 /* the SCompBlock part is not loaded*/) { + // // Append to .data file generate a SCompBlock and record it + // } else { + // } - // // // TODO: need to reset the pCols + // // TODO: need to reset the pCols - // numOfWrites++; - // } + numOfWrites++; + } - // if (pCols->numOfPoints > 0) { - // // TODO: still has data to commit, commit it - // } + if (pCols->numOfPoints > 0) { + // TODO: still has data to commit, commit it + } - // if (1/* SCompBlock part is loaded, write it to .head file*/) { - // // TODO - // } else { - // // TODO: use sendfile send the old part and append the newly added part - // } + if (1/* SCompBlock part is loaded, write it to .head file*/) { + // TODO + } else { + // TODO: use sendfile send the old part and append the newly added part + } } // Write the SCompIdx part // Close all files and return - if (flag) { + if (hasDataToCommit) { // TODO } From f29b0fcaf7cfb1b0b509108a85a36911fedc2337 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 12:08:41 +0800 Subject: [PATCH 2/9] TD-34 --- src/common/inc/dataformat.h | 1 + src/common/src/dataformat.c | 4 + src/vnode/tsdb/inc/tsdbFile.h | 5 +- src/vnode/tsdb/src/tsdbFile.c | 55 +++++- src/vnode/tsdb/src/tsdbMain.c | 313 +++++++++++++++++++++------------- 5 files changed, 257 insertions(+), 121 deletions(-) diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 52b2d1e156..a7016061e4 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -132,6 +132,7 @@ void tdResetDataCols(SDataCols *pCols); void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); +void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 8f6a40805f..56565b6e5c 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -353,6 +353,10 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { } pCols->numOfPoints++; } +// Pop pointsToPop points from the SDataCols +void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { + +} /** * Return the first part length of a data row for a schema diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index c761f8520e..e4592ffa2e 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -134,12 +134,15 @@ typedef struct { int64_t uid; // For recovery usage SCompCol cols[]; } SCompData; -int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast); + +int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols); int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); +int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); + // TODO: need an API to merge all sub-block data into one int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index f22274531d..c1b7ae2382 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -103,8 +103,59 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } -int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast) { - // TODO + +int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { + SCompBlock *pBlock = pStartBlock; + for (int i = 0; i < numOfBlocks; i++) { + if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; + for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { + SCompCol *pCompCol = &(pCompData->cols[iCol]); + pCols->numOfPoints += pBlock->numOfPoints; + int k = 0; + for (; k < pCols->numOfCols; k++) { + if (pCompCol->colId == pCols->cols[k].colId) break; + } + + if (tsdbLoadColData(pFile, pCompCol, pBlock->offset, + (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0) + return -1; + } + pStartBlock++; + } + return 0; +} + +int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) { + SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx); + SCompBlock *pStartBlock = NULL; + SCompBlock *pBlock = NULL; + int numOfBlocks = pSuperBlock->numOfSubBlocks; + + if (numOfBlocks == 1) + pStartBlock = pSuperBlock; + else + pStartBlock = TSDB_COMPBLOCK_AT(pCompInfo, pSuperBlock->offset); + + int maxNumOfCols = 0; + pBlock = pStartBlock; + for (int i = 0; i < numOfBlocks; i++) { + if (pBlock->numOfCols > maxNumOfCols) maxNumOfCols = pBlock->numOfCols; + pBlock++; + } + + SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * maxNumOfCols); + if (pCompData == NULL) return -1; + + // Load data from the block + if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData)); + + // Write data block to the file + { + // TODO + } + + + if (pCompData) free(pCompData); return 0; } diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 3945ffabda..599af73813 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -88,6 +88,8 @@ static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); +static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); +static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -872,13 +874,12 @@ static void *tsdbCommitData(void *arg) { } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { - int hasDataToCommit = 0; int isNewLastFile = 0; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbCfg * pCfg = &pRepo->config; - SFile tFile, lFile; + SFile hFile, lFile; SFileGroup *pGroup = NULL; SCompIdx * pIndices = NULL; SCompInfo * pCompInfo = NULL; @@ -889,125 +890,181 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable * pTable = pMeta->tables[tid]; - SSkipListIterator *pIter = iters[tid]; - int isLoadCompBlocks = 0; - char dataDir[128] = "\0"; + // Check if there are data to commit to this file + int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); + if (!hasDataToCommit) return 0; // No data to commit, just return - if (pIter == NULL) continue; - tdInitDataCols(pCols, pTable->schema); - - int numOfWrites = 0; - int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose - // Loop to read columns from cache - while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) { - if (!hasDataToCommit) { - // There are data to commit to this fileId, we need to create/open it for read/write. - // At the meantime, we set the flag to prevent further create/open operations - tsdbGetDataDirName(pRepo, dataDir); - - if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { - // TODO: deal with the ERROR here - } - // Open files for commit - pGroup = tsdbOpenFilesForCommit(pFileH, fid); - if (pGroup == NULL) { - // TODO: deal with the ERROR here - } - // TODO: open .h file and if neccessary, open .l file - tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0); - if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { - // TODO: make it not to write the last file every time - tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); - isNewLastFile = 1; - } - - // load the SCompIdx part - pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - if (pIndices == NULL) { // TODO: deal with the ERROR - } - if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here - } - - // sendfile those not need to changed table content - lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, - SEEK_SET); - lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); - for (int ttid = 0; ttid < tid; ttid++) { - SCompIdx * tIdx= &pIndices[ttid]; - if (tIdx->len <= 0) continue; - if (isNewLastFile && tIdx->hasLast) { - // TODO: Need to load the SCompBlock part and copy to new last file - pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len); - if (pCompInfo == NULL) { /* TODO */} - if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */} - SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1); - int numOfSubBlocks = pLastBlock->numOfSubBlocks; - assert(pLastBlock->last); - if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */} - { - if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks); - tIdx->checksum = 0; - } - write(tFile.fd, (void *)pCompInfo, tIdx->len); - tFile.size += tIdx->len; - } else { - sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len); - tFile.size += tIdx->len; - } - } - - hasDataToCommit = 1; - } - - SCompIdx *pIdx = &pIndices[tid]; - - /* The first time to write to the table, need to decide - * if it is neccessary to load the SComplock part. If it - * is needed, just load it, or, just use sendfile and - * append it. - */ - if (numOfWrites == 0 && pIdx->offset > 0) { - if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { - pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { - // TODO: deal with the ERROR here - } - if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; - } else { - // TODO: sendfile the prefix part - } - } - - // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) { - // // TODO: deal with the ERROR here - // } - - // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); - - - // if (1 /* the SCompBlock part is not loaded*/) { - // // Append to .data file generate a SCompBlock and record it - // } else { - // } - - // // TODO: need to reset the pCols - - numOfWrites++; - } - - if (pCols->numOfPoints > 0) { - // TODO: still has data to commit, commit it - } - - if (1/* SCompBlock part is loaded, write it to .head file*/) { - // TODO - } else { - // TODO: use sendfile send the old part and append the newly added part - } + // Create and open files for commit + tsdbGetDataDirName(pRepo, dataDir); + if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {/* TODO */} + pGroup = tsdbOpenFilesForCommit(pFileH, fid); + if (pGroup == NULL) {/* TODO */} + tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); + if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + // TODO: make it not to write the last file every time + tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); + isNewLastFile = 1; } + // Load the SCompIdx + pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) *pCfg->maxTables); + if (pIndices == NULL) {/* TODO*/} + if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {/* TODO */} + +// Loop to commit data in each table + for (int tid = 0; tid < pCfg->maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + SSkipListIterator *pIter = iters[tid]; + SCompIdx *pIdx = &pIndices[tid]; + + if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { + // This table does not have data in this range, just copy its head part and last + // part (if neccessary) to new file + if (pIdx->len != 0) { // has old data + if (isNewLastFile && pIdx->hasLast) { + // Need to load SCompBlock part and copy to new file + if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) {/* TODO */} + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {/* TODO */} + + // Copy the last block from old last file to new file + // tsdbCopyBlockData() + } else { + pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); + sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); + hFile.size += pIdx->len; + } + } + continue; + } + + // while () { + + // } + } + + // for (int tid = 0; tid < pCfg->maxTables; tid++) { + // STable * pTable = pMeta->tables[tid]; + // SSkipListIterator *pIter = iters[tid]; + // int isLoadCompBlocks = 0; + // char dataDir[128] = "\0"; + + // if (pIter == NULL) { + // if (hasDataToCommit && isNewLastFile()) + // continue; + // } + // tdInitDataCols(pCols, pTable->schema); + + // int numOfWrites = 0; + // int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose + // // Loop to read columns from cache + // while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) { + // if (!hasDataToCommit) { + // // There are data to commit to this fileId, we need to create/open it for read/write. + // // At the meantime, we set the flag to prevent further create/open operations + // tsdbGetDataDirName(pRepo, dataDir); + + // if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { + // // TODO: deal with the ERROR here + // } + // // Open files for commit + // pGroup = tsdbOpenFilesForCommit(pFileH, fid); + // if (pGroup == NULL) { + // // TODO: deal with the ERROR here + // } + // // TODO: open .h file and if neccessary, open .l file + // tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0); + // if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + // // TODO: make it not to write the last file every time + // tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); + // isNewLastFile = 1; + // } + + // // load the SCompIdx part + // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + // if (pIndices == NULL) { // TODO: deal with the ERROR + // } + // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here + // } + + // // sendfile those not need to changed table content + // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, + // SEEK_SET); + // lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + // for (int ttid = 0; ttid < tid; ttid++) { + // SCompIdx * tIdx= &pIndices[ttid]; + // if (tIdx->len <= 0) continue; + // if (isNewLastFile && tIdx->hasLast) { + // // TODO: Need to load the SCompBlock part and copy to new last file + // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len); + // if (pCompInfo == NULL) { /* TODO */} + // if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */} + // SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1); + // int numOfSubBlocks = pLastBlock->numOfSubBlocks; + // assert(pLastBlock->last); + // if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */} + // { + // if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks); + // tIdx->checksum = 0; + // } + // write(tFile.fd, (void *)pCompInfo, tIdx->len); + // tFile.size += tIdx->len; + // } else { + // sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len); + // tFile.size += tIdx->len; + // } + // } + + // hasDataToCommit = 1; + // } + + // SCompIdx *pIdx = &pIndices[tid]; + + // /* The first time to write to the table, need to decide + // * if it is neccessary to load the SComplock part. If it + // * is needed, just load it, or, just use sendfile and + // * append it. + // */ + // if (numOfWrites == 0 && pIdx->offset > 0) { + // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { + // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); + // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { + // // TODO: deal with the ERROR here + // } + // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; + // } else { + // // TODO: sendfile the prefix part + // } + // } + + // int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols); + // if (numOfPointsWritten < 0) { + // // TODO: deal with the ERROR here + // } + + // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); + + + // // if (1 /* the SCompBlock part is not loaded*/) { + // // // Append to .data file generate a SCompBlock and record it + // // } else { + // // } + + // // // TODO: need to reset the pCols + + // numOfWrites++; + // } + + // if (pCols->numOfPoints > 0) { + // // TODO: still has data to commit, commit it + // } + + // if (1/* SCompBlock part is loaded, write it to .head file*/) { + // // TODO + // } else { + // // TODO: use sendfile send the old part and append the newly added part + // } + // } + // Write the SCompIdx part // Close all files and return @@ -1018,5 +1075,25 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (pIndices) free(pIndices); if (pCompInfo) free(pCompInfo); + return 0; +} + +static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) { + if (pIter == NULL) return 0; + + SSkipListNode *node = tSkipListIterGet(pIter); + if (node == NULL) return 0; + + SDataRow row = SL_GET_NODE_DATA(node); + if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1; + + return 0; +} + +static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) { + for (int i = 0; i < nIters; i++) { + SSkipListIterator *pIter = iters[i]; + if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; + } return 0; } \ No newline at end of file From 016b5e8550a845b8171f149b8f04e169e55ac43d Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 14:19:48 +0800 Subject: [PATCH 3/9] TD-34 --- src/vnode/tsdb/src/tsdbMain.c | 198 +++++++++------------------------- 1 file changed, 53 insertions(+), 145 deletions(-) diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 599af73813..a3367d2e51 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -892,13 +892,15 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters // Check if there are data to commit to this file int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); - if (!hasDataToCommit) return 0; // No data to commit, just return + if (!hasDataToCommit) return 0; // No data to commit, just return // Create and open files for commit tsdbGetDataDirName(pRepo, dataDir); - if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {/* TODO */} + if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ + } pGroup = tsdbOpenFilesForCommit(pFileH, fid); - if (pGroup == NULL) {/* TODO */} + if (pGroup == NULL) { /* TODO */ + } tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { // TODO: make it not to write the last file every time @@ -907,28 +909,32 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters } // Load the SCompIdx - pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) *pCfg->maxTables); - if (pIndices == NULL) {/* TODO*/} - if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {/* TODO */} + pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + if (pIndices == NULL) { /* TODO*/ + } + if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ + } -// Loop to commit data in each table + // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; + STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; - SCompIdx *pIdx = &pIndices[tid]; + SCompIdx * pIdx = &pIndices[tid]; if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { // This table does not have data in this range, just copy its head part and last // part (if neccessary) to new file - if (pIdx->len != 0) { // has old data + if (pIdx->offset > 0) { // has old data if (isNewLastFile && pIdx->hasLast) { // Need to load SCompBlock part and copy to new file - if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) {/* TODO */} - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {/* TODO */} + if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ + } + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ + } - // Copy the last block from old last file to new file + // TODO: Copy the last block from old last file to new file // tsdbCopyBlockData() - } else { + } else { pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); hFile.size += pIdx->len; @@ -937,140 +943,42 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters continue; } - // while () { + // Load SCompBlock part if neccessary + int isCompBlockLoaded = 0; + if (pIdx->offset > 0) { + if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { + // has last block || cache key overlap with commit key + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ + } + if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; + } else { + // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part + // and write those new blocks to it + } + } - // } + tdInitDataCols(pCols, pTable->schema); + + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; + while (1) { + tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); + if (pCols->numOfPoints == 0) break; + + // TODO + int pointsWritten = 0; /* tsdbWriteBlockToFile(); */ + tdPopDataColsPoints(pCols, pointsWritten); + } + + // Write the SCompBlock part + if (isCompBlockLoaded) { + // merge the block into old and update pIdx + } else { + // sendfile the SCompBlock part and update the pIdx + } } - // for (int tid = 0; tid < pCfg->maxTables; tid++) { - // STable * pTable = pMeta->tables[tid]; - // SSkipListIterator *pIter = iters[tid]; - // int isLoadCompBlocks = 0; - // char dataDir[128] = "\0"; - - // if (pIter == NULL) { - // if (hasDataToCommit && isNewLastFile()) - // continue; - // } - // tdInitDataCols(pCols, pTable->schema); - - // int numOfWrites = 0; - // int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose - // // Loop to read columns from cache - // while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) { - // if (!hasDataToCommit) { - // // There are data to commit to this fileId, we need to create/open it for read/write. - // // At the meantime, we set the flag to prevent further create/open operations - // tsdbGetDataDirName(pRepo, dataDir); - - // if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - // // Open files for commit - // pGroup = tsdbOpenFilesForCommit(pFileH, fid); - // if (pGroup == NULL) { - // // TODO: deal with the ERROR here - // } - // // TODO: open .h file and if neccessary, open .l file - // tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0); - // if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { - // // TODO: make it not to write the last file every time - // tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); - // isNewLastFile = 1; - // } - - // // load the SCompIdx part - // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - // if (pIndices == NULL) { // TODO: deal with the ERROR - // } - // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here - // } - - // // sendfile those not need to changed table content - // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, - // SEEK_SET); - // lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); - // for (int ttid = 0; ttid < tid; ttid++) { - // SCompIdx * tIdx= &pIndices[ttid]; - // if (tIdx->len <= 0) continue; - // if (isNewLastFile && tIdx->hasLast) { - // // TODO: Need to load the SCompBlock part and copy to new last file - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len); - // if (pCompInfo == NULL) { /* TODO */} - // if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */} - // SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1); - // int numOfSubBlocks = pLastBlock->numOfSubBlocks; - // assert(pLastBlock->last); - // if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */} - // { - // if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks); - // tIdx->checksum = 0; - // } - // write(tFile.fd, (void *)pCompInfo, tIdx->len); - // tFile.size += tIdx->len; - // } else { - // sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len); - // tFile.size += tIdx->len; - // } - // } - - // hasDataToCommit = 1; - // } - - // SCompIdx *pIdx = &pIndices[tid]; - - // /* The first time to write to the table, need to decide - // * if it is neccessary to load the SComplock part. If it - // * is needed, just load it, or, just use sendfile and - // * append it. - // */ - // if (numOfWrites == 0 && pIdx->offset > 0) { - // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); - // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { - // // TODO: deal with the ERROR here - // } - // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; - // } else { - // // TODO: sendfile the prefix part - // } - // } - - // int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols); - // if (numOfPointsWritten < 0) { - // // TODO: deal with the ERROR here - // } - - // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); - - - // // if (1 /* the SCompBlock part is not loaded*/) { - // // // Append to .data file generate a SCompBlock and record it - // // } else { - // // } - - // // // TODO: need to reset the pCols - - // numOfWrites++; - // } - - // if (pCols->numOfPoints > 0) { - // // TODO: still has data to commit, commit it - // } - - // if (1/* SCompBlock part is loaded, write it to .head file*/) { - // // TODO - // } else { - // // TODO: use sendfile send the old part and append the newly added part - // } - // } - - // Write the SCompIdx part - - // Close all files and return - if (hasDataToCommit) { - // TODO - } + // TODO: close the files and replace the .head and .last file + {} if (pIndices) free(pIndices); if (pCompInfo) free(pCompInfo); From d37876fe31dd03c0745a451d73a7c0d022ff52aa Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 15:03:50 +0800 Subject: [PATCH 4/9] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 11 +++++++---- src/vnode/tsdb/src/tsdbFile.c | 18 ++++++++---------- src/vnode/tsdb/src/tsdbMain.c | 21 ++++++++++++++++++--- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index e4592ffa2e..ac96d15126 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -42,13 +42,16 @@ typedef enum { extern const char *tsdbFileSuffix[]; typedef struct { - int8_t type; - int fd; - char fname[128]; int64_t size; // total size of the file int64_t tombSize; // unused file size int32_t totalBlocks; int32_t totalSubBlocks; +} SFileInfo; + +typedef struct { + int fd; + char fname[128]; + SFileInfo info; } SFile; #define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1) @@ -74,7 +77,7 @@ void tsdbCloseFileH(STsdbFileH *pFileH); int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbOpenFile(SFile *pFile, int oflag); -SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); +int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); typedef struct { diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index c1b7ae2382..3567517d79 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -268,7 +268,7 @@ static int compFGroup(const void *arg1, const void *arg2) { static int tsdbWriteFileHead(SFile *pFile) { char head[TSDB_FILE_HEAD_SIZE] = "\0"; - pFile->size += TSDB_FILE_HEAD_SIZE; + pFile->info.size += TSDB_FILE_HEAD_SIZE; // TODO: write version and File statistic to the head lseek(pFile->fd, 0, SEEK_SET); @@ -292,7 +292,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return -1; } - pFile->size += size; + pFile->info.size += size; free(buf); return 0; @@ -315,6 +315,12 @@ int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function return 0; } +int tsdbCloseFile(SFile *pFile) { + int ret = close(pFile->fd); + pFile->fd = -1; + return ret; +} + SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); if (pGroup == NULL) return NULL; @@ -325,14 +331,6 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { return pGroup; } -static int tsdbCloseFile(SFile *pFile) { - if (!TSDB_IS_FILE_OPENED(pFile)) return -1; - int ret = close(pFile->fd); - pFile->fd = -1; - - return ret; -} - int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) { memset((void *)pFile, 0, sizeof(SFile)); pFile->fd = -1; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index a3367d2e51..2a6b60d24d 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -937,7 +937,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters } else { pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); - hFile.size += pIdx->len; + hFile.info.size += pIdx->len; } } continue; @@ -977,8 +977,23 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters } } - // TODO: close the files and replace the .head and .last file - {} + // Write the SCompIdx part + if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */} + if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */} + + // close the files + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + tsdbCloseFile(&pGroup->files[type]); + } + tsdbCloseFile(&hFile); + if (isNewLastFile) tsdbCloseFile(&lFile); + // TODO: replace the .head and .last file + rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname); + pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info; + if (isNewLastFile) { + rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname); + pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info; + } if (pIndices) free(pIndices); if (pCompInfo) free(pCompInfo); From 5470057dbcf2760c93c8239f4899fa1016745f6a Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 16:24:59 +0800 Subject: [PATCH 5/9] TD-34 --- src/vnode/tsdb/src/tsdbMain.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 2a6b60d24d..42792bc1b8 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -332,7 +332,10 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { pRepo->tsdbCache->curBlock = NULL; // TODO: here should set as detached or use join for memory leak - pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo); + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo); tsdbUnLockRepo(repo); return 0; From 2b14e5524f6a97231588662fbaeb635778556bfb Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 18:07:56 +0800 Subject: [PATCH 6/9] TD-34 --- src/common/src/dataformat.c | 11 +++++++ src/vnode/tsdb/inc/tsdbFile.h | 2 -- src/vnode/tsdb/src/tsdbFile.c | 59 ----------------------------------- src/vnode/tsdb/src/tsdbMain.c | 34 ++++++++++++++++++-- 4 files changed, 43 insertions(+), 63 deletions(-) diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 56565b6e5c..0496fc6feb 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -355,7 +355,18 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { } // Pop pointsToPop points from the SDataCols void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { + int pointsLeft = pCols->numOfPoints - pointsToPop; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SDataCol *p_col = pCols->cols + iCol; + if (p_col->len > 0) { + p_col->len = TYPE_BYTES[p_col->type] * pointsLeft; + if (pointsLeft > 0) { + memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len); + } + } + } + pCols->numOfPoints = pointsLeft; } /** diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index ac96d15126..0f654ad0bd 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -148,8 +148,6 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD // TODO: need an API to merge all sub-block data into one -int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 3567517d79..088a164933 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -196,65 +196,6 @@ int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void return 0; } -static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write - SDataCols * pCols, // Data column buffer - int numOfPointsToWrie, // Number of points to write to the file - SCompBlock *pBlock // SCompBlock to hold block information to return - ) { - // pBlock->last = 0; - // pBlock->offset = lseek(pFile->fd, 0, SEEK_END); - // // pBlock->algorithm = ; - // pBlock->numOfPoints = pCols->numOfPoints; - // // pBlock->sversion = ; - // // pBlock->len = ; - // pBlock->numOfSubBlocks = 1; - // pBlock->keyFirst = dataColsKeyFirst(pCols); - // pBlock->keyLast = dataColsKeyLast(pCols); - // for (int i = 0; i < pCols->numOfCols; i++) { - // // TODO: if all col value is NULL, do not save it - // pBlock->numOfCols++; - // pCompData->numOfCols++; - // SCompCol *pCompCol = pCompData->cols + i; - // pCompCol->colId = pCols->cols[i].colId; - // pCompCol->type = pCols->cols[i].type; - - // // pCompCol->len = ; - // // pCompCol->offset = ; - // } - - return 0; -} - -int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) { - memset((void *)pBlock, 0, sizeof(SCompBlock)); - SFile *pFile = NULL; - SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols); - if (pCompData == NULL) return -1; - pCompData->delimiter = TSDB_FILE_DELIMITER; - // pCompData->uid = ; - - if (isMerge) { - TSKEY keyFirst = dataColsKeyFirst(pCols); - // 1. Binary search the block the data can merged into - - if (1/* the data should only merged into last file */) { - } else { - } - } else { - // Write directly to the file without merge - if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) { - // TODO: write the data to the last file - } else { - // TODO: wirte the data to the data file - } - } - - // TODO: need to update pIdx - - if (pCompData) free(pCompData); - return 0; -} - static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; SFileGroup *pFGroup = (SFileGroup *)fgroup; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 42792bc1b8..5f7dd2513f 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -967,9 +967,26 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); if (pCols->numOfPoints == 0) break; - // TODO - int pointsWritten = 0; /* tsdbWriteBlockToFile(); */ + int pointsWritten = 0; + // { // TODO : try to write the block data to file + // if (!isCompBlockLoaded) { // Just append + // if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file + // lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END); + + // } else { + // if (isNewLastFile) { // write directly to .l file + + // } else { // write directly to .last file + + // } + // } + // } else { // Need to append + // // SCompBlock *pTBlock = NULL; + // } + // } + pointsWritten = pCols->numOfPoints; tdPopDataColsPoints(pCols, pointsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; } // Write the SCompBlock part @@ -1022,4 +1039,17 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; } return 0; +} + +static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { + STsdbCfg *pCfg = &(pRepo->config); + + memset((void *)pCompBlock, 0, sizeof(SCompBlock)); + + if (pCompInfo == NULL) { + // Just need to append to file + } else { + // Need to merge + } + return 0; } \ No newline at end of file From 12d2e463eb2e55980bd30712de8d18395c9418e3 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 18:58:51 +0800 Subject: [PATCH 7/9] TD-34 --- src/common/inc/dataformat.h | 6 ++-- src/vnode/tsdb/inc/tsdbFile.h | 1 + src/vnode/tsdb/src/tsdbFile.c | 2 -- src/vnode/tsdb/src/tsdbMain.c | 59 ++++++++++++++++++++++++++++++++++- 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index a7016061e4..4e8afd4f0e 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -119,13 +119,15 @@ typedef struct { int maxPoints; // max number of points int numOfPoints; int numOfCols; // Total number of cols + int sversion; // TODO: set sversion void * buf; SDataCol cols[]; } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0] -#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1] +#define dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)] +#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) +#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1) SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 0f654ad0bd..7e358cc0a2 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -26,6 +26,7 @@ extern "C" { #endif #define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 088a164933..9a0db96bd0 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -24,8 +24,6 @@ #include "tsdbFile.h" -#define TSDB_FILE_DELIMITER 0xF00AFA0F - const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 5f7dd2513f..d18e766c38 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -1041,13 +1041,70 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK return 0; } +static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { + size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; + SCompData *pCompData = (SCompData *)malloc(size); + if (pCompData == NULL) return -1; + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = uid; + pCompData->numOfCols = pCols->numOfCols; + + *offset = lseek(pFile->fd, 0, SEEK_END); + *len = size; + + int toffset = size; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SCompCol *pCompCol = pCompData->cols + iCol; + SDataCol *pDataCol = pCols->cols + iCol; + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + pCompCol->offset = toffset; + + // TODO: add compression + pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; + toffset += pCompCol->len; + } + + // Write the block + if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SDataCol *pDataCol = pCols->cols + iCol; + SCompCol *pCompCol = pCompData->cols + iCol; + if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; + } + + if (pCompData == NULL) free((void *)pCompData); + return 0; + +_err: + if (pCompData == NULL) free((void *)pCompData); + return -1; +} + static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { STsdbCfg *pCfg = &(pRepo->config); + SCompData *pCompData = NULL; memset((void *)pCompBlock, 0, sizeof(SCompBlock)); if (pCompInfo == NULL) { - // Just need to append to file + if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file + // tsdbWriteBlockToFileImpl() + } else { // Write to .last or .l file + pCompBlock->last = 1; + + } + // pCompBlock->offset = ; + // pCompBlock->len = ; + pCompBlock->algorithm = 2; // TODO : add to configuration + pCompBlock->sversion = pCols->sversion; + pCompBlock->numOfPoints = pCols->numOfPoints; + pCompBlock->numOfSubBlocks = 1; + pCompBlock->numOfCols = pCols->numOfCols; + pCompBlock->keyFirst = dataColsKeyFirst(pCols); + pCompBlock->keyLast = dataColsKeyLast(pCols); } else { // Need to merge } From f0bd200eb62de8568a514892fca492469369c046 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 28 Mar 2020 22:52:01 +0800 Subject: [PATCH 8/9] TD-34 --- src/vnode/tsdb/src/tsdbMain.c | 79 ++++++++++++++++++++++++++++++----- 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index d18e766c38..984f2ca203 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -1069,10 +1069,12 @@ static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsTo // Write the block if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; + *len += size; for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { SDataCol *pDataCol = pCols->cols + iCol; SCompCol *pCompCol = pCompData->cols + iCol; if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; + *len += pCompCol->len; } if (pCompData == NULL) free((void *)pCompData); @@ -1083,22 +1085,46 @@ _err: return -1; } -static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { - STsdbCfg *pCfg = &(pRepo->config); +static int compareKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SCompBlock *pBlock = (SCompBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } + + return 0; +} + +static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { + STsdbCfg * pCfg = &(pRepo->config); SCompData *pCompData = NULL; + SFile * pFile = NULL; + int numOfPointsToWrite = 0; + int64_t offset = 0; + int32_t len = 0; memset((void *)pCompBlock, 0, sizeof(SCompBlock)); if (pCompInfo == NULL) { - if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file - // tsdbWriteBlockToFileImpl() - } else { // Write to .last or .l file + // Just append the data block to .data or .l or .last file + numOfPointsToWrite = pCols->numOfPoints; + if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file + pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); + } else { // Write to .last or .l file pCompBlock->last = 1; - + if (lFile) { + pFile = lFile; + } else { + pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); + } } - // pCompBlock->offset = ; - // pCompBlock->len = ; - pCompBlock->algorithm = 2; // TODO : add to configuration + tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); + pCompBlock->offset = offset; + pCompBlock->len = len; + pCompBlock->algorithm = 2; // TODO : add to configuration pCompBlock->sversion = pCols->sversion; pCompBlock->numOfPoints = pCols->numOfPoints; pCompBlock->numOfSubBlocks = 1; @@ -1106,7 +1132,38 @@ static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCo pCompBlock->keyFirst = dataColsKeyFirst(pCols); pCompBlock->keyLast = dataColsKeyLast(pCols); } else { - // Need to merge + // Need to merge the block to either the last block or the other block + TSKEY keyFirst = dataColsKeyFirst(pCols); + SCompBlock *pMergeBlock = NULL; + + // Search the block to merge in + void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, + compareKeyBlock, TD_GE); + if (ptr == NULL) { + // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block + // and merge the data + pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); + } else { + pMergeBlock = (SCompBlock *)ptr; + } + + if (pMergeBlock->last) { + if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { + // Need to load the data from .last and combine data in pCols to write to .data file + + } else { // Just append the block to .last or .l file + if (lFile) { + // read the block from .last file and merge with pCols, write to .l file + + } else { + // tsdbWriteBlockToFileImpl(); + } + } + } else { // The block need to merge in .data file + + } + } - return 0; + + return numOfPointsToWrite; } \ No newline at end of file From ffb02d974dbbb3c3fc95ab101aa7b99b19c6aab3 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sun, 29 Mar 2020 15:01:00 +0800 Subject: [PATCH 9/9] TD-34 --- src/util/src/tlist.c | 9 +++--- src/vnode/tsdb/inc/tsdbFile.h | 11 +++++++ src/vnode/tsdb/src/tsdbFile.c | 3 +- src/vnode/tsdb/src/tsdbMain.c | 59 ++++++++++++++++++++++++++--------- 4 files changed, 61 insertions(+), 21 deletions(-) diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index badcb7802f..aaedc76726 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -138,11 +138,10 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { // Move all node elements from src to dst, the dst is assumed as an empty list void tdListMove(SList *src, SList *dst) { // assert(dst->eleSize == src->eleSize); - dst->numOfEles = src->numOfEles; - dst->head = src->head; - dst->tail = src->tail; - src->numOfEles = 0; - src->head = src->tail = NULL; + SListNode *node = NULL; + while ((node = tdListPopHead(src)) != NULL) { + tdListAppendNode(dst, node); + } } void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 7e358cc0a2..8c106d1067 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -122,6 +122,15 @@ typedef struct { } SCompInfo; #define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) +#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\ +do {\ + if (pCompBlock->numOfSubBlocks > 1) {\ + pCompBlock = pCompInfo->blocks + pCompBlock->offset;\ + size = pCompBlock->numOfSubBlocks;\ + } else {\ + size = 1;\ + }\ +} while (0) // TODO: take pre-calculation into account typedef struct { @@ -147,6 +156,8 @@ int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); + // TODO: need an API to merge all sub-block data into one void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 9a0db96bd0..5240a99a37 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -35,7 +35,6 @@ static int compFGroup(const void *arg1, const void *arg2); static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); -static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -309,7 +308,7 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } -static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) return NULL; void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 984f2ca203..769fc23815 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -90,6 +90,8 @@ static void * tsdbCommitData(void *arg); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); +static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, + int64_t uid); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -330,13 +332,13 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->mem = NULL; pRepo->tsdbCache->curBlock = NULL; + tsdbUnLockRepo(repo); // TODO: here should set as detached or use join for memory leak pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo); - tsdbUnLockRepo(repo); return 0; } @@ -814,7 +816,9 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) } if (!tSkipListIterNext(iters[tid])) { - assert(false); + // No data in this iterator + tSkipListDestroyIter(iters[tid]); + iters[tid] = NULL; } } @@ -839,8 +843,8 @@ static void *tsdbCommitData(void *arg) { } // Create a data column buffer for commit - SDataCols *pCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); - if (pCols == NULL) { + SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); + if (pDataCols == NULL) { // TODO: deal with the error return NULL; } @@ -849,16 +853,15 @@ static void *tsdbCommitData(void *arg) { int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, pCols) < 0) { + if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) { // TODO: deal with the error here // assert(0); } } - tdFreeDataCols(pCols); + tdFreeDataCols(pDataCols); tsdbDestroyTableIters(iters, pCfg->maxTables); - tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); free(pCache->imem); @@ -918,25 +921,52 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ } + lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; SCompIdx * pIdx = &pIndices[tid]; + if (pTable == NULL || pIter == NULL) continue; + + /* If no new data to write for this table, just write the old data to new file + * if there are. + */ if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { - // This table does not have data in this range, just copy its head part and last - // part (if neccessary) to new file - if (pIdx->offset > 0) { // has old data + // has old data + if (pIdx->offset > 0) { if (isNewLastFile && pIdx->hasLast) { - // Need to load SCompBlock part and copy to new file + // need to move the last block to new file if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ } if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ } - // TODO: Copy the last block from old last file to new file - // tsdbCopyBlockData() + tdInitDataCols(pCols, pTable->schema); + + SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); + int nBlocks = 0; + + TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); + + SCompBlock tBlock; + int64_t toffset, tlen; + tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); + + tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, tlen, pTable->tableId.uid); + pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); + pTBlock->offset = toffset; + pTBlock->len = tlen; + pTBlock->numOfPoints = pCols->numOfPoints; + pTBlock->numOfSubBlocks = 1; + + pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); + if (nBlocks > 1) { + pIdx->len -= (sizeof(SCompBlock) * nBlocks); + } + write(hFile.fd, (void *)pCompInfo, pIdx->len); } else { pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); @@ -951,6 +981,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (pIdx->offset > 0) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { // has last block || cache key overlap with commit key + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ } if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; @@ -984,7 +1015,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters // // SCompBlock *pTBlock = NULL; // } // } - pointsWritten = pCols->numOfPoints; + // pointsWritten = pCols->numOfPoints; tdPopDataColsPoints(pCols, pointsWritten); maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; }