diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h index 8d51b6a16e..093ed39a99 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h @@ -39,7 +39,7 @@ int32_t tsdbCloseFS(STFileSystem **fs); int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr); int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr); // txn -int32_t tsdbFSAllocEid(STFileSystem *fs, int64_t *eid); +int64_t tsdbFSAllocEid(STFileSystem *fs); int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index acb83deece..a0811d8207 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -52,7 +52,7 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f // max commit id int64_t tsdbTFileSetMaxCid(const STFileSet *fset); // get -SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level); +SSttLvl *tsdbTFileSetGetSttLvl(STFileSet *fset, int32_t level); // is empty bool tsdbTFileSetIsEmpty(const STFileSet *fset); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h index 6e17c386b6..e3b3d45ba1 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h @@ -53,8 +53,8 @@ int32_t tsdbIterClose(STsdbIter **iter); int32_t tsdbIterNext(STsdbIter *iter); // SIterMerger =============== -int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger); -int32_t tsdbIterMergerClear(SIterMerger **merger); +int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger); +int32_t tsdbIterMergerClose(SIterMerger **merger); int32_t tsdbIterMergerNext(SIterMerger *merger); SRowInfo *tsdbIterMergerGet(SIterMerger *merger); int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index d104af5875..594aca9a67 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -52,8 +52,6 @@ struct SSttFileReaderConfig { STsdb *tsdb; int32_t szPage; STFile file[1]; - SSkmInfo *skmTb; - SSkmInfo *skmRow; uint8_t **bufArr; }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 4de9337eb8..8bb034c265 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -17,16 +17,20 @@ // extern dependencies typedef struct { - STsdb *tsdb; + STsdb *tsdb; + TFileSetArray *fsetArr; + int32_t minutes; int8_t precision; int32_t minRow; int32_t maxRow; int8_t cmprAlg; - int8_t sttTrigger; + int32_t sttTrigger; + int32_t szPage; int64_t compactVersion; struct { + int64_t cid; int64_t now; TSKEY nextKey; int32_t fid; @@ -37,111 +41,115 @@ typedef struct { TABLEID tbid[1]; } ctx[1]; - int64_t eid; // edit id TFileOpArray fopArray[1]; TTsdbIterArray iterArray[1]; SIterMerger *iterMerger; // writer - SDataFileWriter *dataWriter; SSttFileWriter *sttWriter; + SDataFileWriter *dataWriter; } SCommitter2; -static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *pCommitter) { +static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - STsdb *pTsdb = pCommitter->tsdb; - SVnode *pVnode = pTsdb->pVnode; - int32_t vid = TD_VID(pVnode); - SSttFileWriterConfig config[1]; - SDiskID did[1]; - - if (tfsAllocDisk(pVnode->pTfs, pCommitter->ctx->expLevel, did) < 0) { + SDiskID did[1]; + if (tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, did) < 0) { code = TSDB_CODE_FS_NO_VALID_DISK; TSDB_CHECK_CODE(code, lino, _exit); } - config->tsdb = pTsdb; - config->maxRow = pCommitter->maxRow; - config->szPage = pVnode->config.tsdbPageSize; - config->cmprAlg = pCommitter->cmprAlg; - config->skmTb = NULL; - config->skmRow = NULL; - config->aBuf = NULL; - config->file.type = TSDB_FTYPE_STT; - config->file.did = did[0]; - config->file.fid = pCommitter->ctx->fid; - config->file.cid = pCommitter->eid; - config->file.size = 0; - config->file.stt->level = 0; - config->file.stt->nseg = 0; + SSttFileWriterConfig config[1] = {{ + .tsdb = committer->tsdb, + .maxRow = committer->maxRow, + .szPage = committer->tsdb->pVnode->config.tsdbPageSize, + .cmprAlg = committer->cmprAlg, + .compactVersion = committer->compactVersion, + .file = + { + .type = TSDB_FTYPE_STT, + .did = did[0], + .fid = committer->ctx->fid, + .cid = committer->ctx->cid, + }, + }}; - code = tsdbSttFileWriterOpen(config, &pCommitter->sttWriter); + code = tsdbSttFileWriterOpen(config, &committer->sttWriter); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s success", vid, __func__); + tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__); } return code; } -static int32_t tsdbCommitOpenExistSttWriter(SCommitter2 *pCommitter, const STFile *pFile) { + +static int32_t tsdbCommitOpenExistSttWriter(SCommitter2 *committer, const STFile *f) { int32_t code = 0; int32_t lino = 0; - STsdb *pTsdb = pCommitter->tsdb; - SVnode *pVnode = pTsdb->pVnode; - int32_t vid = TD_VID(pVnode); - SSttFileWriterConfig config = { - // - .tsdb = pTsdb, - .maxRow = pCommitter->maxRow, - .szPage = pVnode->config.tsdbPageSize, - .cmprAlg = pCommitter->cmprAlg, - .skmTb = NULL, - .skmRow = NULL, - .aBuf = NULL, - .file = *pFile // - }; + SSttFileWriterConfig config[1] = {{ + .tsdb = committer->tsdb, + .maxRow = committer->maxRow, + .szPage = committer->szPage, + .cmprAlg = committer->cmprAlg, + .compactVersion = committer->compactVersion, + .file = f[0], + }}; - code = tsdbSttFileWriterOpen(&config, &pCommitter->sttWriter); + code = tsdbSttFileWriterOpen(config, &committer->sttWriter); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s success", vid, __func__); + tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__); } return code; } + static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { + int32_t code = 0; + int32_t lino = 0; + + // stt writer if (!committer->ctx->fset) { - return tsdbCommitOpenNewSttWriter(committer); + code = tsdbCommitOpenNewSttWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); } - const SSttLvl *lvl0 = tsdbTFileSetGetLvl(committer->ctx->fset, 0); - if (lvl0 == NULL) { - return tsdbCommitOpenNewSttWriter(committer); + const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0); + if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) { + code = tsdbCommitOpenNewSttWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); } - ASSERT(TARRAY2_SIZE(lvl0->fobjArr) > 0); STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr); if (fobj->f->stt->nseg >= committer->sttTrigger) { - return tsdbCommitOpenNewSttWriter(committer); + code = tsdbCommitOpenNewSttWriter(committer); + TSDB_CHECK_CODE(code, lino, _exit); } else { - return tsdbCommitOpenExistSttWriter(committer, fobj->f); + code = tsdbCommitOpenExistSttWriter(committer, fobj->f); + TSDB_CHECK_CODE(code, lino, _exit); } + + // data writer + if (0) { + // TODO + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); + } + return code; } -static int32_t tsdbCommitTSRow(SCommitter2 *committer, SRowInfo *row) { - return tsdbSttFileWriteTSData(committer->sttWriter, row); -} - -static int32_t tsdbCommitWriteDelData(SCommitter2 *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey, +static int32_t tsdbCommitWriteDelData(SCommitter2 *committer, int64_t suid, int64_t uid, int64_t version, int64_t sKey, int64_t eKey) { int32_t code = 0; // TODO @@ -174,7 +182,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { code = TARRAY2_APPEND(committer->iterArray, iter); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbIterMergerInit(committer->iterArray, &committer->iterMerger); + code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger); TSDB_CHECK_CODE(code, lino, _exit); // loop iter @@ -183,7 +191,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { committer->ctx->tbid->suid = row->suid; committer->ctx->tbid->uid = row->uid; - // Ignore deleted table + // Ignore table of obsolescence SMetaInfo info[1]; if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, info, NULL) != 0) { code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid); @@ -195,11 +203,10 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { TSKEY ts = TSDBROW_TS(&row->row); if (ts > committer->ctx->maxKey) { committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); - code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid); TSDB_CHECK_CODE(code, lino, _exit); } else { - code = tsdbCommitTSRow(committer, row); + code = tsdbSttFileWriteTSData(committer->sttWriter, row); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbIterMergerNext(committer->iterMerger); @@ -216,7 +223,7 @@ _exit: return code; } -static int32_t tsdbCommitDelData(SCommitter2 *pCommitter) { +static int32_t tsdbCommitDelData(SCommitter2 *committer) { int32_t code = 0; int32_t lino; @@ -226,23 +233,23 @@ static int32_t tsdbCommitDelData(SCommitter2 *pCommitter) { ASSERTS(0, "TODO: Not implemented yet"); int64_t nDel = 0; - SMemTable *pMem = pCommitter->tsdb->imem; + SMemTable *pMem = committer->tsdb->imem; if (pMem->nDel == 0) { // no del data goto _exit; } - for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); + for (int32_t iTbData = 0; iTbData < taosArrayGetSize(committer->aTbDataP); iTbData++) { + STbData *pTbData = (STbData *)taosArrayGetP(committer->aTbDataP, iTbData); for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) { - if (pDelData->eKey < pCommitter->ctx->minKey) continue; - if (pDelData->sKey > pCommitter->ctx->maxKey) { - pCommitter->ctx->nextKey = TMIN(pCommitter->ctx->nextKey, pDelData->sKey); + if (pDelData->eKey < committer->ctx->minKey) continue; + if (pDelData->sKey > committer->ctx->maxKey) { + committer->ctx->nextKey = TMIN(committer->ctx->nextKey, pDelData->sKey); continue; } - code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version, + code = tsdbCommitWriteDelData(committer, pTbData->suid, pTbData->uid, pDelData->version, pDelData->sKey /* TODO */, pDelData->eKey /* TODO */); TSDB_CHECK_CODE(code, lino, _exit); } @@ -250,9 +257,9 @@ static int32_t tsdbCommitDelData(SCommitter2 *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->tsdb->pVnode), lino, tstrerror(code)); + tsdbError("vgId:%d failed at line %d since %s", TD_VID(committer->tsdb->pVnode), lino, tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->tsdb->pVnode), __func__, pCommitter->ctx->fid, + tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid, pMem->nDel); } return code; @@ -266,17 +273,26 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { int32_t vid = TD_VID(tsdb->pVnode); committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision); + committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey, &committer->ctx->maxKey); - committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); - committer->ctx->nextKey = TSKEY_MAX; + STFileSet fset = {.fid = committer->ctx->fid}; + committer->ctx->fset = &fset; + committer->ctx->fset = TARRAY2_SEARCH_EX(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ); + committer->ctx->tbid->suid = 0; + committer->ctx->tbid->uid = 0; - // TODO: use a thread safe function to get fset - tsdbFSGetFSet(tsdb->pFS, committer->ctx->fid, &committer->ctx->fset); + ASSERT(TARRAY2_SIZE(committer->iterArray) == 0); + ASSERT(committer->iterMerger == NULL); + ASSERT(committer->sttWriter == NULL); + ASSERT(committer->dataWriter == NULL); code = tsdbCommitOpenWriter(committer); TSDB_CHECK_CODE(code, lino, _exit); + // reset nextKey + committer->ctx->nextKey = TSKEY_MAX; + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code); @@ -287,27 +303,27 @@ _exit: return 0; } -static int32_t tsdbCommitFileSetEnd(SCommitter2 *pCommitter) { +static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(pCommitter->tsdb->pVnode); - if (pCommitter->sttWriter == NULL) return 0; - - STFileOp op; - code = tsdbSttFileWriterClose(&pCommitter->sttWriter, 0, &op); + STFileOp op[1]; + code = tsdbSttFileWriterClose(&committer->sttWriter, 0, op); TSDB_CHECK_CODE(code, lino, _exit); - if (op.optype != TSDB_FOP_NONE) { - code = TARRAY2_APPEND(pCommitter->fopArray, op); + if (op->optype != TSDB_FOP_NONE) { + code = TARRAY2_APPEND_PTR(committer->fopArray, op); TSDB_CHECK_CODE(code, lino, _exit); } + tsdbIterMergerClose(&committer->iterMerger); + TARRAY2_CLEAR(committer->iterArray, tsdbIterClose); + _exit: if (code) { - tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, pCommitter->ctx->fid); + tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid); } return code; } @@ -349,20 +365,24 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co memset(committer, 0, sizeof(committer[0])); committer->tsdb = tsdb; + code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + committer->minutes = tsdb->keepCfg.days; committer->precision = tsdb->keepCfg.precision; committer->minRow = info->info.config.tsdbCfg.minRows; committer->maxRow = info->info.config.tsdbCfg.maxRows; committer->cmprAlg = info->info.config.tsdbCfg.compression; committer->sttTrigger = info->info.config.sttTrigger; - committer->compactVersion = INT64_MAX; // TODO: use a function - - TARRAY2_INIT(committer->fopArray); - tsdbFSAllocEid(tsdb->pFS, &committer->eid); + committer->szPage = info->info.config.tsdbPageSize; + committer->compactVersion = INT64_MAX; + committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS); committer->ctx->now = taosGetTimestampSec(); committer->ctx->nextKey = tsdb->imem->minKey; // TODO + TARRAY2_INIT(committer->fopArray); + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code); @@ -372,28 +392,32 @@ _exit: return code; } -static int32_t tsdbCloseCommitter(SCommitter2 *pCommiter, int32_t eno) { +static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(pCommiter->tsdb->pVnode); + int32_t vid = TD_VID(committer->tsdb->pVnode); if (eno == 0) { - code = tsdbFSEditBegin(pCommiter->tsdb->pFS, pCommiter->fopArray, TSDB_FEDIT_COMMIT); + code = tsdbFSEditBegin(committer->tsdb->pFS, committer->fopArray, TSDB_FEDIT_COMMIT); TSDB_CHECK_CODE(code, lino, _exit); } else { // TODO ASSERT(0); } - ASSERT(pCommiter->sttWriter == NULL); - TARRAY2_FREE(pCommiter->fopArray); + ASSERT(committer->dataWriter == NULL); + ASSERT(committer->sttWriter == NULL); + ASSERT(committer->iterMerger == NULL); + TARRAY2_FREE(committer->iterArray); + TARRAY2_FREE(committer->fopArray); + tsdbFSDestroyCopySnapshot(&committer->fsetArr); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code), - pCommiter->eid); + committer->ctx->cid); } else { - tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, pCommiter->eid); + tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, committer->ctx->cid); } return code; } @@ -413,15 +437,15 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(tsdb->pVnode); - SMemTable *memt = tsdb->imem; - int64_t nRow = memt->nRow; - int64_t nDel = memt->nDel; + SMemTable *imem = tsdb->imem; + int64_t nRow = imem->nRow; + int64_t nDel = imem->nDel; if (!nRow && !nDel) { taosThreadRwlockWrlock(&tsdb->rwLock); tsdb->imem = NULL; taosThreadRwlockUnlock(&tsdb->rwLock); - tsdbUnrefMemTable(memt, NULL, true); + tsdbUnrefMemTable(imem, NULL, true); } else { SCommitter2 committer[1]; @@ -430,10 +454,7 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { while (committer->ctx->nextKey != TSKEY_MAX) { code = tsdbCommitFileSet(committer); - if (code) { - lino = __LINE__; - break; - } + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbCloseCommitter(committer, code); @@ -449,33 +470,32 @@ _exit: return code; } -int32_t tsdbCommitCommit(STsdb *pTsdb) { +int32_t tsdbCommitCommit(STsdb *tsdb) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(pTsdb->pVnode); - if (pTsdb->imem == NULL) goto _exit; + if (tsdb->imem == NULL) goto _exit; - SMemTable *pMemTable = pTsdb->imem; - taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSEditCommit(pTsdb->pFS); + SMemTable *pMemTable = tsdb->imem; + taosThreadRwlockWrlock(&tsdb->rwLock); + code = tsdbFSEditCommit(tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadRwlockUnlock(&tsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); } - pTsdb->imem = NULL; - taosThreadRwlockUnlock(&pTsdb->rwLock); + tsdb->imem = NULL; + taosThreadRwlockUnlock(&tsdb->rwLock); tsdbUnrefMemTable(pMemTable, NULL, true); // TODO: make this call async - code = tsdbMerge(pTsdb); + code = tsdbMerge(tsdb); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else { - tsdbInfo("vgId:%d %s done", vid, __func__); + tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index f9663398dc..aa86a19878 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -538,9 +538,11 @@ int32_t tsdbCloseFS(STFileSystem **ppFS) { return 0; } -int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid) { - eid[0] = ++pFS->neid; // TODO: use atomic operation - return 0; +int64_t tsdbFSAllocEid(STFileSystem *fs) { + taosThreadRwlockRdlock(&fs->tsdb->rwLock); + int64_t cid = ++fs->neid; + taosThreadRwlockUnlock(&fs->tsdb->rwLock); + return cid; } int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index 3a1cf659bb..9bd190b83b 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -267,7 +267,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { if (code) return code; if (fobj->f->type == TSDB_FTYPE_STT) { - SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f->stt->level); + SSttLvl *lvl = tsdbTFileSetGetSttLvl(fset, fobj->f->stt->level); if (!lvl) { code = tsdbSttLvlInit(fobj->f->stt->level, &lvl); if (code) return code; @@ -285,7 +285,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { } else if (op->optype == TSDB_FOP_REMOVE) { // delete a file if (op->of.type == TSDB_FTYPE_STT) { - SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt->level); + SSttLvl *lvl = tsdbTFileSetGetSttLvl(fset, op->of.stt->level); ASSERT(lvl); STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}; @@ -305,7 +305,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { } } else { if (op->nf.type == TSDB_FTYPE_STT) { - SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt->level); + SSttLvl *lvl = tsdbTFileSetGetSttLvl(fset, op->of.stt->level); ASSERT(lvl); STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}, *tfobjp = &tfobj; @@ -463,9 +463,9 @@ int32_t tsdbTFileSetRemove(STFileSet **fset) { return 0; } -SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level) { - SSttLvl tlvl = {.level = level}; - SSttLvl *lvl = &tlvl; +SSttLvl *tsdbTFileSetGetSttLvl(STFileSet *fset, int32_t level) { + SSttLvl sttLvl = {.level = level}; + SSttLvl *lvl = &sttLvl; return TARRAY2_SEARCH_EX(fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ); } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c index 0ce006df72..2c578fe7af 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c @@ -348,7 +348,7 @@ struct SIterMerger { SRBTree iterTree[1]; }; -int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger) { +int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger) { STsdbIter *iter; SRBTreeNode *node; @@ -365,7 +365,7 @@ int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger return tsdbIterMergerNext(merger[0]); } -int32_t tsdbIterMergerClear(SIterMerger **merger) { +int32_t tsdbIterMergerClose(SIterMerger **merger) { if (merger[0]) { taosMemoryFree(merger[0]); merger[0] = NULL; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 3310502359..73dd5465fc 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -57,7 +57,7 @@ static int32_t tsdbMergerOpen(SMerger *merger) { merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize; merger->cmprAlg = merger->tsdb->pVnode->config.tsdbCfg.compression; merger->compactVersion = INT64_MAX; - tsdbFSAllocEid(merger->tsdb->pFS, &merger->cid); + merger->cid = tsdbFSAllocEid(merger->tsdb->pFS); merger->ctx->opened = true; return 0; } @@ -273,7 +273,7 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) { } } - code = tsdbIterMergerInit(merger->iterArr, &merger->iterMerger); + code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -444,7 +444,7 @@ _exit: } static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) { - tsdbIterMergerClear(&merger->iterMerger); + tsdbIterMergerClose(&merger->iterMerger); TARRAY2_CLEAR(merger->iterArr, tsdbIterClose); return 0; } @@ -566,5 +566,5 @@ _exit: } else if (merger->ctx->opened) { tsdbDebug("vgId:%d %s done", vid, __func__); } - return 0; + return code; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 8b770ed5e9..5251e79df1 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -28,6 +28,7 @@ struct SSttFileReader { SSttFileReaderConfig config[1]; TSttSegReaderArray readerArray[1]; STsdbFD *fd; + uint8_t *bufArr[5]; }; struct SSttSegReader { @@ -87,6 +88,7 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; reader[0]->config[0] = config[0]; + if (!reader[0]->config->bufArr) reader[0]->config->bufArr = reader[0]->bufArr; // open file code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); @@ -118,6 +120,9 @@ _exit: int32_t tsdbSttFileReaderClose(SSttFileReader **reader) { if (reader[0]) { + for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) { + tFree(reader[0]->bufArr[i]); + } tsdbCloseFile(&reader[0]->fd); TARRAY2_CLEAR_FREE(reader[0]->readerArray, tsdbSttSegReaderClose); taosMemoryFree(reader[0]);