more work
This commit is contained in:
parent
032339949e
commit
b7e2b7398e
|
@ -183,7 +183,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapReader ========================================
|
// STsdbSnapReader ========================================
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData, int64_t* nData);
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
||||||
// STsdbSnapWriter ========================================
|
// STsdbSnapWriter ========================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t streamType; // sma or other
|
int8_t streamType; // sma or other
|
||||||
|
|
|
@ -21,21 +21,91 @@ struct STsdbSnapReader {
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
// for data file
|
// for data file
|
||||||
|
int8_t dataDone;
|
||||||
|
int32_t fid;
|
||||||
SDataFReader* pDataFReader;
|
SDataFReader* pDataFReader;
|
||||||
|
int32_t iBlockIdx;
|
||||||
|
SArray* aBlockIdx; // SArray<SBlockIdx>
|
||||||
// for del file
|
// for del file
|
||||||
|
int8_t delDone;
|
||||||
SDelFReader* pDelFReader;
|
SDelFReader* pDelFReader;
|
||||||
|
int32_t iDelIdx;
|
||||||
|
SArray* aDelIdx; // SArray<SDelIdx>
|
||||||
|
SArray* aDelData; // SArray<SDelData>
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct STsdbSnapWriter {
|
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
STsdb* pTsdb;
|
int32_t code = 0;
|
||||||
int64_t sver;
|
|
||||||
int64_t ever;
|
if (pReader->pDataFReader == NULL) {
|
||||||
// for data file
|
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, NULL);
|
||||||
int32_t iDFileSet;
|
if (code) goto _err;
|
||||||
SDataFWriter* pDataFWriter;
|
|
||||||
// for del file
|
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
|
||||||
SDelFWriter* pDelFWriter;
|
if (code) goto _err;
|
||||||
} STsdbSnapWriter;
|
|
||||||
|
pReader->iBlockIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STsdb* pTsdb = pReader->pTsdb;
|
||||||
|
SDelFile* pDelFile = pTsdb->fs->nState->pDelFile;
|
||||||
|
|
||||||
|
if (pReader->pDelFReader == NULL) {
|
||||||
|
if (pDelFile == NULL) {
|
||||||
|
code = TSDB_CODE_VND_READ_END;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open
|
||||||
|
code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
pReader->iDelIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (pReader->iDelIdx < taosArrayGetSize(pReader->aDelIdx)) {
|
||||||
|
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);
|
||||||
|
int8_t overlap = 0;
|
||||||
|
|
||||||
|
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
|
||||||
|
SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
|
||||||
|
|
||||||
|
if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
|
||||||
|
// encode the data to sync (todo)
|
||||||
|
overlap = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (overlap) {
|
||||||
|
// prepare the data
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = TSDB_CODE_VND_READ_END;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d snap read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
|
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -60,13 +130,46 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData, int64_t* nData) {
|
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
taosMemoryFree(ppReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// read data file
|
// read data file
|
||||||
|
if (!pReader->dataDone) {
|
||||||
|
code = tsdbSnapReadData(pReader, ppData);
|
||||||
|
if (code) {
|
||||||
|
if (code == TSDB_CODE_VND_READ_END) {
|
||||||
|
pReader->dataDone = 1;
|
||||||
|
} else {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// read del file
|
// read del file
|
||||||
|
if (!pReader->delDone) {
|
||||||
|
code = tsdbSnapReadDel(pReader, ppData);
|
||||||
|
if (code) {
|
||||||
|
if (code == TSDB_CODE_VND_READ_END) {
|
||||||
|
pReader->delDone = 1;
|
||||||
|
} else {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = TSDB_CODE_VND_READ_END;
|
||||||
|
|
||||||
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -74,8 +177,14 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
// STsdbSnapReader ========================================
|
||||||
int32_t code = 0;
|
struct STsdbSnapWriter {
|
||||||
taosMemoryFree(ppReader);
|
STsdb* pTsdb;
|
||||||
return code;
|
int64_t sver;
|
||||||
}
|
int64_t ever;
|
||||||
|
// for data file
|
||||||
|
int32_t iDFileSet;
|
||||||
|
SDataFWriter* pDataFWriter;
|
||||||
|
// for del file
|
||||||
|
SDelFWriter* pDelFWriter;
|
||||||
|
};
|
|
@ -87,7 +87,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pReader->tsdbDone) {
|
if (!pReader->tsdbDone) {
|
||||||
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData, &pReader->nData);
|
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code == TSDB_CODE_VND_READ_END) {
|
if (code == TSDB_CODE_VND_READ_END) {
|
||||||
pReader->tsdbDone = 1;
|
pReader->tsdbDone = 1;
|
||||||
|
|
Loading…
Reference in New Issue