fix: set commit table when move blk idx between different last file

This commit is contained in:
Cary Xu 2022-05-12 13:30:05 +08:00
parent 17fe1d44da
commit 77f4c3b5ab
1 changed files with 36 additions and 6 deletions

View File

@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith); static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid); static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
@ -890,6 +891,8 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
SReadH *pReadh = &pCommith->readh; SReadH *pReadh = &pCommith->readh;
STsdb *pTsdb = TSDB_READ_REPO(pReadh);
STSchema *pTSchema = NULL;
int nBlocks = pIdx->numOfBlocks; int nBlocks = pIdx->numOfBlocks;
int bidx = 0; int bidx = 0;
@ -901,30 +904,49 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
return -1; return -1;
} }
STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
pCommith->pTable = &table;
while (bidx < nBlocks) { while (bidx < nBlocks) {
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
// Set commit table
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version
if (!pTSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
table.pSchema = pTSchema;
if (tsdbSetCommitTable(pCommith, &table) < 0) {
taosMemoryFreeClear(pTSchema);
return -1;
}
}
if (tsdbMoveBlock(pCommith, bidx) < 0) { if (tsdbMoveBlock(pCommith, bidx) < 0) {
tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
taosMemoryFreeClear(pTSchema);
return -1; return -1;
} }
++bidx; ++bidx;
} }
STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
TSDB_COMMIT_TABLE(pCommith) = &table;
if (tsdbWriteBlockInfo(pCommith) < 0) { if (tsdbWriteBlockInfo(pCommith) < 0) {
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
taosMemoryFreeClear(pTSchema);
return -1; return -1;
} }
taosMemoryFreeClear(pTSchema);
return 0; return 0;
} }
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pCommith->pTable = pTable; pCommith->pTable = pTable;
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {
@ -1321,6 +1343,14 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
return 0; return 0;
} }
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
if (pBlock->last) {
return pCommith->isLFileSame;
}
return pCommith->isDFileSame;
}
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SDFile *pDFile; SDFile *pDFile;