From 89c97225c80153b649139502b343a5fa66a1c151 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Thu, 19 Mar 2020 13:45:39 +0800 Subject: [PATCH 01/12] TD-34 --- src/common/inc/dataformat.h | 22 ++--- src/common/src/dataformat.c | 10 +++ src/vnode/tsdb/inc/tsdbFile.h | 6 +- src/vnode/tsdb/src/tsdbFile.c | 155 ++++++++++++++++++++++++---------- src/vnode/tsdb/src/tsdbMain.c | 3 - 5 files changed, 129 insertions(+), 67 deletions(-) diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 437c38a8a4..aff239712b 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -101,23 +101,13 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); void tdDataRowReset(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); -/* Data column definition - * +---------+---------+-----------------------+ - * | int32_t | int32_t | | - * +---------+---------+-----------------------+ - * | len | npoints | data | - * +---------+---------+-----------------------+ - */ -typedef char *SDataCol; +// ----------------- Data column structure +typedef struct SDataCol { + int64_t len; + char data[]; +} SDataCol; -/* Data columns definition - * +---------+---------+-----------------------+--------+-----------------------+ - * | int32_t | int32_t | | | | - * +---------+---------+-----------------------+--------+-----------------------+ - * | len | npoints | SDataCol | .... | SDataCol | - * +---------+---------+-----------------------+--------+-----------------------+ - */ -typedef char *SDataCols; +void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 419e376392..9c356b0cbc 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -294,6 +294,16 @@ SDataRow tdDataRowDup(SDataRow row) { return trow; } +void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) { + int row = *iter; + + for (int i = 0; i < schemaNCols(pSchema); i++) { + // TODO + } + + *iter = row + 1; +} + /** * Return the first part length of a data row for a schema */ diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index ab10fd8e49..7e83b84375 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -26,8 +26,7 @@ extern "C" { typedef enum { TSDB_FILE_TYPE_HEAD, // .head file type TSDB_FILE_TYPE_DATA, // .data file type - TSDB_FILE_TYPE_LAST, // .last file type - TSDB_FILE_TYPE_META // .meta file type + TSDB_FILE_TYPE_LAST // .last file type } TSDB_FILE_TYPE; extern const char *tsdbFileSuffix[]; @@ -38,7 +37,6 @@ typedef struct { } SFileInfo; typedef struct { - int fd; int64_t size; // total size of the file int64_t tombSize; // unused file size } SFile; @@ -59,7 +57,7 @@ typedef struct { SFileGroup fGroup[]; } STsdbFileH; -#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META) +#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_LAST) STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, int32_t maxRowsPerFBlock); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 301f297803..ad9045567b 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -12,77 +12,144 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include +#include +#include #include #include -#include #include -#include +#include +#include +#include -#include "tsdbFile.h" #include "tglobalcfg.h" +#include "tsdbFile.h" -// int64_t tsMsPerDay[] = { -// 86400000L, // TSDB_PRECISION_MILLI -// 86400000000L, // TSDB_PRECISION_MICRO -// 86400000000000L // TSDB_PRECISION_NANO -// }; +#define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) typedef struct { + int32_t len; + int32_t padding; // For padding purpose int64_t offset; -} SCompHeader; - -typedef struct { - int64_t uid; - int64_t last : 1; - int64_t numOfBlocks : 63; - int32_t delimiter; -} SCompInfo; - -typedef struct { - TSKEY keyFirst; - TSKEY keyLast; - int32_t numOfBlocks; - int32_t offset; } SCompIdx; +/** + * if numOfSubBlocks == -1, then the SCompBlock is a sub-block + * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to + * the data block offset and length + * if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the + * binary + */ typedef struct { + int64_t last : 1; // If the block in data file or last file + int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks + int32_t algorithm : 8; // Compression algorithm + int32_t numOfPoints : 24; // Number of total points + int32_t sversion; // Schema version + int32_t len; // Data block length or nothing + int16_t numOfSubBlocks; // Number of sub-blocks; + int16_t numOfCols; TSKEY keyFirst; TSKEY keyLast; - int64_t offset; - int32_t len; - int32_t sversion; } SCompBlock; typedef struct { - int64_t uid; -} SBlock; + int32_t delimiter; // For recovery usage + int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int64_t uid; + int32_t padding; // For padding purpose + int32_t numOfBlocks; // TODO: make the struct padding + SCompBlock blocks[]; +} SCompInfo; +// TODO: take pre-calculation into account typedef struct { - int16_t colId; - int16_t bytes; - int32_t nNullPoints; - int32_t type:8; - int32_t offset:24; - int32_t len; - // fields for pre-aggregate - // TODO: pre-aggregation should be seperated - int64_t sum; - int64_t max; - int64_t min; - int16_t maxIdx; - int16_t minIdx; -} SField; + int16_t colId; // Column ID + int16_t len; // Column length + int32_t type : 8; + int32_t offset : 24; +} SCompCol; + +// TODO: Take recover into account +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + int64_t uid; // For recovery usage + SCompCol cols[]; +} SCompData; const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA - ".last", // TSDB_FILE_TYPE_LAST - ".meta" // TSDB_FILE_TYPE_META + ".last" // TSDB_FILE_TYPE_LAST }; +static int tsdbWriteFileHead(int fd) { + char head[TSDB_FILE_HEAD_SIZE] = "\0"; + + lseek(fd, 0, SEEK_SET); + if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; + + return 0; +} + +static int tsdbWriteHeadFileIdx(int fd, int maxTables) { + int size = sizeof(SCompIdx) * maxTables; + void *buf = calloc(1, size); + if (buf == NULL) return -1; + + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + free(buf); + return NULL; + } + + if (write(fd, buf, size) < 0) { + free(buf); + return -1; + } + + return 0; +} + +static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables) { + char fname[128] = "\0"; + sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]); + if (access(fname, F_OK) == 0) { + // File already exists + return -1; + } + + int fd = open(fname, O_RDWR | O_CREAT, 0755); + if (fd < 0) return -1; + + if (tsdbWriteFileHead(fd) < 0) { + close(fd); + return -1; + } + + if (type == TSDB_FILE_TYPE_LAST) { + if (tsdbWriteHeadFileIdx(fd, maxTables) < 0) { + close(fd); + return -1; + } + } + + close(fd); + + return 0; +} + +// Create a file group with fileId and return a SFileGroup object +static int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup) { + // tsdbCreateFile() + + return 0; +} + /** * Initialize the TSDB file handle */ @@ -105,7 +172,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 } struct dirent *dp; - char fname[256]; + char fname[256]; while ((dp = readdir(dir)) != NULL) { if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; if (true /* check if the file is the .head file */) { diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 073321816d..5104c664dc 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -612,9 +612,6 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { rmdir(dirName); - char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META); - remove(metaFname); - return 0; } From 0dde6985dc23a2f4a95a3e8fdccd8e234f4eb7d6 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Thu, 19 Mar 2020 15:21:37 +0800 Subject: [PATCH 02/12] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 23 +++++---- src/vnode/tsdb/src/tsdbFile.c | 75 +++++++++++++++++++----------- src/vnode/tsdb/tests/tsdbTests.cpp | 9 ++++ 3 files changed, 69 insertions(+), 38 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 7e83b84375..6f539cb838 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -24,9 +24,10 @@ extern "C" { #endif typedef enum { - TSDB_FILE_TYPE_HEAD, // .head file type - TSDB_FILE_TYPE_DATA, // .data file type - TSDB_FILE_TYPE_LAST // .last file type + TSDB_FILE_TYPE_HEAD = 0, // .head file type + TSDB_FILE_TYPE_DATA, // .data file type + TSDB_FILE_TYPE_LAST, // .last file type + TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; extern const char *tsdbFileSuffix[]; @@ -37,15 +38,15 @@ typedef struct { } SFileInfo; typedef struct { - int64_t size; // total size of the file - int64_t tombSize; // unused file size + int8_t type; + char fname[128]; + int64_t size; // total size of the file + int64_t tombSize; // unused file size } SFile; typedef struct { int32_t fileId; - SFile fhead; - SFile fdata; - SFile flast; + SFile files[TSDB_FILE_TYPE_MAX]; } SFileGroup; // TSDB file handle @@ -57,14 +58,12 @@ typedef struct { SFileGroup fGroup[]; } STsdbFileH; -#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_LAST) +#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, int32_t maxRowsPerFBlock); void tsdbCloseFile(STsdbFileH *pFileH); - -char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type); - +int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index ad9045567b..da81791da3 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -88,23 +88,26 @@ const char *tsdbFileSuffix[] = { ".last" // TSDB_FILE_TYPE_LAST }; -static int tsdbWriteFileHead(int fd) { +static int tsdbWriteFileHead(int fd, SFile *pFile) { char head[TSDB_FILE_HEAD_SIZE] = "\0"; + pFile->size += TSDB_FILE_HEAD_SIZE; + + // TODO: write version and File statistic to the head lseek(fd, 0, SEEK_SET); if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; return 0; } -static int tsdbWriteHeadFileIdx(int fd, int maxTables) { +static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) { int size = sizeof(SCompIdx) * maxTables; void *buf = calloc(1, size); if (buf == NULL) return -1; if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { free(buf); - return NULL; + return -1; } if (write(fd, buf, size) < 0) { @@ -112,40 +115,70 @@ static int tsdbWriteHeadFileIdx(int fd, int maxTables) { return -1; } + pFile->size += size; + return 0; } -static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables) { - char fname[128] = "\0"; +static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) { + if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1; + sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]); - if (access(fname, F_OK) == 0) { + + return 0; +} + +/** + * Create a file and set the SFile object + */ +static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { + memset((void *)pFile, 0, sizeof(SFile)); + pFile->type = type; + + tsdbGetFileName(dataDir, fileId, type, pFile->fname); + if (access(pFile->fname, F_OK) == 0) { // File already exists return -1; } - int fd = open(fname, O_RDWR | O_CREAT, 0755); + int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755); if (fd < 0) return -1; - if (tsdbWriteFileHead(fd) < 0) { - close(fd); - return -1; - } - - if (type == TSDB_FILE_TYPE_LAST) { - if (tsdbWriteHeadFileIdx(fd, maxTables) < 0) { + if (type == TSDB_FILE_TYPE_HEAD) { + if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) { close(fd); return -1; } } + if (tsdbWriteFileHead(fd, pFile) < 0) { + close(fd); + return -1; + } + close(fd); return 0; } +/** + * + */ + // Create a file group with fileId and return a SFileGroup object -static int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup) { - // tsdbCreateFile() +int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) { + if (dataDir == NULL || pFGroup == NULL) return -1; + + memset((void *)pFGroup, 0, sizeof(SFileGroup)); + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) { + // TODO: deal with the error here, remove the created files + return -1; + } + } + + pFGroup->fileId = fileId; return 0; } @@ -199,16 +232,6 @@ void tsdbCloseFile(STsdbFileH *pFileH) { // TODO } -char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) { - if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL; - - char *fileName = (char *)malloc(strlen(dirName) + strlen(fname) + strlen(tsdbFileSuffix[type]) + 5); - if (fileName == NULL) return NULL; - - sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]); - return fileName; -} - static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 46ae3940d2..ed6a3bfcbb 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -3,6 +3,7 @@ #include "tsdb.h" #include "dataformat.h" +#include "tsdbFile.h" #include "tsdbMeta.h" TEST(TsdbTest, tableEncodeDecode) { @@ -106,4 +107,12 @@ TEST(TsdbTest, createRepo) { TEST(TsdbTest, openRepo) { tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); ASSERT_NE(pRepo, nullptr); +} + +TEST(TsdbTest, createFileGroup) { + SFileGroup fGroup; + + ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); + + int k = 0; } \ No newline at end of file From 66af9af1ff6413b95dc0ecd46ac6b4de5dabe274 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Thu, 19 Mar 2020 16:33:31 +0800 Subject: [PATCH 03/12] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 7 ++++--- src/vnode/tsdb/src/tsdbFile.c | 16 +++++----------- src/vnode/tsdb/src/tsdbMain.c | 20 +++++++++++++++++--- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 6f539cb838..2f38f1f452 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -61,9 +61,10 @@ typedef struct { #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock); -void tsdbCloseFile(STsdbFileH *pFileH); -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); + int32_t maxRowsPerFBlock, int32_t maxTables); + +void tsdbCloseFile(STsdbFileH *pFileH); +int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index da81791da3..cce5dade4b 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -161,9 +161,10 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return 0; } -/** - * - */ +static int tsdbRemoveFile(SFile *pFile) { + if (pFile == NULL) return -1; + return remove(pFile->fname); +} // Create a file group with fileId and return a SFileGroup object int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) { @@ -187,7 +188,7 @@ int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxT * Initialize the TSDB file handle */ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock) { + int32_t maxRowsPerFBlock, int32_t maxTables) { STsdbFileH *pTsdbFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile)); if (pTsdbFileH == NULL) return NULL; @@ -225,13 +226,6 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 return pTsdbFileH; } -/** - * Closet the file handle - */ -void tsdbCloseFile(STsdbFileH *pFileH) { - // TODO -} - static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 5104c664dc..80a9ae4631 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -58,12 +58,12 @@ typedef struct _tsdb_repo { // The cache Handle STsdbCache *tsdbCache; + // The TSDB file handle + STsdbFileH *tsdbFileH; + // Disk tier handle for multi-tier storage void *diskTier; - // File Store - void *tsdbFiles; - pthread_mutex_t tsdbMutex; // A limiter to monitor the resources used by tsdb @@ -79,6 +79,7 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); +static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -171,6 +172,19 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO } pRepo->tsdbCache = pCache; + // Initialize file handle + char dataDir[128] = "\0"; + tsdbGetDataDirName(pRepo, dataDir); + pRepo->tsdbFileH = + tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables); + if (pRepo->tsdbFileH == NULL) { + free(pRepo->rootDir); + tsdbFreeCache(pRepo->tsdbCache); + tsdbFreeMeta(pRepo->tsdbMeta); + free(pRepo); + return NULL; + } + pRepo->state = TSDB_REPO_STATE_ACTIVE; return (tsdb_repo_t *)pRepo; From 72f871f0cd5bfc8d7dbfb091c849246b8539e120 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 20 Mar 2020 12:24:29 +0800 Subject: [PATCH 04/12] TD-34 --- src/vnode/tsdb/inc/tsdbFile.h | 1 + src/vnode/tsdb/src/tsdbFile.c | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 2f38f1f452..89159a06e7 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -55,6 +55,7 @@ typedef struct { int32_t keep; int32_t minRowPerFBlock; int32_t maxRowsPerFBlock; + int32_t maxTables; SFileGroup fGroup[]; } STsdbFileH; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index cce5dade4b..8a7e40cabd 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -197,6 +197,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 pTsdbFileH->keep = keep; pTsdbFileH->minRowPerFBlock = minRowsPerFBlock; pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock; + pTsdbFileH->maxTables = maxTables; // Open the directory to read information of each file DIR *dir = opendir(dataDir); @@ -205,8 +206,9 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 return NULL; } + char fname[256]; + struct dirent *dp; - char fname[256]; while ((dp = readdir(dir)) != NULL) { if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; if (true /* check if the file is the .head file */) { From 4e2c124c012d4759c9867f098392d9e51ac0d461 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 20 Mar 2020 14:38:13 +0800 Subject: [PATCH 05/12] TD-34 --- src/util/inc/tlist.h | 61 +++++++++++++++++++++ src/util/src/tlist.c | 125 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 src/util/inc/tlist.h create mode 100644 src/util/src/tlist.c diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h new file mode 100644 index 0000000000..3f58c35e3b --- /dev/null +++ b/src/util/inc/tlist.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_LIST_ +#define _TD_LIST_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _list_node { + struct _list_node *next; + struct _list_node *prev; + char data[]; +} SListNode; + +typedef struct { + struct _list_node *head; + struct _list_node *tail; + int numOfEles; + int eleSize; +} SList; + +typedef struct { + SListNode *node; +} SListIter; + +#define listHead(l) (l)->head +#define listTail(l) (l)->tail +#define listNEles(l) (l)->numOfEles +#define listEleSize(l) (l)->eleSize +#define isListEmpty(l) ((l)->numOfEles == 0) +#define listNodeFree(n) free(n); + +SList * tdListNew(int eleSize); +void tdListFree(SList *list); +void tdListEmpty(SList *list); +int tdListPrepend(SList *list, void *data); +int tdListAppend(SList *list, void *data); +SListNode *tdListPopHead(SList *list); +SListNode *tdListPopTail(SList *list); +SListNode *tdListPopNode(SList *list, SListNode *node); + +void tdListNodeGetData(SList *list, SListNode *node, void *target); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c new file mode 100644 index 0000000000..42068a717e --- /dev/null +++ b/src/util/src/tlist.c @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include + +#include "tlist.h" + +SList *tdListNew(int eleSize) { + SList *list = (SList *)malloc(sizeof(SList)); + if (list == NULL) return NULL; + + list->eleSize = eleSize; + list->numOfEles = 0; + list->head = list->tail = NULL; + return NULL; +} + +void tdListEmpty(SList *list) { + SListNode *node = list->head; + while (node) { + list->head = node->next; + free(node); + node = list->head; + } + list->head = list->tail = 0; + list->numOfEles = 0; +} + +void tdListFree(SList *list) { + tdListEmpty(list); + free(list); +} + +int tdListPrepend(SList *list, void *data) { + SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); + if (node == NULL) return -1; + + if (list->head == NULL) { + list->head = node; + list->tail = node; + } else { + node->next = list->head; + node->prev = NULL; + list->head->prev = node; + list->head = node; + } + list->numOfEles++; + return 0; +} + +int tdListAppend(SList *list, void *data) { + SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); + if (node == NULL) return -1; + if (list->head == NULL) { + list->head = node; + list->tail = node; + } else { + node->prev = list->tail; + node->next = NULL; + list->tail->next = node; + list->tail = node; + } + + list->numOfEles++; + return 0; +} + +SListNode *tdListPopHead(SList *list) { + if (list->head == NULL) return NULL; + SListNode *node = list->head; + if (node->next == NULL) { + list->head = NULL; + list->tail = NULL; + } else { + list->head = node->next; + } + list->numOfEles--; + return node; +} + +SListNode *tdListPopTail(SList *list) { + if (list->tail == NULL) return NULL; + SListNode *node = list->tail; + if (node->prev == NULL) { + list->head = NULL; + list->tail = NULL; + } else { + list->tail = node->prev; + } + list->numOfEles--; + return node; +} + +SListNode *tdListPopNode(SList *list, SListNode *node) { + if (list->head == node) { + list->head = node->next; + } + if (list->tail == node) { + list->tail = node->prev; + } + + if (node->prev != NULL) { + node->prev->next = node->next; + } + if (node->next != NULL) { + node->next->prev = node->prev; + } + list->numOfEles--; + + return node; +} + +void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(node->data, target, list->eleSize); } \ No newline at end of file From e865255b203f07bd2a15d0de5145ee80eace1933 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 20 Mar 2020 15:56:44 +0800 Subject: [PATCH 06/12] TD-34 --- src/util/inc/tlist.h | 12 ++++++++++-- src/util/src/tlist.c | 23 ++++++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h index 3f58c35e3b..cc99e183ed 100644 --- a/src/util/inc/tlist.h +++ b/src/util/inc/tlist.h @@ -19,6 +19,11 @@ extern "C" { #endif +typedef enum { + TD_LIST_FORWARD, + TD_LIST_BACKWARD +} TD_LIST_DIRECTION_T; + typedef struct _list_node { struct _list_node *next; struct _list_node *prev; @@ -33,7 +38,8 @@ typedef struct { } SList; typedef struct { - SListNode *node; + SListNode * next; + TD_LIST_DIRECTION_T direction; } SListIter; #define listHead(l) (l)->head @@ -52,7 +58,9 @@ SListNode *tdListPopHead(SList *list); SListNode *tdListPopTail(SList *list); SListNode *tdListPopNode(SList *list, SListNode *node); -void tdListNodeGetData(SList *list, SListNode *node, void *target); +void tdListNodeGetData(SList *list, SListNode *node, void *target); +void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction); +SListNode *tdListNext(SListIter *pIter); #ifdef __cplusplus } diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index 42068a717e..a6cb98df5f 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -122,4 +122,25 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { return node; } -void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(node->data, target, list->eleSize); } \ No newline at end of file +void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(node->data, target, list->eleSize); } + +void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) { + pIter->direction = direction; + if (direction == TD_LIST_FORWARD) { + pIter->next = list->head; + } else { + pIter->next = list->tail; + } +} + +SListNode *tdListNext(SListIter *pIter) { + SListNode *node = pIter->next; + if (node == NULL) return NULL; + if (pIter->direction == TD_LIST_FORWARD) { + pIter->next = node->next; + } else { + pIter->next = node->prev; + } + + return node; +} \ No newline at end of file From d5e4fc3201f9f3cba195b4b9b64ef8746940385d Mon Sep 17 00:00:00 2001 From: hzcheng Date: Fri, 20 Mar 2020 23:02:42 +0800 Subject: [PATCH 07/12] TD-34 --- src/util/inc/tlist.h | 7 +-- src/util/src/tlist.c | 57 +++++++++++------- src/vnode/tsdb/inc/tsdbCache.h | 50 +++++++--------- src/vnode/tsdb/src/tsdbCache.c | 103 +++++++++++++++++++++++++++++---- src/vnode/tsdb/src/tsdbMain.c | 4 +- 5 files changed, 154 insertions(+), 67 deletions(-) diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h index cc99e183ed..2ea536ea3e 100644 --- a/src/util/inc/tlist.h +++ b/src/util/inc/tlist.h @@ -19,10 +19,7 @@ extern "C" { #endif -typedef enum { - TD_LIST_FORWARD, - TD_LIST_BACKWARD -} TD_LIST_DIRECTION_T; +typedef enum { TD_LIST_FORWARD, TD_LIST_BACKWARD } TD_LIST_DIRECTION_T; typedef struct _list_node { struct _list_node *next; @@ -52,6 +49,8 @@ typedef struct { SList * tdListNew(int eleSize); void tdListFree(SList *list); void tdListEmpty(SList *list); +void tdListPrependNode(SList *list, SListNode *node); +void tdListAppendNode(SList *list, SListNode *node); int tdListPrepend(SList *list, void *data); int tdListAppend(SList *list, void *data); SListNode *tdListPopHead(SList *list); diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index a6cb98df5f..f11a5481f6 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -43,10 +43,7 @@ void tdListFree(SList *list) { free(list); } -int tdListPrepend(SList *list, void *data) { - SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); - if (node == NULL) return -1; - +void tdListPrependNode(SList *list, SListNode *node) { if (list->head == NULL) { list->head = node; list->tail = node; @@ -57,12 +54,9 @@ int tdListPrepend(SList *list, void *data) { list->head = node; } list->numOfEles++; - return 0; } -int tdListAppend(SList *list, void *data) { - SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); - if (node == NULL) return -1; +void tdListAppendNode(SList *list, SListNode *node) { if (list->head == NULL) { list->head = node; list->tail = node; @@ -74,6 +68,25 @@ int tdListAppend(SList *list, void *data) { } list->numOfEles++; +} + +int tdListPrepend(SList *list, void *data) { + SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); + if (node == NULL) return -1; + + memcpy((void *)(node->data), data, list->eleSize); + tdListPrependNode(list, node); + + return 0; +} + +int tdListAppend(SList *list, void *data) { + SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); + if (node == NULL) return -1; + + memcpy((void *)(node->data), data, list->eleSize); + tdListAppendNode(list, node); + return 0; } @@ -104,22 +117,22 @@ SListNode *tdListPopTail(SList *list) { } SListNode *tdListPopNode(SList *list, SListNode *node) { - if (list->head == node) { - list->head = node->next; - } - if (list->tail == node) { - list->tail = node->prev; - } + if (list->head == node) { + list->head = node->next; + } + if (list->tail == node) { + list->tail = node->prev; + } - if (node->prev != NULL) { - node->prev->next = node->next; - } - if (node->next != NULL) { - node->next->prev = node->prev; - } - list->numOfEles--; + if (node->prev != NULL) { + node->prev->next = node->next; + } + if (node->next != NULL) { + node->next->prev = node->prev; + } + list->numOfEles--; - return node; + return node; } void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(node->data, target, list->eleSize); } diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 1821505eae..3bffa1c6a9 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -17,45 +17,39 @@ #include -// #include "cache.h" +#include "tlist.h" #ifdef __cplusplus extern "C" { #endif -#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */ +#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */ typedef struct { - int64_t skey; // start key - int64_t ekey; // end key - int32_t numOfRows; // numOfRows -} STableCacheInfo; + int blockId; + int offset; + int remain; + int padding; + char data[]; +} STsdbCacheBlock; -typedef struct _tsdb_cache_block { - char * pData; - STableCacheInfo * pTableInfo; - struct _tsdb_cache_block *prev; - struct _tsdb_cache_block *next; -} STSDBCacheBlock; +typedef struct { + int64_t index; + SList * memPool; +} STsdbCachePool; -// Use a doublely linked list to implement this -typedef struct STSDBCache { - // Number of blocks the cache is allocated - int32_t numOfBlocks; - STSDBCacheBlock *cacheList; - void * current; +typedef struct { + int maxBytes; + int cacheBlockSize; + STsdbCachePool pool; + STsdbCacheBlock *curBlock; + SList * mem; + SList * imem; } STsdbCache; -// ---- Operation on STSDBCacheBlock -#define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) -#define TSDB_CACHE_AVAIL_SPACE(pBlock) ((char *)((pBlock)->pTableInfo) - ((pBlock)->pData)) -#define TSDB_TABLE_INFO_OF_CACHE(pBlock, tableId) ((pBlock)->pTableInfo)[tableId] -#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next) -#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev) - -STsdbCache *tsdbInitCache(int64_t maxSize); -int32_t tsdbFreeCache(STsdbCache *pCache); -void * tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes); +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); +void tsdbFreeCache(STsdbCache *pCache); +void * tsdbAllocFromCache(STsdbCache *pCache, int bytes); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 165c561b5d..4baedf55a3 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -16,22 +16,103 @@ #include "tsdbCache.h" -STsdbCache *tsdbInitCache(int64_t maxSize) { - STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache)); - if (pCacheHandle == NULL) { - // TODO : deal with the error - return NULL; +static int tsdbAllocBlockFromPool(STsdbCache *pCache); +static void tsdbFreeBlockList(SList *list); + +STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { + STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); + if (pCache == NULL) return NULL; + + pCache->maxBytes = maxBytes; + pCache->cacheBlockSize = cacheBlockSize; + + int nBlocks = maxBytes / cacheBlockSize + 1; + if (nBlocks <= 1) nBlocks = 2; + + STsdbCachePool *pPool = &(pCache->pool); + pPool->index = 0; + pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *)); + if (pPool->memPool == NULL) goto _err; + + for (int i = 0; i < nBlocks; i++) { + STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); + if (pBlock == NULL) { + goto _err; + } + pBlock->offset = 0; + pBlock->remain = cacheBlockSize; + tdListAppend(pPool->memPool, (void *)(&pBlock)); } - return pCacheHandle; + pCache->mem = tdListNew(sizeof(STsdbCacheBlock *)); + if (pCache->mem == NULL) goto _err; + + pCache->imem = tdListNew(sizeof(STsdbCacheBlock *)); + if (pCache->imem == NULL) goto _err; + + return pCache; + +_err: + tsdbFreeCache(pCache); + return NULL; } -int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; } +void tsdbFreeCache(STsdbCache *pCache) { + tsdbFreeBlockList(pCache->imem); + tsdbFreeBlockList(pCache->mem); + tsdbFreeBlockList(pCache->pool.memPool); + free(pCache); +} -void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) { - // TODO: implement here - void *ptr = malloc(bytes); - if (ptr == NULL) return NULL; +void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { + if (pCache == NULL) return NULL; + if (bytes > pCache->cacheBlockSize) return NULL; + + if (isListEmpty(pCache->imem)) { + if (tsdbAllocBlockFromPool(pCache) < 0) { + // TODO: deal with the error + } + } + + if (pCache->curBlock->remain < bytes) { + if (tsdbAllocBlockFromPool(pCache) < 0) { + // TODO: deal with the error + } + } + + void *ptr = (void *)(pCache->curBlock->data + pCache->curBlock->offset); + pCache->curBlock->offset += bytes; + pCache->curBlock->remain -= bytes; return ptr; +} + +static void tsdbFreeBlockList(SList *list) { + if (list == NULL) return; + SListNode * node = NULL; + STsdbCacheBlock *pBlock = NULL; + while ((node = tdListPopHead(list)) != NULL) { + tdListNodeGetData(list, node, (void *)(&pBlock)); + free(pBlock); + listNodeFree(node); + } + tdListFree(list); +} + +static int tsdbAllocBlockFromPool(STsdbCache *pCache) { + STsdbCachePool *pPool = &(pCache->pool); + if (listNEles(pPool->memPool) == 0) return -1; + + SListNode *node = tdListPopHead(pPool->memPool); + + STsdbCacheBlock *pBlock = NULL; + tdListNodeGetData(pPool->memPool, node, (void *)(&pBlock)); + pBlock->blockId = pPool->index++; + pBlock->offset = 0; + pBlock->remain = pCache->cacheBlockSize; + + tdListAppendNode(pPool->memPool, node); + pCache->curBlock = pBlock; + + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 80a9ae4631..87cd23eb92 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -163,7 +163,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO pRepo->tsdbMeta = pMeta; // Initialize cache - STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize); + STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1); if (pCache == NULL) { free(pRepo->rootDir); tsdbFreeMeta(pRepo->tsdbMeta); @@ -244,7 +244,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } - pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize); + pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1); if (pRepo->tsdbCache == NULL) { tsdbFreeMeta(pRepo->tsdbMeta); free(pRepo->rootDir); From d148a8e9c382c1b4b616be571258edb27d123737 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 21 Mar 2020 12:13:06 +0800 Subject: [PATCH 08/12] TD-34 --- src/util/src/tlist.c | 4 ++-- src/vnode/tsdb/src/tsdbCache.c | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index f11a5481f6..8844a5f787 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -24,7 +24,7 @@ SList *tdListNew(int eleSize) { list->eleSize = eleSize; list->numOfEles = 0; list->head = list->tail = NULL; - return NULL; + return list; } void tdListEmpty(SList *list) { @@ -135,7 +135,7 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { return node; } -void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(node->data, target, list->eleSize); } +void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) { pIter->direction = direction; diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 4baedf55a3..56ec040daf 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -23,6 +23,8 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); if (pCache == NULL) return NULL; + if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; + pCache->maxBytes = maxBytes; pCache->cacheBlockSize = cacheBlockSize; @@ -83,6 +85,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { void *ptr = (void *)(pCache->curBlock->data + pCache->curBlock->offset); pCache->curBlock->offset += bytes; pCache->curBlock->remain -= bytes; + memset(ptr, 0, bytes); return ptr; } From dca2abd6fbd17e0808e219d2c4c7f02c56d30de7 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 21 Mar 2020 13:36:54 +0800 Subject: [PATCH 09/12] TD-34 --- src/vnode/tsdb/src/tsdbCache.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 56ec040daf..6a0741dced 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -70,7 +70,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { if (pCache == NULL) return NULL; if (bytes > pCache->cacheBlockSize) return NULL; - if (isListEmpty(pCache->imem)) { + if (isListEmpty(pCache->mem)) { if (tsdbAllocBlockFromPool(pCache) < 0) { // TODO: deal with the error } @@ -114,7 +114,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { pBlock->offset = 0; pBlock->remain = pCache->cacheBlockSize; - tdListAppendNode(pPool->memPool, node); + tdListAppendNode(pCache->mem, node); pCache->curBlock = pBlock; return 0; From cfa6e5469ba56de3449a7b6cb6b27d9a5f35de91 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 21 Mar 2020 14:24:27 +0800 Subject: [PATCH 10/12] TD-34 --- src/vnode/tsdb/inc/tsdb.h | 1 + src/vnode/tsdb/inc/tsdbMeta.h | 17 +++++++++-------- src/vnode/tsdb/src/tsdbMain.c | 24 +++++++++++++++++++++++- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index e12f51fd44..4964ac673f 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -58,6 +58,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); +int32_t tsdbTriggerCommit(tsdb_repo_t *repo); // --------- TSDB TABLE DEFINITION typedef struct { diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index b18d16d0d9..38f0818dfb 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -35,20 +35,21 @@ extern "C" { // ---------- TSDB TABLE DEFINITION typedef struct STable { - int8_t type; - STableId tableId; - int32_t superUid; // Super table UID - int32_t sversion; - STSchema * schema; - STSchema * tagSchema; - SDataRow tagVal; + int8_t type; + STableId tableId; + int32_t superUid; // Super table UID + int32_t sversion; + STSchema *schema; + STSchema *tagSchema; + SDataRow tagVal; union { void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index } content; + void * iData; // Skiplist to commit void * eventHandler; // TODO void * streamHandler; // TODO - struct STable *next; // TODO: remove the next + struct STable *next; // TODO: remove the next } STable; void * tsdbEncodeTable(STable *pTable, int *contLen); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 87cd23eb92..34fca7e428 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -64,7 +64,10 @@ typedef struct _tsdb_repo { // Disk tier handle for multi-tier storage void *diskTier; - pthread_mutex_t tsdbMutex; + pthread_mutex_t mutex; + + int commit; + pthread_t commitThread; // A limiter to monitor the resources used by tsdb void *limiter; @@ -80,6 +83,7 @@ static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); +static void * tsdbCommitToFile(void *arg); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -298,6 +302,18 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) { return 0; } +int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + + if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; + if (pRepo->commit) return 0; + pRepo->commit = 1; + pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); + pthread_mutex_unlock(&(pRepo->mutex)); + + return 0; +} + /** * Get the TSDB repository information, including some statistics * @param pRepo the TSDB repository handle @@ -673,4 +689,10 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } return 0; +} + +static void *tsdbCommitToFile(void *arg) { + STsdbRepo *pRepo = (STsdbRepo *)arg; + // TODO + return NULL; } \ No newline at end of file From 5808cc499ce3fd81df938aec405236567774cf54 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 21 Mar 2020 15:36:23 +0800 Subject: [PATCH 11/12] TD-34 --- src/util/inc/tlist.h | 1 + src/util/src/tlist.c | 10 ++++++++++ src/vnode/tsdb/src/tsdbMain.c | 29 ++++++++++++++++++++++++++++- src/vnode/tsdb/tests/tsdbTests.cpp | 3 ++- 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h index 2ea536ea3e..9e4dfe4580 100644 --- a/src/util/inc/tlist.h +++ b/src/util/inc/tlist.h @@ -56,6 +56,7 @@ int tdListAppend(SList *list, void *data); SListNode *tdListPopHead(SList *list); SListNode *tdListPopTail(SList *list); SListNode *tdListPopNode(SList *list, SListNode *node); +void tdListMove(SList *src, SList *dst); void tdListNodeGetData(SList *list, SListNode *node, void *target); void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction); diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index 8844a5f787..badcb7802f 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -135,6 +135,16 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { return node; } +// Move all node elements from src to dst, the dst is assumed as an empty list +void tdListMove(SList *src, SList *dst) { + // assert(dst->eleSize == src->eleSize); + dst->numOfEles = src->numOfEles; + dst->head = src->head; + dst->tail = src->tail; + src->numOfEles = 0; + src->head = src->tail = NULL; +} + void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) { diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 34fca7e428..ed95eac5bc 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -308,9 +308,23 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1; if (pRepo->commit) return 0; pRepo->commit = 1; + // Loop to move pData to iData + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pRepo->tsdbMeta->tables[i]; + if (pTable != NULL) { + void *pData = pTable->content.pData; + pTable->content.pData = NULL; + pTable->iData = pData; + } + } + // Loop to move mem to imem + tdListMove(pRepo->tsdbCache->mem, pRepo->tsdbCache->imem); + pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); pthread_mutex_unlock(&(pRepo->mutex)); + pthread_join(pRepo->commitThread, NULL); + return 0; } @@ -692,7 +706,20 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } static void *tsdbCommitToFile(void *arg) { - STsdbRepo *pRepo = (STsdbRepo *)arg; // TODO + STsdbRepo *pRepo = (STsdbRepo *)arg; + STsdbMeta *pMeta = pRepo->tsdbMeta; + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + SSkipListIterator *pIter = tSkipListCreateIter(pTable->iData); + while (tSkipListIterNext(pIter)) { + SSkipListNode *node = tSkipListIterGet(pIter); + SDataRow row = SL_GET_NODE_DATA(node); + int k = 0; + + } + } + return NULL; } \ No newline at end of file diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index ed6a3bfcbb..9392ca5963 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -101,7 +101,8 @@ TEST(TsdbTest, createRepo) { tsdbInsertData(pRepo, pMsg); - int k = 0; + tsdbTriggerCommit(pRepo); + } TEST(TsdbTest, openRepo) { From c0578258b32e46aa336b1af69e17387cf4a2fc1b Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 21 Mar 2020 15:52:27 +0800 Subject: [PATCH 12/12] TD-34 --- src/vnode/tsdb/tests/tsdbTests.cpp | 46 ++++++++++++++++-------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 9392ca5963..42a22553c7 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -72,34 +72,36 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 10; - SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * nRows); - - SSubmitBlk *pBlock = pMsg->blocks; - pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; - pBlock->sversion = 0; - pBlock->len = 0; + int nRows = 100; + int rowsPerSubmit = 10; int64_t start_time = 1584081000000; - for (int i = 0; i < nRows; i++) { - int64_t ttime = start_time + 1000 * i; - SDataRow row = (SDataRow)(pBlock->data + pBlock->len); - tdInitDataRow(row, schema); - for (int j = 0; j < schemaNCols(schema); j++) { - if (j == 0) { // Just for timestamp - tdAppendColVal(row, (void *)(&ttime), schemaColAt(schema, j)); - } else { // For int - int val = 10; - tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); + SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + + for (int k = 0; k < nRows/rowsPerSubmit; k++) { + SSubmitBlk *pBlock = pMsg->blocks; + pBlock->tableId = {.uid = 987607499877672L, .tid = 0}; + pBlock->sversion = 0; + pBlock->len = 0; + for (int i = 0; i < rowsPerSubmit; i++) { + start_time += 1000; + SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + tdInitDataRow(row, schema); + + for (int j = 0; j < schemaNCols(schema); j++) { + if (j == 0) { // Just for timestamp + tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); + } else { // For int + int val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); + } } - + pBlock->len += dataRowLen(row); } - pBlock->len += dataRowLen(row); + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + tsdbInsertData(pRepo, pMsg); } - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; - - tsdbInsertData(pRepo, pMsg); tsdbTriggerCommit(pRepo);