more snapshot
This commit is contained in:
parent
a4e4f4cc4a
commit
117bc90dad
|
@ -368,6 +368,7 @@ struct SSma {
|
|||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||
|
||||
enum {
|
||||
SNAP_DATA_CFG = 0,
|
||||
SNAP_DATA_META = 1,
|
||||
SNAP_DATA_TSDB = 2,
|
||||
SNAP_DATA_DEL = 3,
|
||||
|
|
|
@ -354,6 +354,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
|
||||
*ppData = NULL;
|
||||
|
||||
#if 0
|
||||
// read data file
|
||||
if (!pReader->dataDone) {
|
||||
code = tsdbSnapReadData(pReader, ppData);
|
||||
|
@ -367,6 +368,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// read del file
|
||||
if (!pReader->delDone) {
|
||||
|
@ -956,6 +958,7 @@ _err:
|
|||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
@ -966,7 +969,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
|
|||
|
||||
// reader
|
||||
if (pDelFile) {
|
||||
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
|
||||
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
|
||||
|
@ -983,51 +986,15 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
|
|||
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
|
||||
|
||||
while (true) {
|
||||
SDelIdx* pDelIdx = NULL;
|
||||
int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
|
||||
SDelData delData;
|
||||
SDelIdx delIdx;
|
||||
int8_t toBreak = 0;
|
||||
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
|
||||
if (tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) >= 0) break;
|
||||
|
||||
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) {
|
||||
pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||
}
|
||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||
|
||||
if (pDelIdx) {
|
||||
int32_t c = tTABLEIDCmprFn(&id, pDelIdx);
|
||||
if (c < 0) {
|
||||
goto _new_del;
|
||||
} else {
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
if (code) goto _err;
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iDelIdx++;
|
||||
if (c == 0) {
|
||||
toBreak = 1;
|
||||
delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
|
||||
goto _merge_del;
|
||||
} else {
|
||||
delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
|
||||
goto _write_del;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_new_del:
|
||||
toBreak = 1;
|
||||
delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
|
||||
taosArrayClear(pWriter->aDelData);
|
||||
|
||||
_merge_del:
|
||||
while (n < nData) {
|
||||
n += tGetDelData(pData + n, &delData);
|
||||
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
_write_del:
|
||||
SDelIdx delIdx = *pDelIdx;
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -1036,7 +1003,40 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (toBreak) break;
|
||||
pWriter->iDelIdx++;
|
||||
}
|
||||
|
||||
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
|
||||
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
|
||||
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
|
||||
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
if (code) goto _err;
|
||||
|
||||
pWriter->iDelIdx++;
|
||||
} else {
|
||||
taosArrayClear(pWriter->aDelData);
|
||||
}
|
||||
|
||||
int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
|
||||
while (n < nData) {
|
||||
SDelData delData;
|
||||
|
||||
n += tGetDelData(pData + n, &delData);
|
||||
|
||||
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
SDelIdx delIdx = {.suid = id.suid, .uid = id.uid};
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -1047,9 +1047,7 @@ _err:
|
|||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
@ -1062,7 +1060,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
|
|||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
if (code) goto _err;
|
||||
|
||||
SDelIdx delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
|
||||
SDelIdx delIdx = *pDelIdx;
|
||||
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
|
@ -1095,11 +1093,9 @@ _err:
|
|||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
int32_t code = 0;
|
||||
STsdbSnapWriter* pWriter = NULL;
|
||||
|
||||
// alloc
|
||||
|
@ -1168,7 +1164,6 @@ _err:
|
|||
tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||
tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
#endif
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1210,10 +1205,10 @@ _err:
|
|||
}
|
||||
|
||||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
int32_t code = 0;
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
|
||||
#if 0
|
||||
// ts data
|
||||
if (pHdr->type == SNAP_DATA_TSDB) {
|
||||
code = tsdbSnapWriteData(pWriter, pData, nData);
|
||||
|
@ -1226,6 +1221,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
|
|||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// del data
|
||||
if (pHdr->type == SNAP_DATA_DEL) {
|
||||
|
@ -1241,6 +1237,5 @@ _exit:
|
|||
_err:
|
||||
tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
|
||||
tstrerror(code));
|
||||
#endif
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue