Merge remote-tracking branch 'origin/enh/tsdb_optimize' into enh/tsdb_optimize

This commit is contained in:
Haojun Liao 2023-06-29 15:09:14 +08:00
commit 63834713b2
8 changed files with 19 additions and 5 deletions

View File

@ -329,6 +329,7 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
&committer->ctx->maxKey); &committer->ctx->maxKey);
code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did); code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did);
STFileSet fset = {.fid = committer->ctx->fid}; STFileSet fset = {.fid = committer->ctx->fid};
committer->ctx->fset = &fset; committer->ctx->fset = &fset;
STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ); STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);

View File

@ -511,6 +511,9 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
} }
_exit: _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
}
return code; return code;
} }
@ -597,7 +600,6 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
} }
tsem_wait(&fs->canEdit); tsem_wait(&fs->canEdit);
fs->etype = etype; fs->etype = etype;
// edit // edit
@ -646,6 +648,7 @@ _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
} else { } else {
tsdbDebug("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
tsem_post(&fs->canEdit); tsem_post(&fs->canEdit);
} }
return code; return code;

View File

@ -305,7 +305,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
STFileObj *tfobjp = &tfobj; STFileObj *tfobjp = &tfobj;
int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
ASSERT(idx >= 0); ASSERT(idx >= 0);
TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlRemoveFObj); TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) { if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
// TODO: remove the stt level if no file exists anymore // TODO: remove the stt level if no file exists anymore

View File

@ -203,8 +203,9 @@ int32_t tsdbTFileObjRef(STFileObj *fobj) {
int32_t nRef; int32_t nRef;
taosThreadMutexLock(&fobj->mutex); taosThreadMutexLock(&fobj->mutex);
ASSERT(fobj->ref > 0 && fobj->state == TSDB_FSTATE_LIVE); ASSERT(fobj->ref > 0 && fobj->state == TSDB_FSTATE_LIVE);
nRef = fobj->ref++; nRef = ++fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); taosThreadMutexUnlock(&fobj->mutex);
tsdbTrace("ref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
return 0; return 0;
} }
@ -213,6 +214,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
int32_t nRef = --fobj->ref; int32_t nRef = --fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); taosThreadMutexUnlock(&fobj->mutex);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
if (nRef == 0) { if (nRef == 0) {
if (fobj->state == TSDB_FSTATE_DEAD) { if (fobj->state == TSDB_FSTATE_DEAD) {
remove_file(fobj->fname); remove_file(fobj->fname);
@ -229,6 +231,7 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) {
fobj->state = TSDB_FSTATE_DEAD; fobj->state = TSDB_FSTATE_DEAD;
int32_t nRef = --fobj->ref; int32_t nRef = --fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); taosThreadMutexUnlock(&fobj->mutex);
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
if (nRef == 0) { if (nRef == 0) {
remove_file(fobj->fname); remove_file(fobj->fname);
taosMemoryFree(fobj); taosMemoryFree(fobj);

View File

@ -215,6 +215,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
code = TSDB_CODE_FS_NO_VALID_DISK; code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tfsMkdirRecurAt(merger->tsdb->pVnode->pTfs, merger->tsdb->path, did);
SFSetWriterConfig config = { SFSetWriterConfig config = {
.tsdb = merger->tsdb, .tsdb = merger->tsdb,
.toSttOnly = true, .toSttOnly = true,
@ -383,7 +384,7 @@ _exit:
} else { } else {
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid); tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
} }
return 0; return code;
} }
static int32_t tsdbDoMerge(SMerger *merger) { static int32_t tsdbDoMerge(SMerger *merger) {

View File

@ -220,6 +220,7 @@ static int32_t tsdbDoRetention2(void *arg) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
// data // data
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {

View File

@ -772,6 +772,7 @@ static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
code = TSDB_CODE_NO_AVAIL_DISK; code = TSDB_CODE_NO_AVAIL_DISK;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
writer->ctx->hasData = true; writer->ctx->hasData = true;
writer->ctx->hasTomb = true; writer->ctx->hasTomb = true;

View File

@ -147,7 +147,11 @@ static void* loop(void* arg) {
SVnodeTask* pTask; SVnodeTask* pTask;
int ret; int ret;
setThreadName("vnode-commit"); if (tp == &vnodeGlobal.tp[0]) {
setThreadName("vnode-commit");
} else if (tp == &vnodeGlobal.tp[1]) {
setThreadName("vnode-merge");
}
for (;;) { for (;;) {
taosThreadMutexLock(&(tp->mutex)); taosThreadMutexLock(&(tp->mutex));