Merge pull request #12331 from taosdata/feature/TD-14481-3.0
fix: commit table in mem and file
This commit is contained in:
commit
61354ecaac
|
@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
|
|||
static void tsdbResetCommitFile(SCommitH *pCommith);
|
||||
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
|
||||
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
|
||||
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
|
||||
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
|
||||
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
|
||||
static int tsdbWriteBlockInfo(SCommitH *pCommih);
|
||||
|
@ -349,7 +350,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
|||
if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// Loop to commit each table data
|
||||
for (int tid = 0; tid < pCommith->niters; tid++) {
|
||||
SCommitIter *pIter = pCommith->iters + tid;
|
||||
|
@ -363,6 +364,46 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
// Loop to commit each table data in mem and file
|
||||
int mIter = 0, fIter = 0;
|
||||
int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx);
|
||||
|
||||
while (true) {
|
||||
SBlockIdx *pIdx = NULL;
|
||||
SCommitIter *pIter = NULL;
|
||||
if (mIter < pCommith->niters) {
|
||||
pIter = pCommith->iters + mIter;
|
||||
if (fIter < nBlkIdx) {
|
||||
pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter);
|
||||
}
|
||||
} else if (fIter < nBlkIdx) {
|
||||
pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->uid <= pIdx->uid))) {
|
||||
if (tsdbCommitToTable(pCommith, mIter) < 0) {
|
||||
tsdbCloseCommitFile(pCommith, true);
|
||||
// revert the file change
|
||||
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pIdx && (pIter->pTable->uid == pIdx->uid)) {
|
||||
++fIter;
|
||||
}
|
||||
++mIter;
|
||||
} else if (pIdx) {
|
||||
if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) {
|
||||
tsdbCloseCommitFile(pCommith, true);
|
||||
// revert the file change
|
||||
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
|
||||
return -1;
|
||||
}
|
||||
++fIter;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
|
||||
0) {
|
||||
|
@ -838,6 +879,40 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
|
||||
SReadH *pReadh = &pCommith->readh;
|
||||
int nBlocks = pIdx->numOfBlocks;
|
||||
int bidx = 0;
|
||||
|
||||
tsdbResetCommitTable(pCommith);
|
||||
|
||||
pReadh->pBlkIdx = pIdx;
|
||||
|
||||
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (bidx < nBlocks) {
|
||||
if (tsdbMoveBlock(pCommith, bidx) < 0) {
|
||||
tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
|
||||
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
++bidx;
|
||||
}
|
||||
|
||||
STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
|
||||
TSDB_COMMIT_TABLE(pCommith) = &table;
|
||||
|
||||
if (tsdbWriteBlockInfo(pCommith) < 0) {
|
||||
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
|
||||
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
|
||||
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
|
||||
|
|
|
@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle,
|
|||
}
|
||||
|
||||
if (level == TSDB_RETENTION_L0) {
|
||||
tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level);
|
||||
tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L0);
|
||||
return VND_RSMA0(pVnode);
|
||||
} else if (level == TSDB_RETENTION_L1) {
|
||||
tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level);
|
||||
tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L1);
|
||||
return VND_RSMA1(pVnode);
|
||||
} else {
|
||||
tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level);
|
||||
tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L2);
|
||||
return VND_RSMA2(pVnode);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1943,7 +1943,6 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid
|
|||
|
||||
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
|
||||
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
|
||||
tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue