Merge pull request #25958 from taosdata/fix/TD-30317-3.0
fix: compact and commit concurrency problem
This commit is contained in:
commit
f55321cebc
|
@ -560,20 +560,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
|
|||
code = tsdbCommitInfoInit(tsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||
SFileSetCommitInfo info = {
|
||||
.fid = fset->fid,
|
||||
.hasDataToCommit = false,
|
||||
.fset = fset,
|
||||
};
|
||||
if ((code = tsdbCommitInfoAdd(tsdb, &info))) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
// scan time-series data
|
||||
iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1);
|
||||
for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) {
|
||||
STbData *pTbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||
|
@ -608,24 +595,68 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
|
|||
|
||||
from.key.ts = maxKey + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// scan tomb data
|
||||
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
|
||||
for (int32_t i = taosArrayGetSize(tsdb->commitInfo->arr) - 1; i >= 0; i--) {
|
||||
int64_t minKey, maxKey;
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
|
||||
tsdbFidKeyRange(info->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
|
||||
// copy existing file set
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
tsdbFSGetFSet(tsdb->pFS, info->fid, &fset);
|
||||
if (fset) {
|
||||
tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
|
||||
}
|
||||
}
|
||||
|
||||
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
|
||||
continue;
|
||||
} else if (!info->hasDataToCommit) {
|
||||
info->hasDataToCommit = true;
|
||||
// scan tomb data
|
||||
if (tsdb->imem->nDel > 0) {
|
||||
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
|
||||
SFileSetCommitInfo *info;
|
||||
SFileSetCommitInfo tinfo = {
|
||||
.fid = fset->fid,
|
||||
};
|
||||
|
||||
// check if the file set already on the commit list
|
||||
vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
|
||||
if (info != NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int64_t minKey, maxKey;
|
||||
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
|
||||
iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1);
|
||||
for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) {
|
||||
STbData *pTbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
|
||||
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
|
||||
continue;
|
||||
} else {
|
||||
tinfo.fid = fset->fid;
|
||||
tinfo.hasDataToCommit = true;
|
||||
tinfo.fset = fset;
|
||||
if ((code = tsdbCommitInfoAdd(tsdb, &tinfo))) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (tinfo.hasDataToCommit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// begin tasks on file set
|
||||
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbCommitInfoDestroy(tsdb);
|
||||
|
@ -653,16 +684,6 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
|
|||
code = tsdbCommitInfoBuild(tsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
STFileSet *fset;
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
|
||||
if (info->hasDataToCommit && info->fset) {
|
||||
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
|
|
|
@ -1181,7 +1181,7 @@ int32_t tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
|
|||
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
|
||||
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, fset);
|
||||
if (sttTrigger == 1 && fset) {
|
||||
if (sttTrigger == 1 && (*fset)) {
|
||||
for (;;) {
|
||||
if ((*fset)->taskRunning) {
|
||||
(*fset)->numWaitTask++;
|
||||
|
|
Loading…
Reference in New Issue