From 2a754394bfb674b948b791f06c8833cb714818bc Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 11 May 2023 18:16:55 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 26 +- source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h | 5 +- .../dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h | 2 + .../dnode/vnode/src/tsdb/dev/inc/tsdbFile.h | 4 +- .../src/tsdb/dev/inc/tsdbSttFReaderWriter.h | 17 +- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 241 +++++++----------- source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 60 ++--- source/dnode/vnode/src/tsdb/dev/tsdbFSet.c | 6 + source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 4 +- .../vnode/src/tsdb/dev/tsdbSttFReaderWriter.c | 110 +++----- 10 files changed, 182 insertions(+), 293 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0ce8438066..d46c043482 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -354,21 +354,19 @@ typedef struct { } SRocksCache; struct STsdb { - char *path; - SVnode *pVnode; - STsdbKeepCfg keepCfg; - TdThreadRwlock rwLock; - SMemTable *mem; - SMemTable *imem; - STsdbFS fs; - SLRUCache *lruCache; - TdThreadMutex lruMutex; - SLRUCache *biCache; - TdThreadMutex biMutex; -#ifdef USE_DEV_CODE + char *path; + SVnode *pVnode; + STsdbKeepCfg keepCfg; + TdThreadRwlock rwLock; + SMemTable *mem; + SMemTable *imem; + STsdbFS fs; + SLRUCache *lruCache; + TdThreadMutex lruMutex; + SLRUCache *biCache; + TdThreadMutex biMutex; struct STFileSystem *pFS; -#endif - SRocksCache rCache; + SRocksCache rCache; }; struct TSDBKEY { diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h index 6081b5949c..4f5494fdd8 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h @@ -35,9 +35,12 @@ typedef enum { int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback); int32_t tsdbCloseFS(STFileSystem **ppFS); // txn -int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype); +int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid); +int32_t tsdbFSEditBegin(STFileSystem *fs, int64_t eid, const SArray *aFileOp, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *pFS); int32_t tsdbFSEditAbort(STFileSystem *pFS); +// other +int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **ppFSet); /* Exposed Structs */ struct STFileSystem { diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index 7672b8664a..8d3889a098 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -40,6 +40,8 @@ int32_t tsdbFileSetCreate(int32_t fid, STFileSet **ppSet); int32_t tsdbFileSetEdit(STFileSet *pSet, STFileOp *pOp); int32_t tsdbEditFileSet(STFileSet *pFileSet, const STFileOp *pOp); +int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2); + struct STFileOp { tsdb_fop_t op; int32_t fid; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h index 70fbb752af..ed43384d6b 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h @@ -48,8 +48,8 @@ struct STFile { tsdb_ftype_t type; SDiskID did; - int32_t fid; - int64_t cid; + int32_t fid; // file id + int64_t cid; // commit id int64_t size; union { struct { diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index fd063644d0..0c8c0a6557 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -42,6 +42,14 @@ int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBloc int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock); int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock); +struct SSttFileReaderConfig { + STsdb *pTsdb; + SSkmInfo *pSkmTb; + SSkmInfo *pSkmRow; + uint8_t **aBuf; + // TODO +}; + // SSttFWriter ========================================== typedef struct SSttFileWriter SSttFileWriter; typedef struct SSttFileWriterConfig SSttFileWriterConfig; @@ -51,7 +59,6 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFi int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData); -/* ------------------------------------------------- */ struct SSttFileWriterConfig { STsdb *pTsdb; STFile file; @@ -63,14 +70,6 @@ struct SSttFileWriterConfig { uint8_t **aBuf; }; -struct SSttFileReaderConfig { - STsdb *pTsdb; - SSkmInfo *pSkmTb; - SSkmInfo *pSkmRow; - uint8_t **aBuf; - // TODO -}; - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index dccaa0b93c..7315aa277b 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -29,14 +29,15 @@ typedef struct { SArray *aTbDataP; // SArray SArray *aFileOp; // SArray + int64_t eid; // edit id // context - TSKEY nextKey; - int32_t fid; - int32_t expLevel; - TSKEY minKey; - TSKEY maxKey; - STFileSet *pFileSet; + TSKEY nextKey; + int32_t fid; + int32_t expLevel; + TSKEY minKey; + TSKEY maxKey; + const STFileSet *pFileSet; // writer SSttFileWriter *pWriter; @@ -44,14 +45,14 @@ typedef struct { static int32_t open_committer_writer(SCommitter *pCommitter) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; + int32_t vid = TD_VID(pTsdb->pVnode); - STsdb *pTsdb = pCommitter->pTsdb; - - SSttFileWriterConfig conf = { + SSttFileWriterConfig config = { .pTsdb = pCommitter->pTsdb, .maxRow = pCommitter->maxRow, - .szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize, + .szPage = pTsdb->pVnode->config.tsdbPageSize, .cmprAlg = pCommitter->cmprAlg, .pSkmTb = NULL, .pSkmRow = NULL, @@ -59,41 +60,39 @@ static int32_t open_committer_writer(SCommitter *pCommitter) { }; if (pCommitter->pFileSet) { - ASSERTS(0, "TODO: Not implemented yet"); + // TODO + ASSERT(0); } else { - conf.file.type = TSDB_FTYPE_STT; + config.file.type = TSDB_FTYPE_STT; - if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &conf.file.did) < 0) { + if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &config.file.did) < 0) { code = TSDB_CODE_FS_NO_VALID_DISK; TSDB_CHECK_CODE(code, lino, _exit); } - conf.file.size = 0; - conf.file.cid = 1; - conf.file.fid = pCommitter->fid; + config.file.fid = pCommitter->fid; + config.file.cid = pCommitter->eid; + config.file.size = 0; + config.file.stt.lvl = 0; + config.file.stt.nseg = 0; - tsdbTFileInit(pTsdb, &conf.file); + tsdbTFileInit(pTsdb, &config.file); } - code = tsdbSttFWriterOpen(&conf, &pCommitter->pWriter); + code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s, fid:%d", // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code), // - pCommitter->fid); + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", vid, __func__, lino, tstrerror(code), pCommitter->fid); } return code; } static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; + int32_t vid = TD_VID(pCommitter->pTsdb->pVnode); if (pCommitter->pWriter == NULL) { code = open_committer_writer(pCommitter); @@ -105,20 +104,10 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB _exit: if (code) { - tsdbError( // - "vgId:%d failed at line %d since %s", // - TD_VID(pCommitter->pTsdb->pVnode), // - lino, // - tstrerror(code)); + tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code)); } else { - tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - pCommitter->fid, // - tbid->suid, // - tbid->uid, // - TSDBROW_KEY(pRow).ts, // - TSDBROW_KEY(pRow).version); + tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, vid, __func__, + pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts, TSDBROW_KEY(pRow).version); } return 0; } @@ -131,15 +120,14 @@ static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int6 } static int32_t commit_timeseries_data(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino; - + int32_t code = 0; + int32_t lino = 0; int64_t nRow = 0; - SMemTable *pMem = pCommitter->pTsdb->imem; + STsdb *pTsdb = pCommitter->pTsdb; + int32_t vid = TD_VID(pTsdb->pVnode); + SMemTable *pMem = pTsdb->imem; - if (pMem->nRow == 0) { // no time-series data to commit - goto _exit; - } + if (pMem->nRow == 0) goto _exit; TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN}; for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { @@ -165,19 +153,9 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) { _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } else { - tsdbDebug( // - "vgId:%d %s done, fid:%d nRow:%" PRId64, // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - pCommitter->fid, // - nRow); + tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, pCommitter->fid, nRow); } return code; } @@ -186,6 +164,8 @@ static int32_t commit_delete_data(SCommitter *pCommitter) { int32_t code = 0; int32_t lino; + return 0; + ASSERTS(0, "TODO: Not implemented yet"); int64_t nDel = 0; @@ -221,29 +201,27 @@ _exit: return code; } -static int32_t start_commit_file_set(SCommitter *pCommitter) { +static int32_t commit_fset_start(SCommitter *pCommitter) { + STsdb *pTsdb = pCommitter->pTsdb; + int32_t vid = TD_VID(pTsdb->pVnode); + pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, &pCommitter->maxKey); - pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); + pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->nextKey = TSKEY_MAX; - pCommitter->pFileSet = NULL; // TODO: need to search the file system + tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->pFileSet); - tsdbDebug( // - "vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - pCommitter->fid, // - pCommitter->minKey, // - pCommitter->maxKey, // - pCommitter->expLevel); + tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid, + pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel); return 0; } -static int32_t end_commit_file_set(SCommitter *pCommitter) { +static int32_t commit_fset_end(SCommitter *pCommitter) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; + int32_t vid = TD_VID(pCommitter->pTsdb->pVnode); if (pCommitter->pWriter == NULL) return 0; @@ -253,60 +231,43 @@ static int32_t end_commit_file_set(SCommitter *pCommitter) { TSDB_CHECK_CODE(code, lino, _exit); } - TSDB_CHECK_CODE( // - code = tsdbSttFWriterClose( // - &pCommitter->pWriter, // - 0, // - pFileOp), // - lino, // - _exit); + code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError( // - "vgId:%d failed at line %d since %s", // - TD_VID(pCommitter->pTsdb->pVnode), // - lino, // - tstrerror(code)); + tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code)); } else { - tsdbDebug( // - "vgId:%d %s done, fid:%d", // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - pCommitter->fid); + tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, pCommitter->fid); } return code; } -static int32_t commit_next_file_set(SCommitter *pCommitter) { +static int32_t commit_fset(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; + int32_t vid = TD_VID(pCommitter->pTsdb->pVnode); // fset commit start - code = start_commit_file_set(pCommitter); + code = commit_fset_start(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); // commit fset code = commit_timeseries_data(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); - /* TODO code = commit_delete_data(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); - */ // fset commit end - code = end_commit_file_set(pCommitter); + code = commit_fset_end(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pCommitter->pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done", vid, __func__); } return code; } @@ -332,6 +293,7 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom taosArrayDestroy(pCommitter->aFileOp); TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } + tsdbFSAllocEid(pTsdb->pFS, &pCommitter->eid); // start loop pCommitter->nextKey = pTsdb->imem->minKey; // TODO @@ -347,35 +309,27 @@ _exit: static int32_t close_committer(SCommitter *pCommiter, int32_t eno) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; + int32_t vid = TD_VID(pCommiter->pTsdb->pVnode); if (eno == 0) { - TSDB_CHECK_CODE( // - code = tsdbFSEditBegin( // - pCommiter->pTsdb->pFS, // - pCommiter->aFileOp, // - TSDB_FEDIT_COMMIT), - lino, // - _exit); + code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, pCommiter->eid, pCommiter->aFileOp, TSDB_FEDIT_COMMIT); + TSDB_CHECK_CODE(code, lino, _exit); } else { - ASSERTS(0, "TODO: Not implemented yet"); + // TODO + ASSERT(0); } - // TODO: clear the committer + ASSERT(pCommiter->pWriter == NULL); + taosArrayDestroy(pCommiter->aTbDataP); + taosArrayDestroy(pCommiter->aFileOp); _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pCommiter->pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code), + pCommiter->eid); } else { - tsdbDebug( // - "vgId:%d %s done", // - TD_VID(pCommiter->pTsdb->pVnode), // - __func__); + tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, pCommiter->eid); } return code; } @@ -408,8 +362,11 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) { TSDB_CHECK_CODE(code, lino, _exit); while (committer.nextKey != TSKEY_MAX) { - code = commit_next_file_set(&committer); - if (code) break; + code = commit_fset(&committer); + if (code) { + lino = __LINE__; + break; + } } code = close_committer(&committer, code); @@ -427,37 +384,28 @@ _exit: } int32_t tsdbCommitCommit(STsdb *pTsdb) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(pTsdb->pVnode); + + if (pTsdb->imem == NULL) goto _exit; + SMemTable *pMemTable = pTsdb->imem; - - // lock taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSEditCommit(pTsdb->pFS); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); } - pTsdb->imem = NULL; - - // unlock taosThreadRwlockUnlock(&pTsdb->rwLock); - if (pMemTable) { - tsdbUnrefMemTable(pMemTable, NULL, true); - } + tsdbUnrefMemTable(pMemTable, NULL, true); _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", // - TD_VID(pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } else { - tsdbInfo("vgId:%d %s done", // - TD_VID(pTsdb->pVnode), __func__); + tsdbInfo("vgId:%d %s done", vid, __func__); } return code; } @@ -465,21 +413,18 @@ _exit: int32_t tsdbCommitAbort(STsdb *pTsdb) { int32_t code = 0; int32_t lino = 0; + int32_t vid = TD_VID(pTsdb->pVnode); + + if (pTsdb->imem == NULL) goto _exit; code = tsdbFSEditAbort(pTsdb->pFS); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", // - TD_VID(pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } else { - tsdbInfo("vgId:%d %s done", // - TD_VID(pTsdb->pVnode), // - __func__); + tsdbInfo("vgId:%d %s done", vid, __func__); } return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index 2bef56e3d4..bfc522b040 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -435,51 +435,18 @@ static int32_t edit_fs(STFileSystem *pFS, const SArray *aFileOp) { int32_t code = 0; int32_t lino; - taosArrayClearEx(pFS->nstate, NULL /* TODO */); - - // TODO: copy current state to new state - + STFileSet *pSet = NULL; for (int32_t iop = 0; iop < taosArrayGetSize(aFileOp); iop++) { - struct STFileOp *pOp = taosArrayGet(aFileOp, iop); + struct STFileOp *op = taosArrayGet(aFileOp, iop); - struct STFileSet tmpSet = {.fid = pOp->fid}; - - int32_t idx = taosArraySearchIdx( // - pFS->nstate, // - &tmpSet, // - (__compar_fn_t)fset_cmpr_fn, // - TD_GE); - - struct STFileSet *pSet; - if (idx < 0) { - pSet = NULL; - idx = taosArrayGetSize(pFS->nstate); - } else { - pSet = taosArrayGet(pFS->nstate, idx); + if (pSet == NULL || pSet->fid != op->fid) { + STFileSet fset = {.fid = op->fid}; + pSet = taosArraySearch(pFS->nstate, &fset, (__compar_fn_t)tsdbFSetCmprFn, TD_EQ); } - if (pSet == NULL || pSet->fid != pOp->fid) { - ASSERTS(pOp->op == TSDB_FOP_CREATE, "BUG: Invalid file operation"); - TSDB_CHECK_CODE( // - code = tsdbFileSetCreate(pOp->fid, &pSet), // - lino, // - _exit); - - if (taosArrayInsert(pFS->nstate, idx, pSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // do opration on file set - TSDB_CHECK_CODE( // - code = tsdbFileSetEdit(pSet, pOp), // - lino, // - _exit); + // TODO } - // TODO: write new state to file - _exit: return 0; } @@ -511,7 +478,12 @@ int32_t tsdbCloseFS(STFileSystem **ppFS) { return 0; } -int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) { +int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid) { + eid[0] = ++pFS->neid; // TODO: use atomic operation + return 0; +} + +int32_t tsdbFSEditBegin(STFileSystem *fs, int64_t eid, const SArray *aFileOp, EFEditT etype) { int32_t code = 0; int32_t lino; char current_t[TSDB_FILENAME_LEN]; @@ -525,7 +497,7 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) tsem_wait(&fs->canEdit); fs->etype = etype; - fs->eid = ++fs->neid; + fs->eid = eid; // edit code = edit_fs(fs, aFileOp); @@ -555,4 +527,10 @@ int32_t tsdbFSEditAbort(STFileSystem *fs) { int32_t code = abort_edit(fs); tsem_post(&fs->canEdit); return code; +} + +int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **ppFSet) { + STFileSet fset = {.fid = fid}; + ppFSet[0] = taosArraySearch(fs->cstate, &fset, (__compar_fn_t)tsdbFSetCmprFn, TD_EQ); + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index 1d0d1bff07..3f909c0609 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -131,4 +131,10 @@ int32_t tsdbEditFileSet(struct STFileSet *pFileSet, const struct STFileOp *pOp) ASSERTS(0, "TODO: Not implemented yet"); // TODO return code; +} + +int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2) { + if (pSet1->fid < pSet2->fid) return -1; + if (pSet1->fid > pSet2->fid) return 1; + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index deaaea31e3..ced728c4ed 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -69,8 +69,8 @@ static int32_t tsdbCloseMerger(SMerger *pMerger) { STsdb *pTsdb = pMerger->pTsdb; - code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE); - TSDB_CHECK_CODE(code, lino, _exit) + // code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE); + // TSDB_CHECK_CODE(code, lino, _exit) _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index be59354a76..31c4d29139 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -470,27 +470,20 @@ static int32_t destroy_stt_fwriter(SSttFileWriter *pWriter) { static int32_t open_stt_fwriter(SSttFileWriter *pWriter) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; + int32_t vid = TD_VID(pWriter->config.pTsdb->pVnode); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; int32_t flag = TD_FILE_READ | TD_FILE_WRITE; if (pWriter->tFile.size == 0) { - flag |= TD_FILE_CREATE | TD_FILE_TRUNC; + flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); } - code = tsdbOpenFile( // - pWriter->config.file.fname, // - pWriter->config.szPage, // - flag, // - &pWriter->pFd); + code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd); TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->tFile.size == 0) { - code = tsdbWriteFile( // - pWriter->pFd, // - 0, // - hdr, // - sizeof(hdr)); + code = tsdbWriteFile(pWriter->pFd, 0, hdr, sizeof(hdr)); TSDB_CHECK_CODE(code, lino, _exit); pWriter->tFile.size += sizeof(hdr); @@ -498,23 +491,11 @@ static int32_t open_stt_fwriter(SSttFileWriter *pWriter) { _exit: if (code) { - if (pWriter->pFd) { - tsdbCloseFile(&pWriter->pFd); - } - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + if (pWriter->pFd) tsdbCloseFile(&pWriter->pFd); + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } else { - tsdbDebug( // - "vgId:%d %s done, fname:%s size:%" PRId64, // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - pWriter->config.file.fname, // - pWriter->config.file.size // - ); + tsdbDebug("vgId:%d %s done, fname:%s size:%" PRId64, vid, __func__, pWriter->config.file.fname, + pWriter->config.file.size); } return code; } @@ -526,7 +507,8 @@ static int32_t close_stt_fwriter(SSttFileWriter *pWriter) { int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *pConf, SSttFileWriter **ppWriter) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; + int32_t vid = TD_VID(pConf->pTsdb->pVnode); code = create_stt_fwriter(pConf, ppWriter); TSDB_CHECK_CODE(code, lino, _exit); @@ -536,15 +518,11 @@ int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *pConf, SSttFileWriter **p _exit: if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); if (ppWriter[0]) { destroy_stt_fwriter(ppWriter[0]); + ppWriter[0] = NULL; } - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pConf->pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); } return code; } @@ -552,59 +530,41 @@ _exit: int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op) { int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode); int32_t code = 0; - int32_t lino; + int32_t lino = 0; if (!abort) { if (ppWriter[0]->bData.nRow > 0) { - TSDB_CHECK_CODE( // - code = write_timeseries_block(ppWriter[0]), // - lino, // - _exit); + code = write_timeseries_block(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); } if (ppWriter[0]->sData.nRow > 0) { - TSDB_CHECK_CODE( // - code = write_statistics_block(ppWriter[0]), // - lino, // - _exit); + code = write_statistics_block(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); } if (ppWriter[0]->dData.nRow > 0) { - TSDB_CHECK_CODE( // - code = write_delete_block(ppWriter[0]), // - lino, // - _exit); + code = write_delete_block(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); } - TSDB_CHECK_CODE( // - code = write_stt_blk(ppWriter[0]), // - lino, // - _exit); + code = write_stt_blk(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); - TSDB_CHECK_CODE( // - code = write_statistics_blk(ppWriter[0]), // - lino, // - _exit); + code = write_statistics_blk(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); - TSDB_CHECK_CODE( // - code = write_del_blk(ppWriter[0]), // - lino, // - _exit); + code = write_del_blk(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); - TSDB_CHECK_CODE( // - code = write_file_footer(ppWriter[0]), // - lino, // - _exit); + code = write_file_footer(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); - TSDB_CHECK_CODE( // - code = write_file_header(ppWriter[0]), // - lino, // - _exit); + code = write_file_header(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); - TSDB_CHECK_CODE( // - code = tsdbFsyncFile(ppWriter[0]->pFd), // - lino, // - _exit); + code = tsdbFsyncFile(ppWriter[0]->pFd); + TSDB_CHECK_CODE(code, lino, _exit); if (op) { op->fid = ppWriter[0]->config.file.fid; @@ -618,10 +578,8 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFi } } - TSDB_CHECK_CODE( // - code = close_stt_fwriter(ppWriter[0]), // - lino, // - _exit); + code = close_stt_fwriter(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); destroy_stt_fwriter(ppWriter[0]); ppWriter[0] = NULL;