more vnode snapshot

This commit is contained in:
Hongze Cheng 2022-07-12 12:16:40 +00:00
parent 946f1101fd
commit 1e3fff30e7
3 changed files with 109 additions and 54 deletions

View File

@ -309,6 +309,7 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
struct SSnapDataHdr { struct SSnapDataHdr {
int8_t type; int8_t type;
int64_t index;
int64_t size; int64_t size;
uint8_t data[]; uint8_t data[];
}; };

View File

@ -26,60 +26,72 @@ struct SMetaSnapReader {
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
int32_t c = 0; int32_t c = 0;
SMetaSnapReader* pMetaSnapReader = NULL; SMetaSnapReader* pReader = NULL;
// alloc // alloc
pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader)); pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pMetaSnapReader == NULL) { if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pMetaSnapReader->pMeta = pMeta; pReader->pMeta = pMeta;
pMetaSnapReader->sver = sver; pReader->sver = sver;
pMetaSnapReader->ever = ever; pReader->ever = ever;
// impl // impl
code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL); code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
if (code) { if (code) {
taosMemoryFree(pReader);
goto _err; goto _err;
} }
code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) { if (code) {
taosMemoryFree(pReader);
goto _err; goto _err;
} }
*ppReader = pMetaSnapReader; metaInfo("vgId:%d vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
*ppReader = pReader;
return code; return code;
_err: _err:
metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); metaError("vgId:%d vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) { int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pTbc); tdbTbcClose((*ppReader)->pTbc);
taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
*ppReader = NULL; *ppReader = NULL;
return 0;
return code;
} }
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL; const void* pKey = NULL;
const void* pData = NULL; const void* pData = NULL;
int32_t nKey = 0; int32_t nKey = 0;
int32_t nData = 0; int32_t nData = 0;
int32_t code = 0; STbDbKey key;
*ppData = NULL;
for (;;) { for (;;) {
code = tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData); if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
if (code || ((STbDbKey*)pData)->version > pReader->ever) {
code = TSDB_CODE_VND_READ_END;
goto _exit; goto _exit;
} }
if (((STbDbKey*)pData)->version < pReader->sver) { key = ((STbDbKey*)pKey)[0];
if (key.version > pReader->ever) {
goto _exit;
}
if (key.version < pReader->sver) {
tdbTbcMoveToNext(pReader->pTbc); tdbTbcMoveToNext(pReader->pTbc);
continue; continue;
} }
@ -88,17 +100,28 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
break; break;
} }
// copy the data ASSERT(pData && nData);
if (tRealloc(ppData, sizeof(SSnapDataHdr) + nData) < 0) {
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; goto _err;
} }
((SSnapDataHdr*)(*ppData))->type = 0; // TODO: use macro
((SSnapDataHdr*)(*ppData))->size = nData; SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
memcpy(((SSnapDataHdr*)(*ppData))->data, pData, nData); pHdr->type = 0; // TODO: use macro
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);
metaInfo("vgId:%d vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
_exit: _exit:
return code; return code;
_err:
metaError("vgId:%d vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
return code;
} }
// SMetaSnapWriter ======================================== // SMetaSnapWriter ========================================

View File

@ -20,13 +20,13 @@ struct SVSnapReader {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index;
// meta // meta
int8_t metaDone; int8_t metaDone;
SMetaSnapReader *pMetaReader; SMetaSnapReader *pMetaReader;
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
uint8_t *pData;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
@ -42,12 +42,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapRe
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
code = metaSnapReaderOpen(pVnode->pMeta, sver, ever, &pReader->pMetaReader); vInfo("vgId:%d vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
if (code) goto _err;
code = tsdbSnapReaderOpen(pVnode->pTsdb, sver, ever, &pReader->pTsdbReader);
if (code) goto _err;
*ppReader = pReader; *ppReader = pReader;
return code; return code;
@ -60,50 +55,75 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0; int32_t code = 0;
tFree(pReader->pData); // tFree(pReader->pData);
if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader); // if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader);
if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader); // if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader);
taosMemoryFree(pReader); // taosMemoryFree(pReader);
vInfo("vgId:%d vnode snapshot reader closed", TD_VID(pReader->pVnode));
return code; return code;
} }
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
// META ==============
if (!pReader->metaDone) { if (!pReader->metaDone) {
code = metaSnapRead(pReader->pMetaReader, &pReader->pData); // open reader if not
if (pReader->pMetaReader == NULL) {
code = metaSnapReaderOpen(pReader->pVnode->pMeta, pReader->sver, pReader->ever, &pReader->pMetaReader);
if (code) goto _err;
}
code = metaSnapRead(pReader->pMetaReader, ppData);
if (code) { if (code) {
if (code == TSDB_CODE_VND_READ_END) { goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->metaDone = 1; pReader->metaDone = 1;
} else { code = metaSnapReaderClose(&pReader->pMetaReader);
goto _err; if (code) goto _err;
vInfo("vgId:%d vnode snapshot meta data read end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
} }
} else {
*ppData = pReader->pData;
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
goto _exit;
} }
} }
// TSDB ==============
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData); // open if not
if (code) { // if (pReader->pTsdbReader == NULL) {
if (code == TSDB_CODE_VND_READ_END) { // code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader);
pReader->tsdbDone = 1; // if (code) goto _err;
} else { // }
goto _err;
} // code = tsdbSnapRead(pReader->pTsdbReader, &pReader->pData);
} else { // if (code) {
*ppData = pReader->pData; // if (code == TSDB_CODE_VND_READ_END) {
*nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size; // pReader->tsdbDone = 1;
goto _exit; // } else {
} // goto _err;
// }
// } else {
// *ppData = pReader->pData;
// *nData = sizeof(SSnapDataHdr) + ((SSnapDataHdr *)pReader->pData)->size;
// goto _exit;
// }
} }
code = TSDB_CODE_VND_READ_END; *ppData = NULL;
*nData = 0;
_exit: _exit:
if (*ppData) {
pReader->index++;
((SSnapDataHdr *)(*ppData))->index = pReader->index;
vInfo("vgId:%d vnode snapshot read data, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
} else {
vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
}
return code; return code;
_err: _err:
@ -116,6 +136,7 @@ struct SVSnapWriter {
SVnode *pVnode; SVnode *pVnode;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index;
// meta // meta
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
@ -148,16 +169,22 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
*ppWriter = pWriter;
return code; return code;
_err: _err:
vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot writer open failed since %s", TD_VID(pVnode), tstrerror(code));
*ppWriter = NULL;
return code; return code;
} }
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
goto _exit;
if (rollback) { if (rollback) {
code = vnodeSnapRollback(pWriter); code = vnodeSnapRollback(pWriter);
if (code) goto _err; if (code) goto _err;
@ -166,6 +193,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
if (code) goto _err; if (code) goto _err;
} }
_exit:
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
@ -179,6 +207,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pSnapDataHdr = (SSnapDataHdr *)pData;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
goto _exit;
ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData); ASSERT(pSnapDataHdr->size + sizeof(SSnapDataHdr) == nData);
if (pSnapDataHdr->type == 0) { if (pSnapDataHdr->type == 0) {
@ -201,6 +231,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (code) goto _err; if (code) goto _err;
} }
_exit:
return code; return code;
_err: _err: