Merge branch 'refact/tsdb_last' of github.com:taosdata/tdengine into refact/tsdb_last
This commit is contained in:
commit
0d84bd2d40
|
@ -502,6 +502,7 @@ struct SDelIdx {
|
||||||
|
|
||||||
struct SDiskDataHdr {
|
struct SDiskDataHdr {
|
||||||
uint32_t delimiter;
|
uint32_t delimiter;
|
||||||
|
uint32_t fmtVer;
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int32_t szUid;
|
int32_t szUid;
|
||||||
|
|
|
@ -1426,6 +1426,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
|
|
||||||
// ================= DATA ====================
|
// ================= DATA ====================
|
||||||
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
|
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
|
||||||
|
.fmtVer = 0,
|
||||||
.suid = pBlockData->suid,
|
.suid = pBlockData->suid,
|
||||||
.uid = pBlockData->uid,
|
.uid = pBlockData->uid,
|
||||||
.nRow = pBlockData->nRow,
|
.nRow = pBlockData->nRow,
|
||||||
|
|
|
@ -27,8 +27,10 @@ struct STsdbSnapReader {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
SDataFReader* pDataFReader;
|
SDataFReader* pDataFReader;
|
||||||
SArray* aBlockIdx; // SArray<SBlockIdx>
|
SArray* aBlockIdx; // SArray<SBlockIdx>
|
||||||
|
SArray* aBlockL; // SArray<SBlockL>
|
||||||
int32_t iBlockIdx;
|
int32_t iBlockIdx;
|
||||||
SBlockIdx* pBlockIdx;
|
SBlockIdx* pBlockIdx;
|
||||||
|
int32_t iBlockL;
|
||||||
SMapData mBlock; // SMapData<SBlock>
|
SMapData mBlock; // SMapData<SBlock>
|
||||||
int32_t iBlock;
|
int32_t iBlock;
|
||||||
SBlockData oBlockData;
|
SBlockData oBlockData;
|
||||||
|
@ -43,32 +45,37 @@ struct STsdbSnapReader {
|
||||||
|
|
||||||
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
#if 0
|
|
||||||
STsdb* pTsdb = pReader->pTsdb;
|
STsdb* pTsdb = pReader->pTsdb;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (pReader->pDataFReader == NULL) {
|
if (pReader->pDataFReader == NULL) {
|
||||||
SDFileSet* pSet =
|
// next
|
||||||
taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
|
SDFileSet dFileSet = {.fid = pReader->fid};
|
||||||
|
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
|
||||||
if (pSet == NULL) goto _exit;
|
if (pSet == NULL) goto _exit;
|
||||||
|
|
||||||
pReader->fid = pSet->fid;
|
pReader->fid = pSet->fid;
|
||||||
|
|
||||||
|
// load
|
||||||
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
|
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// SBlockIdx
|
|
||||||
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
|
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tsdbReadBlockL(pReader->pDataFReader, pReader->aBlockL);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// init
|
||||||
pReader->iBlockIdx = 0;
|
pReader->iBlockIdx = 0;
|
||||||
pReader->pBlockIdx = NULL;
|
pReader->pBlockIdx = NULL;
|
||||||
|
pReader->iBlockL = 0;
|
||||||
|
|
||||||
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
|
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
pReader->fid);
|
pReader->fid);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
#if 0
|
||||||
if (pReader->pBlockIdx == NULL) {
|
if (pReader->pBlockIdx == NULL) {
|
||||||
if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
|
if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
|
||||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||||
|
@ -149,14 +156,15 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
|
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
|
||||||
|
|
||||||
tsdbInfo("vgId:%d, vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
|
// tsdbInfo("vgId:%d, vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
|
||||||
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
|
// " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
|
||||||
TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
|
// TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
|
||||||
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
|
// pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
|
||||||
size);
|
// size);
|
||||||
|
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,7 +174,6 @@ _exit:
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d, vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
tsdbError("vgId:%d, vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
#endif
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,6 +301,11 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
pReader->aBlockL = taosArrayInit(0, sizeof(SBlockL));
|
||||||
|
if (pReader->aBlockL == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
pReader->mBlock = tMapDataInit();
|
pReader->mBlock = tMapDataInit();
|
||||||
code = tBlockDataCreate(&pReader->oBlockData);
|
code = tBlockDataCreate(&pReader->oBlockData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -329,6 +341,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
|
||||||
if (pReader->pDataFReader) {
|
if (pReader->pDataFReader) {
|
||||||
tsdbDataFReaderClose(&pReader->pDataFReader);
|
tsdbDataFReaderClose(&pReader->pDataFReader);
|
||||||
}
|
}
|
||||||
|
taosArrayDestroy(pReader->aBlockL);
|
||||||
taosArrayDestroy(pReader->aBlockIdx);
|
taosArrayDestroy(pReader->aBlockIdx);
|
||||||
tMapDataClear(&pReader->mBlock);
|
tMapDataClear(&pReader->mBlock);
|
||||||
tBlockDataDestroy(&pReader->oBlockData, 1);
|
tBlockDataDestroy(&pReader->oBlockData, 1);
|
||||||
|
|
|
@ -1549,6 +1549,7 @@ int32_t tPutDiskDataHdr(uint8_t *p, void *ph) {
|
||||||
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
|
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
|
||||||
|
|
||||||
n += tPutU32(p ? p + n : p, pHdr->delimiter);
|
n += tPutU32(p ? p + n : p, pHdr->delimiter);
|
||||||
|
n += tPutU32v(p ? p + n : p, pHdr->fmtVer);
|
||||||
n += tPutI64(p ? p + n : p, pHdr->suid);
|
n += tPutI64(p ? p + n : p, pHdr->suid);
|
||||||
n += tPutI64(p ? p + n : p, pHdr->uid);
|
n += tPutI64(p ? p + n : p, pHdr->uid);
|
||||||
n += tPutI32v(p ? p + n : p, pHdr->szUid);
|
n += tPutI32v(p ? p + n : p, pHdr->szUid);
|
||||||
|
@ -1566,6 +1567,7 @@ int32_t tGetDiskDataHdr(uint8_t *p, void *ph) {
|
||||||
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
|
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
|
||||||
|
|
||||||
n += tGetU32(p + n, &pHdr->delimiter);
|
n += tGetU32(p + n, &pHdr->delimiter);
|
||||||
|
n += tGetU32v(p + n, &pHdr->fmtVer);
|
||||||
n += tGetI64(p + n, &pHdr->suid);
|
n += tGetI64(p + n, &pHdr->suid);
|
||||||
n += tGetI64(p + n, &pHdr->uid);
|
n += tGetI64(p + n, &pHdr->uid);
|
||||||
n += tGetI32v(p + n, &pHdr->szUid);
|
n += tGetI32v(p + n, &pHdr->szUid);
|
||||||
|
|
Loading…
Reference in New Issue