more vnode snapshot
This commit is contained in:
parent
6beb221c82
commit
338a763853
|
@ -202,7 +202,7 @@ int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile);
|
||||||
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
|
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
|
||||||
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
|
void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid);
|
||||||
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
|
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
|
||||||
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
|
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag);
|
||||||
// tsdbReaderWriter.c ==============================================================================================
|
// tsdbReaderWriter.c ==============================================================================================
|
||||||
// SDataFWriter
|
// SDataFWriter
|
||||||
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
|
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
|
||||||
|
|
|
@ -263,7 +263,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
taosArrayClear(pCommitter->aBlockIdx);
|
taosArrayClear(pCommitter->aBlockIdx);
|
||||||
tMapDataReset(&pCommitter->oBlockMap);
|
tMapDataReset(&pCommitter->oBlockMap);
|
||||||
tBlockDataReset(&pCommitter->oBlockData);
|
tBlockDataReset(&pCommitter->oBlockData);
|
||||||
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
|
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ);
|
||||||
if (pRSet) {
|
if (pRSet) {
|
||||||
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
|
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
|
@ -698,6 +698,6 @@ void tsdbFSStateDeleteDFileSet(STsdbFSState *pState, int32_t fid) {
|
||||||
|
|
||||||
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
|
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
|
||||||
|
|
||||||
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) {
|
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid, int32_t flag) {
|
||||||
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
|
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, flag);
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,10 +40,12 @@ struct STsdbSnapReader {
|
||||||
|
|
||||||
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
STsdb* pTsdb = pReader->pTsdb;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (pReader->pDataFReader == NULL) {
|
if (pReader->pDataFReader == NULL) {
|
||||||
SDFileSet* pSet = NULL;
|
SDFileSet* pSet = NULL;
|
||||||
|
// taosArraySearch(pTsdb->fs->cState->aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSe)
|
||||||
|
|
||||||
// search the next data file set to read (todo)
|
// search the next data file set to read (todo)
|
||||||
if (0 /* TODO */) {
|
if (0 /* TODO */) {
|
||||||
|
@ -106,10 +108,13 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
// if (*ppData) {
|
||||||
|
// tsdbInfo("vgId:%d ");
|
||||||
|
// }
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d snap read data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,14 +214,13 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
|
||||||
|
pReader->fid = INT32_MIN;
|
||||||
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
||||||
if (pReader->aBlockIdx == NULL) {
|
if (pReader->aBlockIdx == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->mBlock = tMapDataInit();
|
pReader->mBlock = tMapDataInit();
|
||||||
|
|
||||||
code = tBlockDataInit(&pReader->blkData);
|
code = tBlockDataInit(&pReader->blkData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
@ -225,7 +229,6 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
|
pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
|
||||||
if (pReader->aDelData == NULL) {
|
if (pReader->aDelData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -246,37 +249,42 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdbSnapReader* pReader = *ppReader;
|
STsdbSnapReader* pReader = *ppReader;
|
||||||
|
|
||||||
taosArrayDestroy(pReader->aDelData);
|
|
||||||
taosArrayDestroy(pReader->aDelIdx);
|
|
||||||
if (pReader->pDelFReader) {
|
|
||||||
tsdbDelFReaderClose(&pReader->pDelFReader);
|
|
||||||
}
|
|
||||||
tBlockDataClear(&pReader->blkData);
|
|
||||||
tMapDataClear(&pReader->mBlock);
|
|
||||||
taosArrayDestroy(pReader->aBlockIdx);
|
|
||||||
if (pReader->pDataFReader) {
|
if (pReader->pDataFReader) {
|
||||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||||
}
|
}
|
||||||
|
taosArrayDestroy(pReader->aBlockIdx);
|
||||||
|
tMapDataClear(&pReader->mBlock);
|
||||||
|
tBlockDataClear(&pReader->blkData);
|
||||||
|
|
||||||
|
if (pReader->pDelFReader) {
|
||||||
|
tsdbDelFReaderClose(&pReader->pDelFReader);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pReader->aDelIdx);
|
||||||
|
taosArrayDestroy(pReader->aDelData);
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode));
|
||||||
|
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
*ppData = NULL;
|
||||||
|
|
||||||
// read data file
|
// read data file
|
||||||
if (!pReader->dataDone) {
|
if (!pReader->dataDone) {
|
||||||
code = tsdbSnapReadData(pReader, ppData);
|
code = tsdbSnapReadData(pReader, ppData);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code == TSDB_CODE_VND_READ_END) {
|
|
||||||
pReader->dataDone = 1;
|
|
||||||
} else {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
if (*ppData) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
} else {
|
||||||
|
pReader->dataDone = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,23 +292,24 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
if (!pReader->delDone) {
|
if (!pReader->delDone) {
|
||||||
code = tsdbSnapReadDel(pReader, ppData);
|
code = tsdbSnapReadDel(pReader, ppData);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code == TSDB_CODE_VND_READ_END) {
|
|
||||||
pReader->delDone = 1;
|
|
||||||
} else {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
if (*ppData) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
} else {
|
||||||
|
pReader->delDone = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = TSDB_CODE_VND_READ_END;
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (*ppData) {
|
||||||
|
} else {
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d snapshot read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -520,7 +529,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pWriter->fid = fid;
|
pWriter->fid = fid;
|
||||||
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid);
|
SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ);
|
||||||
// reader
|
// reader
|
||||||
if (pSet) {
|
if (pSet) {
|
||||||
// open
|
// open
|
||||||
|
|
|
@ -125,7 +125,8 @@ _exit:
|
||||||
pReader->index++;
|
pReader->index++;
|
||||||
*nData = sizeof(SSnapDataHdr) + pHdr->size;
|
*nData = sizeof(SSnapDataHdr) + pHdr->size;
|
||||||
pHdr->index = pReader->index;
|
pHdr->index = pReader->index;
|
||||||
vInfo("vgId:%d vnode snapshot read data, nData:%d index:%" PRId64, TD_VID(pReader->pVnode), *nData, pReader->index);
|
vInfo("vgId:%d vnode snapshot read data,index:%" PRId64 " type:%d nData:%d ", TD_VID(pReader->pVnode),
|
||||||
|
pReader->index, pHdr->type, *nData);
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
|
vInfo("vgId:%d vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index);
|
||||||
}
|
}
|
||||||
|
@ -236,8 +237,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
vInfo("vgId:%d vnode snapshot write data, type:%d index:%" PRId64 " nData:%d", TD_VID(pVnode), pHdr->type,
|
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->type, nData,
|
||||||
pHdr->index, nData);
|
pHdr->index);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
Loading…
Reference in New Issue