more code

This commit is contained in:
Hongze Cheng 2022-09-04 17:31:20 +08:00
parent fe20dea410
commit 8ec79c9d5c
2 changed files with 72 additions and 147 deletions

View File

@ -593,11 +593,13 @@ struct STsdbReadSnap {
};
typedef struct {
TdFilePtr pFD;
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
int32_t nBuf;
uint8_t *pBuf;
int64_t pgno;
} STsdbFD;
struct SDataFWriter {

View File

@ -18,30 +18,38 @@
#define TSDB_DEFAULT_PAGE_SIZE 4096
// =============== PAGE-WISE FILE ===============
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t opt, STsdbFD **ppFD) {
#define PAGE_CONTENT_SIZE(SIZE) ((SIZE) - sizeof(TSCKSUM))
#define PAGE_OFFSET(PGNO, SIZE) (((PGNO)-1) * (SIZE))
#define OFFSET_PGNO(OFFSET, SIZE) ((OFFSET) / (SIZE) + 1)
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD;
*ppFD = NULL;
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD));
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
if (pFD == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pFD->pFD = taosOpenFile(path, opt);
pFD->path = (char *)&pFD[1];
strcpy(pFD->path, path);
pFD->szPage = szPage;
pFD->flag = flag;
pFD->pFD = taosOpenFile(path, flag);
if (pFD->pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->nBuf = 0;
pFD->pBuf = taosMemoryMalloc(pFD->szPage);
pFD->pBuf = taosMemoryMalloc(szPage);
if (pFD->pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pFD);
goto _exit;
}
*ppFD = pFD;
@ -102,12 +110,15 @@ _exit:
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
int32_t code = 0;
int64_t n = taosLSeekFile(pFD->pFD, pgno * pFD->szPage, SEEK_SET);
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
@ -117,6 +128,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
goto _exit;
}
// check
if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
@ -128,27 +140,33 @@ _exit:
return code;
}
static int64_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) {
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t count) {
int32_t code = 0;
int64_t n;
int64_t pgno = OFFSET_PGNO(offset, pFD->szPage);
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
int64_t pgno = offset / pFD->szPage;
int64_t n = 0;
ASSERT(pgno);
if (pFD->pgno == pgno) {
int64_t bOff = offset % pFD->szPage;
int64_t nRead = TMIN(pFD->szPage - bOff - sizeof(TSCKSUM), count);
memcpy(pBuf + n, pFD->pBuf + bOff, nRead);
int64_t nRead = TMIN(szPgCont - bOff, count);
ASSERT(bOff < szPgCont);
memcpy(pBuf, pFD->pBuf + bOff, nRead);
n = nRead;
pgno++;
}
while (n < count) {
code = tsdbReadFilePage(pFD, pgno);
if (code) goto _exit;
pgno++;
int64_t nRead = TMIN(pFD->szPage - sizeof(TSCKSUM), count - n);
int64_t nRead = TMIN(szPgCont, count - n);
memcpy(pBuf + n, pFD->pBuf, nRead);
n += nRead;
pgno++;
}
_exit:
@ -747,7 +765,7 @@ _err:
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
int32_t code = 0;
if (*ppReader == NULL) goto _exit;
if (*ppReader == NULL) return code;
// head
tsdbCloseFile(&(*ppReader)->pHeadFD);
@ -767,8 +785,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
tFree((*ppReader)->aBuf[iBuf]);
}
taosMemoryFree(*ppReader);
_exit:
*ppReader = NULL;
return code;
@ -778,49 +794,27 @@ _err:
}
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->offset;
int64_t size = pReader->pSet->pHeadF->size - offset;
int64_t n;
uint32_t delimiter;
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->offset;
int64_t size = pReader->pSet->pHeadF->size - offset; // todo
taosArrayClear(aBlockIdx);
if (size == 0) {
goto _exit;
}
if (size == 0) return code;
// alloc
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// // seek
// if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// read
n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
if (code) goto _err;
// decode
n = 0;
n = tGetU32(pReader->aBuf[0] + n, &delimiter);
uint32_t delimiter;
int64_t n = tGetU32(pReader->aBuf[0], &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
while (n < size - sizeof(TSCKSUM)) {
while (n < size) {
SBlockIdx blockIdx;
n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
@ -829,10 +823,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
goto _err;
}
}
ASSERT(n == size);
ASSERT(n + sizeof(TSCKSUM) == size);
_exit:
return code;
_err:
@ -841,65 +833,41 @@ _err:
}
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
int32_t code = 0;
int64_t offset = pReader->pSet->aSstF[iSst]->offset;
int64_t size = pReader->pSet->aSstF[iSst]->size - offset;
int64_t n;
uint32_t delimiter;
int32_t code = 0;
int64_t offset = pReader->pSet->aSstF[iSst]->offset;
int64_t size = pReader->pSet->aSstF[iSst]->size - offset; // todo
taosArrayClear(aSstBlk);
if (size == 0) {
goto _exit;
}
if (size == 0) return code;
// alloc
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// // seek
// if (taosLSeekFile(pReader->aSstFD[iSst], offset, SEEK_SET) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// read
n = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size);
if (code) goto _err;
// decode
n = 0;
n = tGetU32(pReader->aBuf[0] + n, &delimiter);
uint32_t delimiter;
int64_t n = tGetU32(pReader->aBuf[0], &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
while (n < size - sizeof(TSCKSUM)) {
SSstBlk blockl;
n += tGetSstBlk(pReader->aBuf[0] + n, &blockl);
while (n < size) {
SSstBlk sstBlk;
n += tGetSstBlk(pReader->aBuf[0] + n, &sstBlk);
if (taosArrayPush(aSstBlk, &blockl) == NULL) {
if (taosArrayPush(aSstBlk, &sstBlk) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n == size);
ASSERT(n + sizeof(TSCKSUM) == size);
_exit:
return code;
_err:
tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d read sst blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
@ -907,49 +875,26 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size;
int64_t n;
int64_t tn;
// alloc
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// // seek
// if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// read
n = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
if (code) goto _err;
// decode
n = 0;
uint32_t delimiter;
n += tGetU32(pReader->aBuf[0] + n, &delimiter);
int64_t n = tGetU32(pReader->aBuf[0], &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
tn = tGetMapData(pReader->aBuf[0] + n, mBlock);
int64_t tn = tGetMapData(pReader->aBuf[0] + n, mBlock);
if (tn < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
n += tn;
ASSERT(n + sizeof(TSCKSUM) == size);
ASSERT(n + tn == size);
return code;
@ -967,48 +912,26 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol
taosArrayClear(aColumnDataAgg);
// alloc
int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
int32_t size = pSmaInfo->size;
code = tRealloc(&pReader->aBuf[0], size);
if (code) goto _err;
// // seek
// int64_t n = taosLSeekFile(pReader->pSmaFD, pSmaInfo->offset, SEEK_SET);
// if (n < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// } else if (n < pSmaInfo->offset) {
// code = TSDB_CODE_FILE_CORRUPTED;
// goto _err;
// }
// read
int64_t n = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(pReader->aBuf[0], size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], size);
if (code) goto _err;
// decode
n = 0;
int32_t n = 0;
while (n < pSmaInfo->size) {
SColumnDataAgg sma;
n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma);
if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n == pSmaInfo->size);
return code;
_err: