Merge pull request #25974 from taosdata/fix/TD-30317-3.0

Fix/TD-30317-3.0
This commit is contained in:
Hongze Cheng 2024-05-30 13:52:44 +08:00 committed by GitHub
commit e2349518ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 36 additions and 51 deletions

View File

@ -18,7 +18,6 @@
// extern dependencies // extern dependencies
typedef struct { typedef struct {
int32_t fid; int32_t fid;
bool hasDataToCommit;
STFileSet *fset; STFileSet *fset;
} SFileSetCommitInfo; } SFileSetCommitInfo;
@ -512,36 +511,25 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitInfoAdd(STsdb *tsdb, const SFileSetCommitInfo *info) { static int32_t tsdbCommitInfoAdd(STsdb *tsdb, int32_t fid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SFileSetCommitInfo *tinfo; SFileSetCommitInfo *tinfo;
vHashGet(tsdb->commitInfo->ht, info, (void **)&tinfo); if ((tinfo = taosMemoryMalloc(sizeof(*tinfo))) == NULL) {
if (tinfo) { TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
if (info->hasDataToCommit && !tinfo->hasDataToCommit) {
tinfo->hasDataToCommit = true;
}
} else {
if ((tinfo = taosMemoryCalloc(1, sizeof(*tinfo))) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
tinfo->fid = info->fid;
tinfo->hasDataToCommit = info->hasDataToCommit;
if (info->fset) {
code = tsdbTFileSetInitCopy(tsdb, info->fset, &tinfo->fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = vHashPut(tsdb->commitInfo->ht, tinfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare);
} }
tinfo->fid = fid;
tinfo->fset = NULL;
code = vHashPut(tsdb->commitInfo->ht, tinfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare);
_exit: _exit:
if (code) { if (code) {
@ -585,13 +573,15 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision); fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision);
tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
SFileSetCommitInfo info = { SFileSetCommitInfo *info;
.fid = fid, SFileSetCommitInfo tinfo = {
.hasDataToCommit = true, .fid = fid,
.fset = NULL,
}; };
code = tsdbCommitInfoAdd(tsdb, &info); vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
TSDB_CHECK_CODE(code, lino, _exit); if (info == NULL) {
code = tsdbCommitInfoAdd(tsdb, fid);
TSDB_CHECK_CODE(code, lino, _exit);
}
from.key.ts = maxKey + 1; from.key.ts = maxKey + 1;
} }
@ -599,18 +589,13 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
taosThreadMutexLock(&tsdb->mutex); taosThreadMutexLock(&tsdb->mutex);
// copy existing file set
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbFSGetFSet(tsdb->pFS, info->fid, &fset);
if (fset) {
tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
}
}
// scan tomb data // scan tomb data
if (tsdb->imem->nDel > 0) { if (tsdb->imem->nDel > 0) {
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
if (tsdbTFileSetIsEmpty(fset)) {
continue;
}
SFileSetCommitInfo *info; SFileSetCommitInfo *info;
SFileSetCommitInfo tinfo = { SFileSetCommitInfo tinfo = {
.fid = fset->fid, .fid = fset->fid,
@ -623,6 +608,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
} }
int64_t minKey, maxKey; int64_t minKey, maxKey;
bool hasDataToCommit = false;
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1); iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1);
for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) { for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) {
@ -631,10 +617,8 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
continue; continue;
} else { } else {
tinfo.fid = fset->fid; hasDataToCommit = true;
tinfo.hasDataToCommit = true; if ((code = tsdbCommitInfoAdd(tsdb, fset->fid))) {
tinfo.fset = fset;
if ((code = tsdbCommitInfoAdd(tsdb, &tinfo))) {
taosThreadMutexUnlock(&tsdb->mutex); taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -642,7 +626,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
} }
} }
if (tinfo.hasDataToCommit) { if (hasDataToCommit) {
break; break;
} }
} }
@ -653,6 +637,9 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset); tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
if (fset) {
tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
}
} }
taosThreadMutexUnlock(&tsdb->mutex); taosThreadMutexUnlock(&tsdb->mutex);
@ -756,10 +743,8 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (committer.ctx->info->hasDataToCommit) { code = tsdbCommitFileSet(&committer);
code = tsdbCommitFileSet(&committer); TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
code = tsdbCloseCommitter(&committer, code); code = tsdbCloseCommitter(&committer, code);
@ -792,7 +777,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(tsdb, info->fid); tsdbFinishTaskOnFileSet(tsdb, info->fid);
} }
} }
@ -824,7 +809,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
taosThreadMutexLock(&pTsdb->mutex); taosThreadMutexLock(&pTsdb->mutex);
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(pTsdb, info->fid); tsdbFinishTaskOnFileSet(pTsdb, info->fid);
} }
} }