more refact

This commit is contained in:
Hongze Cheng 2022-08-07 15:20:28 +00:00
parent 71691eb7da
commit 7671bfe19e
3 changed files with 67 additions and 49 deletions

View File

@ -188,6 +188,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
uint8_t **ppBuf); uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf); uint8_t **ppBuf);
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);

View File

@ -401,6 +401,7 @@ struct SDataFReader {
uint8_t *pBuf1; uint8_t *pBuf1;
uint8_t *pBuf2; uint8_t *pBuf2;
uint8_t *pBuf3;
}; };
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) { int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
@ -485,6 +486,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
tFree((*ppReader)->pBuf1); tFree((*ppReader)->pBuf1);
tFree((*ppReader)->pBuf2); tFree((*ppReader)->pBuf2);
tFree((*ppReader)->pBuf3);
taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
_exit: _exit:
@ -729,94 +731,75 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
int32_t nColId, SBlockData *pBlockData) { int32_t nColId, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
// TODO
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD;
// seek // uid + version + tskey
int64_t n = taosLSeekFile(pFD, pBlkInfo->offset, SEEK_SET); code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
code = tRealloc(&pReader->pBuf1, pBlkInfo->szBlock);
if (code) goto _err; if (code) goto _err;
n = taosReadFile(pFD, pReader->pBuf1, pBlkInfo->szBlock);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pBlkInfo->szBlock) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
uint8_t *p = pReader->pBuf1;
// check & decode
SDiskDataHdr hdr; SDiskDataHdr hdr;
if (!taosCheckChecksumWhole(p, pBlkInfo->szKey)) { uint8_t *p = pReader->pBuf1 + tGetDiskDataHdr(pReader->pBuf1, &hdr);
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p += tGetDiskDataHdr(p, &hdr);
tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid); ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
ASSERT(hdr.suid || hdr.uid);
pBlockData->suid = hdr.suid;
pBlockData->uid = hdr.uid;
pBlockData->nRow = hdr.nRow; pBlockData->nRow = hdr.nRow;
// uid
if (hdr.uid == 0) { if (hdr.uid == 0) {
ASSERT(hdr.szUid); ASSERT(hdr.szUid);
code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid, code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
sizeof(int64_t) * hdr.nRow, &pReader->pBuf2); sizeof(int64_t) * hdr.nRow, &pReader->pBuf2);
if (code) goto _err; if (code) goto _err;
} else { } else {
ASSERT(hdr.szUid == 0); ASSERT(!hdr.szUid);
} }
p += hdr.szUid; p += hdr.szUid;
// version
code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion, code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
sizeof(int64_t) * hdr.nRow, &pReader->pBuf2); sizeof(int64_t) * hdr.nRow, &pReader->pBuf2);
if (code) goto _err; if (code) goto _err;
p += hdr.szVer; p += hdr.szVer;
code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, // TSKEY
code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
sizeof(TSKEY) * hdr.nRow, &pReader->pBuf2); sizeof(TSKEY) * hdr.nRow, &pReader->pBuf2);
if (code) goto _err; if (code) goto _err;
p += hdr.szKey; p += hdr.szKey;
p += sizeof(TSCKSUM);
// SBlockCol ASSERT(p - pReader->pBuf1 == sizeof(TSCKSUM));
// read and decode columns
if (hdr.szBlkCol > 0) { if (hdr.szBlkCol > 0) {
if (!taosCheckChecksumWhole(p, hdr.szBlkCol + sizeof(TSCKSUM))) { code =
code = TSDB_CODE_FILE_CORRUPTED; tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1, hdr.szBlkCol + sizeof(TSCKSUM));
goto _err; if (code) goto _err;
}
int32_t iColData = 0; int32_t n = 0;
uint8_t *pt = p + hdr.szBlkCol + sizeof(TSCKSUM);
n = 0;
while (n < hdr.szBlkCol) { while (n < hdr.szBlkCol) {
SBlockCol blockCol; SBlockCol blockCol;
n += tGetBlockCol(p + n, &blockCol); n += tGetBlockCol(pReader->pBuf1 + n, &blockCol);
ASSERT(blockCol.flag && blockCol.flag != HAS_NONE); ASSERT(blockCol.flag && blockCol.flag != HAS_NONE);
SColData *pColData; // TODO: merge with the column IDs
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _err;
iColData++;
tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn); SColData *pColData = NULL; // (todo)
if (blockCol.flag == HAS_NULL) { if (blockCol.flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { // TODO: make a hdr.nRow COL_VAL_NULL();
code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type));
if (code) goto _err;
}
} else { } else {
code = tsdbDecmprColData(pt + blockCol.offset, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf2); code = tsdbReadAndCheckFile(
pFD, pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &pReader->pBuf2,
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM));
code = tsdbDecmprColData(pReader->pBuf2, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf3);
if (code) goto _err; if (code) goto _err;
} }
} }

View File

@ -1865,6 +1865,40 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
} }
p += pBlockCol->szValue; p += pBlockCol->szValue;
_exit:
return code;
}
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size) {
int32_t code = 0;
// alloc
code = tRealloc(ppOut, size);
if (code) goto _exit;
// seek
int64_t n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
// read
n = taosReadFile(pFD, *ppOut, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
// check
if (!taosCheckChecksumWhole(*ppOut, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
_exit: _exit:
return code; return code;
} }