fix(tsdb/cos-merge): skip stt merging & not block commit
This commit is contained in:
parent
2719420be0
commit
65a27dd9c8
|
@ -13,6 +13,6 @@ ExternalProject_Add(xml2
|
||||||
BUILD_IN_SOURCE TRUE
|
BUILD_IN_SOURCE TRUE
|
||||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --enable-shared=no --enable-static=yes --without-python --without-lzma
|
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --enable-shared=no --enable-static=yes --without-python --without-lzma
|
||||||
BUILD_COMMAND make -j
|
BUILD_COMMAND make -j
|
||||||
INSTALL_COMMAND make install && ln -s $ENV{HOME}/.cos-local.2/include/libxml2/libxml $ENV{HOME}/.cos-local.2/include/libxml
|
INSTALL_COMMAND make install && ln -sf $ENV{HOME}/.cos-local.2/include/libxml2/libxml $ENV{HOME}/.cos-local.2/include/libxml
|
||||||
TEST_COMMAND ""
|
TEST_COMMAND ""
|
||||||
)
|
)
|
||||||
|
|
|
@ -879,13 +879,11 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool skipMerge = false;
|
||||||
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
|
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
|
||||||
if (numFile >= sttTrigger) {
|
if (numFile >= sttTrigger) {
|
||||||
// launch merge
|
// launch merge
|
||||||
bool skipMerge = false;
|
|
||||||
{
|
{
|
||||||
int32_t now = taosGetTimestampSec();
|
|
||||||
|
|
||||||
extern int8_t tsS3Enabled;
|
extern int8_t tsS3Enabled;
|
||||||
extern int32_t tsS3UploadDelaySec;
|
extern int32_t tsS3UploadDelaySec;
|
||||||
long s3Size(const char *object_name);
|
long s3Size(const char *object_name);
|
||||||
|
@ -897,6 +895,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
||||||
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||||
|
|
||||||
if (taosCheckExistFile(fobj->fname)) {
|
if (taosCheckExistFile(fobj->fname)) {
|
||||||
|
int32_t now = taosGetTimestampSec();
|
||||||
int32_t mtime = 0;
|
int32_t mtime = 0;
|
||||||
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||||
if (mtime < now - tsS3UploadDelaySec) {
|
if (mtime < now - tsS3UploadDelaySec) {
|
||||||
|
@ -916,7 +915,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
|
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR && !skipMerge) {
|
||||||
tsdbFSSetBlockCommit(fset, true);
|
tsdbFSSetBlockCommit(fset, true);
|
||||||
} else {
|
} else {
|
||||||
tsdbFSSetBlockCommit(fset, false);
|
tsdbFSSetBlockCommit(fset, false);
|
||||||
|
|
|
@ -548,6 +548,36 @@ static int32_t tsdbMerge(void *arg) {
|
||||||
|
|
||||||
if (merger->fset == NULL) return 0;
|
if (merger->fset == NULL) return 0;
|
||||||
|
|
||||||
|
bool skipMerge = false;
|
||||||
|
{
|
||||||
|
extern int8_t tsS3Enabled;
|
||||||
|
extern int32_t tsS3UploadDelaySec;
|
||||||
|
long s3Size(const char *object_name);
|
||||||
|
int32_t nlevel = tfsGetLevel(merger->tsdb->pVnode->pTfs);
|
||||||
|
if (tsS3Enabled && nlevel > 1) {
|
||||||
|
STFileObj *fobj = merger->fset->farr[TSDB_FTYPE_DATA];
|
||||||
|
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||||
|
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
||||||
|
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||||
|
|
||||||
|
if (taosCheckExistFile(fobj->fname)) {
|
||||||
|
int32_t now = taosGetTimestampSec();
|
||||||
|
int32_t mtime = 0;
|
||||||
|
|
||||||
|
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||||
|
if (mtime < now - tsS3UploadDelaySec) {
|
||||||
|
skipMerge = true;
|
||||||
|
}
|
||||||
|
} else if (s3Size(object_name) > 0) {
|
||||||
|
skipMerge = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// new fset can be written with ts data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (skipMerge) return 0;
|
||||||
|
|
||||||
// do merge
|
// do merge
|
||||||
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
|
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
|
||||||
code = tsdbDoMerge(merger);
|
code = tsdbDoMerge(merger);
|
||||||
|
@ -578,4 +608,4 @@ int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid) {
|
||||||
if (code) taosMemoryFree(arg);
|
if (code) taosMemoryFree(arg);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue