From c94331fc6bf3232a20471fa9a53b9dbb19342e36 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Mon, 30 Mar 2020 13:29:36 +0800 Subject: [PATCH 1/3] TD-34 --- src/vnode/tsdb/src/tsdbMain.c | 23 ++++++++++++++++++++++- src/vnode/tsdb/tests/tsdbTests.cpp | 11 ++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index c45a8407cc..0a681a8237 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -287,8 +287,29 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) { if (pRepo == NULL) return 0; pRepo->state = TSDB_REPO_STATE_CLOSED; + tsdbLockRepo(repo); + if (pRepo->commit) { + tsdbUnLockRepo(repo); + return -1; + } + pRepo->commit = 1; + // Loop to move pData to iData + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pRepo->tsdbMeta->tables[i]; + if (pTable != NULL && pTable->mem != NULL) { + pTable->imem = pTable->mem; + pTable->mem = NULL; + } + } + // TODO: Loop to move mem to imem + pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; + pRepo->tsdbCache->mem = NULL; + pRepo->tsdbCache->curBlock = NULL; + tsdbUnLockRepo(repo); - tsdbFlushCache(pRepo); + tsdbCommitData((void *)repo); + + tsdbCloseFileH(pRepo->tsdbFileH); tsdbFreeMeta(pRepo->tsdbMeta); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index bc6532984f..73caeb6700 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -78,7 +78,7 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 10000000; + int nRows = 1000000; int rowsPerSubmit = 10; int64_t start_time = 1584081000000; @@ -87,6 +87,7 @@ TEST(TsdbTest, createRepo) { double stime = getCurTime(); for (int k = 0; k < nRows/rowsPerSubmit; k++) { + memset((void *)pMsg, 0, sizeof(SSubmitMsg)); SSubmitBlk *pBlock = pMsg->blocks; pBlock->uid = 987607499877672L; pBlock->tid = 0; @@ -108,6 +109,9 @@ TEST(TsdbTest, createRepo) { } pBlock->len += dataRowLen(row); } + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + pMsg->numOfBlocks = 1; + pBlock->len = htonl(pBlock->len); pBlock->numOfRows = htonl(pBlock->numOfRows); pBlock->uid = htobe64(pBlock->uid); @@ -116,7 +120,6 @@ TEST(TsdbTest, createRepo) { pBlock->sversion = htonl(pBlock->sversion); pBlock->padding = htonl(pBlock->padding); - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->compressed = htonl(pMsg->numOfBlocks); @@ -128,9 +131,7 @@ TEST(TsdbTest, createRepo) { printf("Spent %f seconds to write %d records\n", etime - stime, nRows); - - - // tsdbTriggerCommit(pRepo); + tsdbCloseRepo(pRepo); } From 5b10eda69880bfc8272a97f2f2257db571c85a43 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Mon, 30 Mar 2020 14:26:38 +0800 Subject: [PATCH 2/3] TD-34 --- src/vnode/tsdb/src/tsdbFile.c | 36 ++++++++++++++++++++++++++++-- src/vnode/tsdb/src/tsdbMain.c | 11 +++++++++ src/vnode/tsdb/tests/tsdbTests.cpp | 11 ++++++--- 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 5240a99a37..bfdd998c22 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -35,6 +35,7 @@ static int compFGroup(const void *arg1, const void *arg2); static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); +static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -50,10 +51,17 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { return NULL; } - struct dirent *dp; + struct dirent *dp = NULL; + int fid = 0; + SFileGroup fGroup = {0}; while ((dp = readdir(dir)) != NULL) { if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; - // TODO + int fid = 0; + sscanf(dp->d_name, "f%d", &fid); + if (tsdbOpenFGroup(pFileH, dataDir, fid) < 0) { + break; + // TODO + } } return pFileH; @@ -61,6 +69,30 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } +static int tsdbInitFile(char *dataDir, int fid, char *suffix, SFile *pFile) { + tsdbGetFileName(dataDir, fid, suffix, pFile->fname); + if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; + pFile->fd = -1; + // TODO: recover the file info + // pFile->info = {0}; + return 0; +} + +static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { + if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0; + + char fname[128] = "\0"; + SFileGroup fGroup = {0}; + fGroup.fileId = fid; + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1; + } + pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; + qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + return 0; +} + int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 0a681a8237..97c98efa05 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -237,6 +237,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) { * @return a TSDB repository handle on success, NULL for failure and the error number is set */ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { + char dataDir[128] = "\0"; if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { return NULL; } @@ -265,6 +266,16 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } + tsdbGetDataDirName(pRepo, dataDir); + pRepo->tsdbFileH = tsdbInitFileH(dataDir, pRepo->config.maxTables); + if (pRepo->tsdbFileH == NULL) { + tsdbFreeCache(pRepo->tsdbCache); + tsdbFreeMeta(pRepo->tsdbMeta); + free(pRepo->rootDir); + free(pRepo); + return NULL; + } + pRepo->state = TSDB_REPO_STATE_ACTIVE; return (tsdb_repo_t *)pRepo; diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 73caeb6700..12311db68e 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -49,7 +49,8 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); } -TEST(TsdbTest, createRepo) { +TEST(TsdbTest, DISABLED_createRepo) { +// TEST(TsdbTest, createRepo) { STsdbCfg config; // 1. Create a tsdb repository @@ -78,7 +79,7 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 1000000; + int nRows = 10000000; int rowsPerSubmit = 10; int64_t start_time = 1584081000000; @@ -129,13 +130,17 @@ TEST(TsdbTest, createRepo) { double etime = getCurTime(); + void *ptr = malloc(150000); + free(ptr); + printf("Spent %f seconds to write %d records\n", etime - stime, nRows); tsdbCloseRepo(pRepo); } -TEST(TsdbTest, DISABLED_openRepo) { +// TEST(TsdbTest, DISABLED_openRepo) { +TEST(TsdbTest, openRepo) { tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); ASSERT_NE(pRepo, nullptr); } From d8c7cdf1c9ed2b7cc407596d0ab10231632578aa Mon Sep 17 00:00:00 2001 From: hzcheng Date: Mon, 30 Mar 2020 16:44:56 +0800 Subject: [PATCH 3/3] TD-34 --- src/util/src/tskiplist.c | 2 +- src/vnode/tsdb/inc/tsdbFile.h | 13 +++++++++ src/vnode/tsdb/src/tsdbFile.c | 46 ++++++++++++++++++++++++++++++ src/vnode/tsdb/tests/tsdbTests.cpp | 4 +-- 4 files changed, 62 insertions(+), 3 deletions(-) diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 1760919b05..77f643a2c7 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -472,7 +472,7 @@ void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t le SL_GET_FORWARD_POINTER(x, i) = pNode; } else { SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; - SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); + // SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); } } } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 0c85c5ef46..6c42d4aa15 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -82,6 +82,19 @@ int tsdbOpenFile(SFile *pFile, int oflag); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); +#define TSDB_FGROUP_ITER_FORWARD 0 +#define TSDB_FGROUP_ITER_BACKWARD 1 +typedef struct { + int numOfFGroups; + SFileGroup *base; + SFileGroup *pFileGroup; + int direction; +} SFileGroupIter; + +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction); +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid); +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter); + typedef struct { int32_t len; int32_t offset; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index bfdd998c22..d6964112e7 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -22,6 +22,7 @@ #include #include +#include "tutil.h" #include "tsdbFile.h" const char *tsdbFileSuffix[] = { @@ -133,6 +134,51 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { + pIter->direction = direction; + pIter->base = pFileH->fGroup; + pIter->numOfFGroups = pFileH->numOfFGroups; + if (pFileH->numOfFGroups == 0){ + pIter->pFileGroup = NULL; + } else { + if (direction == TSDB_FGROUP_ITER_FORWARD) { + pIter->pFileGroup = pFileH->fGroup; + } else { + pIter->pFileGroup = pFileH->fGroup + pFileH->numOfFGroups - 1; + } + } +} + +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; + void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); + if (ptr == NULL) { + pIter->pFileGroup = NULL; + } else { + pIter->pFileGroup = (SFileGroup *)ptr; + } +} + +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { + SFileGroup *ret = pIter->pFileGroup; + if (ret == NULL) return NULL; + + if (pIter->direction = TSDB_FGROUP_ITER_FORWARD) { + if (pIter->pFileGroup + 1 == pIter->base + pIter->numOfFGroups) { + pIter->pFileGroup = NULL; + } else { + pIter->pFileGroup += 1; + } + } else { + if (pIter->pFileGroup - 1 == pIter->base) { + pIter->pFileGroup = NULL; + } else { + pIter->pFileGroup -= 1; + } + } + return ret; +} + int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { SCompBlock *pBlock = pStartBlock; for (int i = 0; i < numOfBlocks; i++) { diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 12311db68e..6cfe0e626d 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -49,8 +49,8 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); } -TEST(TsdbTest, DISABLED_createRepo) { -// TEST(TsdbTest, createRepo) { +// TEST(TsdbTest, DISABLED_createRepo) { +TEST(TsdbTest, createRepo) { STsdbCfg config; // 1. Create a tsdb repository