This commit is contained in:
Hongze Cheng 2023-01-03 11:57:25 +08:00
parent 43bc021eb0
commit 9fd2ac0b4c
1 changed files with 52 additions and 55 deletions

View File

@ -76,6 +76,7 @@ static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pN
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SDFileSet dFileSet = {.fid = pReader->fid}; SDFileSet dFileSet = {.fid = pReader->fid};
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
@ -83,7 +84,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
pReader->fid = pSet->fid; pReader->fid = pSet->fid;
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pReader->pIter = NULL; pReader->pIter = NULL;
tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn); tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn);
@ -93,13 +94,13 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
pIter->type = SNAP_DATA_FILE_ITER; pIter->type = SNAP_DATA_FILE_ITER;
code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx); code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) { for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
SDataBlk dataBlk; SDataBlk dataBlk;
@ -108,7 +109,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid); ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid); ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);
@ -139,7 +140,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
pIter->iStt = iStt; pIter->iStt = iStt;
code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk); code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
@ -148,7 +149,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
if (pSttBlk->maxVer < pReader->sver) continue; if (pSttBlk->maxVer < pReader->sver) continue;
code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData); code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
@ -169,13 +170,13 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
pIter++; pIter++;
} }
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode), _exit:
pReader->pTsdb->path, pReader->fid); if (code) {
return code; tsdbError("vgId:%d, %s failed since %s", TD_VID(pReader->pTsdb->pVnode), __func__, tstrerror(code));
} else {
_err: tsdbInfo("vgId:%d, %s done, path:%s, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path,
tsdbError("vgId:%d, vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode), pReader->fid);
tstrerror(code)); }
return code; return code;
} }
@ -318,12 +319,14 @@ _exit:
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb; int32_t lino = 0;
STsdb* pTsdb = pReader->pTsdb;
while (true) { while (true) {
if (pReader->pDataFReader == NULL) { if (pReader->pDataFReader == NULL) {
code = tsdbSnapReadOpenFile(pReader); code = tsdbSnapReadOpenFile(pReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (pReader->pDataFReader == NULL) break; if (pReader->pDataFReader == NULL) break;
@ -338,17 +341,17 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
SBlockData* pBlockData = &pReader->bData; SBlockData* pBlockData = &pReader->bData;
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0); code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid); code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbSnapNextRow(pReader); code = tsdbSnapNextRow(pReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pRowInfo = tsdbSnapGetRow(pReader); pRowInfo = tsdbSnapGetRow(pReader);
if (pRowInfo == NULL) { if (pRowInfo == NULL) {
@ -360,21 +363,22 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
code = tsdbSnapCmprData(pReader, ppData); code = tsdbSnapCmprData(pReader, ppData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
break; break;
} }
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code), pTsdb->path);
tsdbError("vgId:%d, vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, }
tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb* pTsdb = pReader->pTsdb; STsdb* pTsdb = pReader->pTsdb;
SDelFile* pDelFile = pReader->fs.pDelFile; SDelFile* pDelFile = pReader->fs.pDelFile;
@ -385,11 +389,11 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
// open // open
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb); code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// read index // read index
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx); code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pReader->iDelIdx = 0; pReader->iDelIdx = 0;
} }
@ -405,7 +409,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iDelIdx++; pReader->iDelIdx++;
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData); code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0; int32_t size = 0;
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) { for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
@ -422,7 +426,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) { if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
@ -449,11 +453,9 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code), pTsdb->path);
_err: }
tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
@ -591,44 +593,39 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
*ppData = NULL; *ppData = NULL;
// read data file // read data file
if (!pReader->dataDone) { if (!pReader->dataDone) {
code = tsdbSnapReadData(pReader, ppData); code = tsdbSnapReadData(pReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->dataDone = 1;
goto _exit;
} else {
pReader->dataDone = 1;
}
} }
} }
// read del file // read del file
if (!pReader->delDone) { if (!pReader->delDone) {
code = tsdbSnapReadDel(pReader, ppData); code = tsdbSnapReadDel(pReader, ppData);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err; if (*ppData) {
goto _exit;
} else { } else {
if (*ppData) { pReader->delDone = 1;
goto _exit;
} else {
pReader->delDone = 1;
}
} }
} }
_exit: _exit:
tsdbDebug("vgId:%d, vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); if (code) {
return code; tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, tstrerror(code),
pReader->pTsdb->path);
_err: } else {
tsdbError("vgId:%d, vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode), tsdbDebug("vgId:%d, %s done, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path);
pReader->pTsdb->path, tstrerror(code)); }
return code; return code;
} }