more code
This commit is contained in:
parent
f3e199dda2
commit
ded3d6e7c9
|
@ -26,7 +26,9 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData,
|
|||
#define TSDB_STT_FILE_DATA_ITER 2
|
||||
#define TSDB_TOMB_FILE_DATA_ITER 3
|
||||
|
||||
typedef struct STsdbDataIter2 STsdbDataIter2;
|
||||
typedef struct STsdbDataIter2 STsdbDataIter2;
|
||||
typedef struct STsdbFilterInfo STsdbFilterInfo;
|
||||
|
||||
struct STsdbDataIter2 {
|
||||
STsdbDataIter2* next;
|
||||
SRBTreeNode rbtn;
|
||||
|
@ -70,6 +72,13 @@ struct STsdbDataIter2 {
|
|||
};
|
||||
};
|
||||
|
||||
#define TSDB_FILTER_FLAG_BY_VERSION 0x1
|
||||
struct STsdbFilterInfo {
|
||||
int32_t flag;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
};
|
||||
|
||||
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)pNode) - offsetof(STsdbDataIter2, rbtn)))
|
||||
|
||||
/* open */
|
||||
|
@ -251,24 +260,44 @@ static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode*
|
|||
/* seek */
|
||||
|
||||
/* iter next */
|
||||
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter) {
|
||||
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
for (;;) {
|
||||
if (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
|
||||
while (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
|
||||
if (pFilterInfo) {
|
||||
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
|
||||
if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver ||
|
||||
pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) {
|
||||
pIter->dIter.iRow++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pIter->rowInfo.suid = pIter->dIter.bData.suid;
|
||||
pIter->rowInfo.uid = pIter->dIter.bData.uid;
|
||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
|
||||
pIter->dIter.iRow++;
|
||||
break;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
|
||||
while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
|
||||
|
||||
// filter
|
||||
if (pFilterInfo) {
|
||||
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
|
||||
if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) {
|
||||
pIter->dIter.iDataBlk++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
|
@ -278,6 +307,8 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter) {
|
|||
break;
|
||||
}
|
||||
|
||||
if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break;
|
||||
|
||||
for (;;) {
|
||||
if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
|
||||
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
|
||||
|
@ -304,31 +335,52 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter) {
|
||||
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
for (;;) {
|
||||
if (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
|
||||
while (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
|
||||
if (pFilterInfo) {
|
||||
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
|
||||
if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] ||
|
||||
pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) {
|
||||
pIter->sIter.iRow++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pIter->rowInfo.suid = pIter->sIter.bData.suid;
|
||||
pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
|
||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
|
||||
pIter->sIter.iRow++;
|
||||
break;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
|
||||
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
|
||||
for (;;) {
|
||||
if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
|
||||
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
|
||||
|
||||
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (pFilterInfo) {
|
||||
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
|
||||
if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) {
|
||||
pIter->sIter.iSttBlk++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pIter->sIter.iSttBlk++;
|
||||
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pIter->sIter.iRow = 0;
|
||||
} else {
|
||||
pIter->rowInfo = (SRowInfo){0};
|
||||
break;
|
||||
pIter->sIter.iRow = 0;
|
||||
pIter->sIter.iSttBlk++;
|
||||
break;
|
||||
} else {
|
||||
pIter->rowInfo = (SRowInfo){0};
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -339,16 +391,16 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter) {
|
||||
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
|
||||
ASSERT(0);
|
||||
return code;
|
||||
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
|
||||
return tsdbDataFileDataIterNext(pIter);
|
||||
return tsdbDataFileDataIterNext(pIter, pFilterInfo);
|
||||
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
|
||||
return tsdbSttFileDataIterNext(pIter);
|
||||
return tsdbSttFileDataIterNext(pIter, pFilterInfo);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return code;
|
||||
|
@ -366,6 +418,7 @@ struct STsdbSnapReader {
|
|||
uint8_t* aBuf[5];
|
||||
|
||||
STsdbFS fs;
|
||||
TABLEID tbid;
|
||||
SSkmInfo skmTable;
|
||||
|
||||
// timeseries data
|
||||
|
@ -382,224 +435,151 @@ struct STsdbSnapReader {
|
|||
int8_t delDone;
|
||||
SDelFReader* pDelFReader;
|
||||
STsdbDataIter2* pTIter;
|
||||
SArray* aDelData;
|
||||
};
|
||||
|
||||
static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
|
||||
SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n));
|
||||
SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n));
|
||||
|
||||
return tRowInfoCmprFn(&pIter1->rInfo, &pIter2->rInfo);
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
|
||||
static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SDFileSet dFileSet = {.fid = pReader->fid};
|
||||
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
|
||||
if (pSet == NULL) return code;
|
||||
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
|
||||
if (pSet == NULL) {
|
||||
pReader->fid = INT32_MAX;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pReader->fid = pSet->fid;
|
||||
|
||||
tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn);
|
||||
|
||||
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pReader->pIter = NULL;
|
||||
tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn);
|
||||
|
||||
// .data file
|
||||
SFDataIter* pIter = &pReader->aFDataIter[0];
|
||||
pIter->type = SNAP_DATA_FILE_ITER;
|
||||
|
||||
code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
|
||||
code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
|
||||
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
|
||||
|
||||
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
if (pReader->pIter) {
|
||||
// iter to next with filter info (sver, ever)
|
||||
code = tsdbDataIterNext2(pReader->pIter,
|
||||
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, // flag
|
||||
.sver = pReader->sver,
|
||||
.ever = pReader->ever});
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
|
||||
if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
|
||||
// add to rbtree
|
||||
tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
|
||||
|
||||
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
|
||||
ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);
|
||||
|
||||
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.suid = pIter->pBlockIdx->suid;
|
||||
pIter->rInfo.uid = pIter->pBlockIdx->uid;
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _add_iter_and_break;
|
||||
}
|
||||
}
|
||||
// add to iterList
|
||||
pReader->pIter->next = pReader->iterList;
|
||||
pReader->iterList = pReader->pIter;
|
||||
} else {
|
||||
tsdbCloseDataIter2(pReader->pIter);
|
||||
}
|
||||
|
||||
continue;
|
||||
|
||||
_add_iter_and_break:
|
||||
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
|
||||
break;
|
||||
}
|
||||
|
||||
// .stt file
|
||||
pIter = &pReader->aFDataIter[1];
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
pIter->type = SNAP_STT_FILE_ITER;
|
||||
pIter->iStt = iStt;
|
||||
|
||||
code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
|
||||
code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
|
||||
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||
|
||||
if (pSttBlk->minVer > pReader->ever) continue;
|
||||
if (pSttBlk->maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
|
||||
if (pReader->pIter) {
|
||||
// iter to valid row
|
||||
code = tsdbDataIterNext2(pReader->pIter,
|
||||
&(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, // flag
|
||||
.sver = pReader->sver,
|
||||
.ever = pReader->ever});
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
|
||||
// add to rbtree
|
||||
tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
|
||||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.suid = pIter->bData.suid;
|
||||
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _add_iter;
|
||||
}
|
||||
// add to iterList
|
||||
pReader->pIter->next = pReader->iterList;
|
||||
pReader->iterList = pReader->pIter;
|
||||
} else {
|
||||
tsdbCloseDataIter2(pReader->pIter);
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
|
||||
_add_iter:
|
||||
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
|
||||
pIter++;
|
||||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed since %s", TD_VID(pReader->pTsdb->pVnode), __func__, tstrerror(code));
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d, %s done, path:%s, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path,
|
||||
pReader->fid);
|
||||
tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
||||
static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) {
|
||||
while (pReader->iterList) {
|
||||
STsdbDataIter2* pIter = pReader->iterList;
|
||||
pReader->iterList = pIter->next;
|
||||
tsdbCloseDataIter2(pIter);
|
||||
}
|
||||
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pReader->pIter) {
|
||||
SFDataIter* pIter = NULL;
|
||||
while (true) {
|
||||
_find_row:
|
||||
pIter = pReader->pIter;
|
||||
for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
code = tsdbDataIterNext2(pReader->pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, // flag
|
||||
.sver = pReader->sver,
|
||||
.ever = pReader->ever});
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.suid = pIter->bData.suid;
|
||||
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _out;
|
||||
if (pReader->pIter->rowInfo.suid == 0 && pReader->pIter->rowInfo.uid == 0) {
|
||||
pReader->pIter = NULL;
|
||||
} else {
|
||||
SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
|
||||
if (pNode) {
|
||||
int32_t c = tsdbDataIterCmprFn(&pReader->pIter->rbtn, pNode);
|
||||
if (c > 0) {
|
||||
tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
|
||||
pReader->pIter = NULL;
|
||||
} else if (c == 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->type == SNAP_DATA_FILE_ITER) {
|
||||
while (true) {
|
||||
for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
|
||||
SDataBlk dataBlk;
|
||||
tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
|
||||
|
||||
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pIter->iRow = -1;
|
||||
goto _find_row;
|
||||
}
|
||||
|
||||
pIter->iBlockIdx++;
|
||||
if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;
|
||||
|
||||
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
|
||||
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
if (code) goto _err;
|
||||
pIter->iBlock = -1;
|
||||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
break;
|
||||
} else if (pIter->type == SNAP_STT_FILE_ITER) {
|
||||
for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
|
||||
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||
|
||||
if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadSttBlockEx(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pIter->iRow = -1;
|
||||
goto _find_row;
|
||||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
break;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
_out:
|
||||
pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
|
||||
if (pReader->pIter && pIter) {
|
||||
int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo);
|
||||
if (c > 0) {
|
||||
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
|
||||
pReader->pIter = NULL;
|
||||
} else {
|
||||
ASSERT(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pReader->pIter == NULL) {
|
||||
pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
|
||||
if (pReader->pIter) {
|
||||
tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
|
||||
SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
|
||||
if (pNode) {
|
||||
tRBTreeDrop(&pReader->rbt, pNode);
|
||||
pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
if (ppRowInfo) {
|
||||
if (pReader->pIter) {
|
||||
*ppRowInfo = &pReader->pIter->rowInfo;
|
||||
} else {
|
||||
*ppRowInfo = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_err:
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) {
|
||||
static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
|
||||
if (pReader->pIter) {
|
||||
return &pReader->pIter->rInfo;
|
||||
} else {
|
||||
tsdbSnapNextRow(pReader);
|
||||
|
||||
if (pReader->pIter) {
|
||||
return &pReader->pIter->rInfo;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
*ppRowInfo = &pReader->pIter->rowInfo;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return tsdbSnapReadNextRow(pReader, ppRowInfo);
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
|
@ -641,138 +621,190 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** pp
|
|||
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
|
||||
while (true) {
|
||||
tBlockDataClear(&pReader->bData);
|
||||
|
||||
for (;;) {
|
||||
// start a new file read if need
|
||||
if (pReader->pDataFReader == NULL) {
|
||||
code = tsdbSnapReadOpenFile(pReader);
|
||||
code = tsdbSnapReadFileDataStart(pReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (pReader->pDataFReader == NULL) break;
|
||||
|
||||
SRowInfo* pRowInfo = tsdbSnapGetRow(pReader);
|
||||
SRowInfo* pRowInfo;
|
||||
code = tsdbSnapReadGetRow(pReader, &pRowInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pRowInfo == NULL) {
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
tsdbSnapReadFileDataEnd(pReader);
|
||||
continue;
|
||||
}
|
||||
|
||||
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
|
||||
SBlockData* pBlockData = &pReader->bData;
|
||||
|
||||
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
|
||||
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRowInfo->suid, pRowInfo->uid, &pReader->skmTable);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0);
|
||||
code = tBlockDataInit(&pReader->bData, (TABLEID*)pRowInfo, pReader->skmTable.pTSchema, NULL, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
|
||||
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
do {
|
||||
if (!TABLE_SAME_SCHEMA(pReader->bData.suid, pReader->bData.uid, pRowInfo->suid, pRowInfo->uid)) break;
|
||||
|
||||
code = tsdbSnapNextRow(pReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (pReader->bData.uid && pReader->bData.uid != pRowInfo->uid) {
|
||||
code = tRealloc((uint8_t**)&pReader->bData.aUid, sizeof(int64_t) * (pReader->bData.nRow + 1));
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pRowInfo = tsdbSnapGetRow(pReader);
|
||||
if (pRowInfo == NULL) {
|
||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||
break;
|
||||
for (int32_t iRow = 0; iRow < pReader->bData.nRow; ++iRow) {
|
||||
pReader->bData.aUid[iRow] = pReader->bData.uid;
|
||||
}
|
||||
pReader->bData.uid = 0;
|
||||
}
|
||||
|
||||
if (pBlockData->nRow >= 4096) break;
|
||||
}
|
||||
code = tBlockDataAppendRow(&pReader->bData, &pRowInfo->row, NULL, pRowInfo->uid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSnapCmprData(pReader, ppData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
code = tsdbSnapReadNextRow(pReader, &pRowInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pReader->bData.nRow >= 4096) break;
|
||||
} while (pRowInfo);
|
||||
|
||||
ASSERT(pReader->bData.nRow > 0);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (pReader->bData.nRow > 0) {
|
||||
code = tsdbSnapCmprData(pReader, ppData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code), pTsdb->path);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadTombstoneData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
SDelFile* pDelFile = pReader->fs.pDelFile;
|
||||
|
||||
if (pReader->pDelFReader == NULL) {
|
||||
if (pDelFile == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// open
|
||||
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// read index
|
||||
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pReader->iDelIdx = 0;
|
||||
int64_t size = sizeof(TABLEID);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
|
||||
size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, iDelData));
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) {
|
||||
tsdbDelFReaderClose(&pReader->pDelFReader);
|
||||
break;
|
||||
}
|
||||
|
||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
|
||||
|
||||
pReader->iDelIdx++;
|
||||
|
||||
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
|
||||
uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
|
||||
if (pData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int32_t size = 0;
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
|
||||
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
pHdr->type = SNAP_DATA_DEL;
|
||||
pHdr->size = size;
|
||||
|
||||
if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
|
||||
size += tPutDelData(NULL, pDelData);
|
||||
}
|
||||
}
|
||||
if (size == 0) continue;
|
||||
TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr));
|
||||
*pId = pReader->tbid;
|
||||
|
||||
// org data
|
||||
size = sizeof(TABLEID) + size;
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||
pHdr->type = SNAP_DATA_DEL;
|
||||
pHdr->size = size;
|
||||
|
||||
TABLEID* pId = (TABLEID*)(&pHdr[1]);
|
||||
pId->suid = pDelIdx->suid;
|
||||
pId->uid = pDelIdx->uid;
|
||||
int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
|
||||
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
|
||||
|
||||
if (pDelData->version < pReader->sver) continue;
|
||||
if (pDelData->version > pReader->ever) continue;
|
||||
|
||||
n += tPutDelData((*ppData) + n, pDelData);
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d",
|
||||
TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);
|
||||
|
||||
break;
|
||||
size = sizeof(SSnapDataHdr) + sizeof(TABLEID);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
|
||||
size += tPutDelData(pData + size, taosArrayGet(pReader->aDelData, iDelData));
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code), pTsdb->path);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
if (pData) {
|
||||
taosMemoryFree(pData);
|
||||
pData = NULL;
|
||||
}
|
||||
}
|
||||
*ppData = pData;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadGetTombData(STsdbSnapReader* pReader, void* pDelInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
ASSERT(0);
|
||||
// TODO
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, void* pDelInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
// TODO
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
|
||||
if (pReader->pDelFReader == NULL) {
|
||||
if (pReader->fs.pDelFile == NULL) goto _exit;
|
||||
|
||||
// open
|
||||
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
struct {
|
||||
int64_t suid;
|
||||
int64_t uid;
|
||||
SDelData dData;
|
||||
}* pDelInfo;
|
||||
code = tsdbSnapReadGetTombData(pReader, &pDelInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pDelInfo == NULL) goto _exit;
|
||||
|
||||
pReader->tbid = *(TABLEID*)pDelInfo;
|
||||
|
||||
if (pReader->aDelData) {
|
||||
taosArrayClear(pReader->aDelData);
|
||||
} else if ((pReader->aDelData = taosArrayInit(16, sizeof(SDelData))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) {
|
||||
if (taosArrayPush(pReader->aDelData, &pDelInfo->dData) < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbSnapReadNextTombData(pReader, &pDelInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pReader->aDelData) > 0) {
|
||||
code = tsdbSnapCmprTombData(pReader, ppData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -887,7 +919,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
|
||||
// read del file
|
||||
if (!pReader->delDone) {
|
||||
code = tsdbSnapReadTombstoneData(pReader, ppData);
|
||||
code = tsdbSnapReadTombData(pReader, ppData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
|
@ -1218,7 +1250,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid)
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pWriter->pSIter) {
|
||||
code = tsdbSttFileDataIterNext(pWriter->pSIter);
|
||||
code = tsdbSttFileDataIterNext(pWriter->pSIter, NULL);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// add to tree
|
||||
|
@ -1312,7 +1344,7 @@ static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowIn
|
|||
int32_t lino = 0;
|
||||
|
||||
if (pWriter->pSIter) {
|
||||
code = tsdbDataIterNext2(pWriter->pSIter);
|
||||
code = tsdbDataIterNext2(pWriter->pSIter, NULL);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pWriter->pSIter->rowInfo.suid == 0 && pWriter->pSIter->rowInfo.uid == 0) {
|
||||
|
|
Loading…
Reference in New Issue