more tsdb snapshow writer
This commit is contained in:
parent
4d3e313e30
commit
9720f05491
|
@ -64,6 +64,7 @@ typedef struct SRowIter SRowIter;
|
||||||
typedef struct STsdbFS STsdbFS;
|
typedef struct STsdbFS STsdbFS;
|
||||||
typedef struct SRowMerger SRowMerger;
|
typedef struct SRowMerger SRowMerger;
|
||||||
typedef struct STsdbFSState STsdbFSState;
|
typedef struct STsdbFSState STsdbFSState;
|
||||||
|
typedef struct STsdbSnapHdr STsdbSnapHdr;
|
||||||
|
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
#define TSDB_FHDR_SIZE 512
|
#define TSDB_FHDR_SIZE 512
|
||||||
|
|
|
@ -308,8 +308,11 @@ struct STsdbSnapWriter {
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
|
// config
|
||||||
|
int32_t minutes;
|
||||||
|
int8_t precision;
|
||||||
// for data file
|
// for data file
|
||||||
int32_t iDFileSet;
|
int32_t fid;
|
||||||
SDataFWriter* pDataFWriter;
|
SDataFWriter* pDataFWriter;
|
||||||
// for del file
|
// for del file
|
||||||
SDelFWriter* pDelFWriter;
|
SDelFWriter* pDelFWriter;
|
||||||
|
@ -327,6 +330,47 @@ static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STsdb* pTsdb = pWriter->pTsdb;
|
||||||
|
int64_t suid = 0; // todo
|
||||||
|
int64_t uid = 0; // todo
|
||||||
|
int64_t skey; // todo
|
||||||
|
int64_t ekey; // todo
|
||||||
|
|
||||||
|
int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision);
|
||||||
|
ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision));
|
||||||
|
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
|
||||||
|
if (pWriter->pDataFWriter) {
|
||||||
|
// finish current file and close the SDataFWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
pWriter->fid = fid;
|
||||||
|
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid);
|
||||||
|
SDFileSet wSet = {0};
|
||||||
|
if (pSet == NULL) {
|
||||||
|
wSet = (SDFileSet){0}; // todo
|
||||||
|
} else {
|
||||||
|
wSet = (SDFileSet){0}; // todo
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
|
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbSnapWriter* pWriter = NULL;
|
STsdbSnapWriter* pWriter = NULL;
|
||||||
|
@ -374,6 +418,31 @@ _err:
|
||||||
|
|
||||||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
int8_t type = pData[0];
|
||||||
|
|
||||||
|
// ts data
|
||||||
|
if (type == 0) {
|
||||||
|
code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1);
|
||||||
|
if (code) goto _err;
|
||||||
|
} else {
|
||||||
|
if (pWriter->pDataFWriter) {
|
||||||
|
// commit the remain data of the FSet (todo)
|
||||||
|
|
||||||
|
// close and update the file
|
||||||
|
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// del data
|
||||||
|
if (type == 1) {
|
||||||
|
code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue