more code
This commit is contained in:
parent
8ec79c9d5c
commit
dfe82efcd7
|
@ -18,9 +18,12 @@
|
|||
#define TSDB_DEFAULT_PAGE_SIZE 4096
|
||||
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
#define PAGE_CONTENT_SIZE(SIZE) ((SIZE) - sizeof(TSCKSUM))
|
||||
#define PAGE_OFFSET(PGNO, SIZE) (((PGNO)-1) * (SIZE))
|
||||
#define OFFSET_PGNO(OFFSET, SIZE) ((OFFSET) / (SIZE) + 1)
|
||||
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
|
||||
#define LOGIC_TO_FILE_OFFSET(OFFSET, PAGE) \
|
||||
((OFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (OFFSET) % PAGE_CONTENT_SIZE(PAGE))
|
||||
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
|
||||
#define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE))
|
||||
#define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1)
|
||||
|
||||
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
|
||||
int32_t code = 0;
|
||||
|
@ -140,16 +143,17 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) {
|
||||
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
int64_t pgno = OFFSET_PGNO(offset, pFD->szPage);
|
||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
||||
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
|
||||
|
||||
ASSERT(pgno);
|
||||
if (pFD->pgno == pgno) {
|
||||
int64_t bOff = offset % pFD->szPage;
|
||||
int64_t nRead = TMIN(szPgCont - bOff, count);
|
||||
int64_t bOff = fOffset % pFD->szPage;
|
||||
int64_t nRead = TMIN(szPgCont - bOff, size);
|
||||
|
||||
ASSERT(bOff < szPgCont);
|
||||
|
||||
|
@ -158,11 +162,11 @@ static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t
|
|||
pgno++;
|
||||
}
|
||||
|
||||
while (n < count) {
|
||||
while (n < size) {
|
||||
code = tsdbReadFilePage(pFD, pgno);
|
||||
if (code) goto _exit;
|
||||
|
||||
int64_t nRead = TMIN(szPgCont, count - n);
|
||||
int64_t nRead = TMIN(szPgCont, size - n);
|
||||
memcpy(pBuf + n, pFD->pBuf, nRead);
|
||||
|
||||
n += nRead;
|
||||
|
@ -794,9 +798,10 @@ _err:
|
|||
}
|
||||
|
||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pReader->pSet->pHeadF->offset;
|
||||
int64_t size = pReader->pSet->pHeadF->size - offset; // todo
|
||||
int32_t code = 0;
|
||||
SHeadFile *pHeadFile = pReader->pSet->pHeadF;
|
||||
int64_t offset = pHeadFile->offset;
|
||||
int64_t size = pHeadFile->size - offset;
|
||||
|
||||
taosArrayClear(aBlockIdx);
|
||||
if (size == 0) return code;
|
||||
|
@ -810,10 +815,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
|||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
uint32_t delimiter;
|
||||
int64_t n = tGetU32(pReader->aBuf[0], &delimiter);
|
||||
ASSERT(delimiter == TSDB_FILE_DLMT);
|
||||
|
||||
int64_t n = 0;
|
||||
while (n < size) {
|
||||
SBlockIdx blockIdx;
|
||||
n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
|
||||
|
@ -833,9 +835,10 @@ _err:
|
|||
}
|
||||
|
||||
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pReader->pSet->aSstF[iSst]->offset;
|
||||
int64_t size = pReader->pSet->aSstF[iSst]->size - offset; // todo
|
||||
int32_t code = 0;
|
||||
SSstFile *pSstFile = pReader->pSet->aSstF[iSst];
|
||||
int64_t offset = pSstFile->offset;
|
||||
int64_t size = pSstFile->size - offset;
|
||||
|
||||
taosArrayClear(aSstBlk);
|
||||
if (size == 0) return code;
|
||||
|
@ -849,10 +852,7 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
|
|||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
uint32_t delimiter;
|
||||
int64_t n = tGetU32(pReader->aBuf[0], &delimiter);
|
||||
ASSERT(delimiter == TSDB_FILE_DLMT);
|
||||
|
||||
int64_t n = 0;
|
||||
while (n < size) {
|
||||
SSstBlk sstBlk;
|
||||
n += tGetSstBlk(pReader->aBuf[0] + n, &sstBlk);
|
||||
|
@ -885,16 +885,12 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
|
|||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
uint32_t delimiter;
|
||||
int64_t n = tGetU32(pReader->aBuf[0], &delimiter);
|
||||
ASSERT(delimiter == TSDB_FILE_DLMT);
|
||||
|
||||
int64_t tn = tGetMapData(pReader->aBuf[0] + n, mBlock);
|
||||
if (tn < 0) {
|
||||
int64_t n = tGetMapData(pReader->aBuf[0], mBlock);
|
||||
if (n < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
ASSERT(n + tn == size);
|
||||
ASSERT(n == size);
|
||||
|
||||
return code;
|
||||
|
||||
|
@ -912,12 +908,11 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol
|
|||
taosArrayClear(aColumnDataAgg);
|
||||
|
||||
// alloc
|
||||
int32_t size = pSmaInfo->size;
|
||||
code = tRealloc(&pReader->aBuf[0], size);
|
||||
code = tRealloc(&pReader->aBuf[0], pSmaInfo->size);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size);
|
||||
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size);
|
||||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
|
@ -1117,7 +1112,8 @@ int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk
|
|||
int32_t code = 0;
|
||||
|
||||
// read
|
||||
tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock);
|
||||
code = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock);
|
||||
if (code) goto _exit;
|
||||
|
||||
// decmpr
|
||||
code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
|
||||
|
|
Loading…
Reference in New Issue