more code
This commit is contained in:
parent
b01ed3bb80
commit
9a95abd26d
|
@ -135,12 +135,17 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||
int metaDecodeEntryImpl(SDecoder *pCoder, SMetaEntry *pME, bool headerOnly) {
|
||||
TAOS_CHECK_RETURN(tStartDecode(pCoder));
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version));
|
||||
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type));
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid));
|
||||
|
||||
if (headerOnly) {
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pME->type > 0) {
|
||||
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name));
|
||||
|
||||
|
@ -209,6 +214,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { return metaDecodeEntryImpl(pCoder, pME, false); }
|
||||
|
||||
static int32_t metaCloneSchema(const SSchemaWrapper *pSrc, SSchemaWrapper *pDst) {
|
||||
if (pSrc == NULL || pDst == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
|
|
@ -266,22 +266,17 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
|||
vnodeGetMetaPath(pVnode, VNODE_META_TMP_DIR, metaTempDir);
|
||||
|
||||
// Check file states
|
||||
if (!taosCheckExistFile(metaDir)) {
|
||||
if (!taosCheckExistFile(metaTempDir)) {
|
||||
metaError("vgId:%d, cannot find meta dir:%s and meta temp dir:%s", TD_VID(pVnode), metaDir, metaTempDir);
|
||||
return TSDB_CODE_FAILED;
|
||||
} else {
|
||||
code = taosRenameFile(metaTempDir, metaDir);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s: rename %s to %s failed", TD_VID(pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(code), metaTempDir, metaDir);
|
||||
return code;
|
||||
}
|
||||
if (!taosCheckExistFile(metaDir) && taosCheckExistFile(metaTempDir)) {
|
||||
code = taosRenameFile(metaTempDir, metaDir);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s: rename %s to %s failed", TD_VID(pVnode), __func__, __FILE__,
|
||||
__LINE__, tstrerror(code), metaTempDir, metaDir);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// Do open meta
|
||||
code = metaOpenImpl(pVnode, ppMeta, metaDir, rollback);
|
||||
code = metaOpenImpl(pVnode, ppMeta, VNODE_META_DIR, rollback);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
|
|
|
@ -21,6 +21,7 @@ struct SMetaSnapReader {
|
|||
int64_t sver;
|
||||
int64_t ever;
|
||||
TBC* pTbc;
|
||||
int32_t iLoop;
|
||||
};
|
||||
|
||||
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
|
||||
|
@ -65,6 +66,22 @@ void metaSnapReaderClose(SMetaSnapReader** ppReader) {
|
|||
}
|
||||
}
|
||||
|
||||
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);
|
||||
|
||||
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (uint8_t*)data, size);
|
||||
|
||||
int32_t code = metaDecodeEntryImpl(&decoder, entry, true);
|
||||
if (code) {
|
||||
tDecoderClear(&decoder);
|
||||
return code;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
const void* pKey = NULL;
|
||||
|
@ -72,19 +89,47 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
|||
int32_t nKey = 0;
|
||||
int32_t nData = 0;
|
||||
STbDbKey key;
|
||||
int32_t c;
|
||||
|
||||
*ppData = NULL;
|
||||
for (;;) {
|
||||
if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
|
||||
while (pReader->iLoop < 2) {
|
||||
if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) != 0 || ((STbDbKey*)pKey)->version > pReader->ever) {
|
||||
pReader->iLoop++;
|
||||
|
||||
// Reopen the cursor to read from the beginning
|
||||
tdbTbcClose(pReader->pTbc);
|
||||
pReader->pTbc = NULL;
|
||||
code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = pReader->sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Decode meta entry
|
||||
SMetaEntry entry = {0};
|
||||
code = metaDecodeEntryHeader((void*)pData, nData, &entry);
|
||||
if (code) {
|
||||
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||
tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
key = ((STbDbKey*)pKey)[0];
|
||||
if (key.version > pReader->ever) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (key.version < pReader->sver) {
|
||||
if (key.version < pReader->sver //
|
||||
|| (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE) // First loop send super table entry
|
||||
|| (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE) // Second loop send non-super table entry
|
||||
) {
|
||||
if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
|
||||
metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue