more code

This commit is contained in:
Hongze Cheng 2022-09-04 18:42:23 +08:00
parent 28b56baa98
commit 3b0008a625
2 changed files with 23 additions and 47 deletions

View File

@ -934,18 +934,20 @@ _err:
return code; return code;
} }
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast, static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) {
SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
tBlockDataClear(pBlockData); tBlockDataClear(pBlockData);
STsdbFD *pFD = fromLast ? pReader->aSstFD[0] : pReader->pDataFD; // (todo) STsdbFD *pFD = pReader->pDataFD;
// todo: realloc pReader->aBuf[0]
// uid + version + tskey // uid + version + tskey
tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey); // todo code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
if (code) goto _err;
code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
if (code) goto _err;
SDiskDataHdr hdr; SDiskDataHdr hdr;
uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr); uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);
@ -978,14 +980,15 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
if (code) goto _err; if (code) goto _err;
p += hdr.szKey; p += hdr.szKey;
ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey - sizeof(TSCKSUM)); ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey);
// read and decode columns // read and decode columns
if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit; if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;
if (hdr.szBlkCol > 0) { if (hdr.szBlkCol > 0) {
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey; int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol + sizeof(TSCKSUM)); code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol);
if (code) goto _err;
} }
SBlockCol blockCol = {.cid = 0}; SBlockCol blockCol = {.cid = 0};
@ -1022,10 +1025,11 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
} }
} else { } else {
// decode from binary // decode from binary
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset; int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + pBlockCol->offset;
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue;
tsdbReadFile(pFD, offset, pReader->aBuf[1], size); code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
if (code) goto _err;
code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]); code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
if (code) goto _err; if (code) goto _err;
@ -1044,7 +1048,7 @@ _err:
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData); code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData);
if (code) goto _err; if (code) goto _err;
if (pDataBlk->nSubBlock > 1) { if (pDataBlk->nSubBlock > 1) {
@ -1062,7 +1066,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData
tBlockDataInitEx(&bData2, pBlockData); tBlockDataInitEx(&bData2, pBlockData);
for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1); code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], &bData1);
if (code) { if (code) {
tBlockDataDestroy(&bData1, 1); tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1); tBlockDataDestroy(&bData2, 1);

View File

@ -1548,7 +1548,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
if (code) goto _exit; if (code) goto _exit;
blockCol.offset = aBufN[0]; blockCol.offset = aBufN[0];
aBufN[0] = aBufN[0] + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); aBufN[0] = aBufN[0] + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
} }
code = tRealloc(&aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol)); code = tRealloc(&aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol));
@ -1556,15 +1556,8 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
hdr.szBlkCol += tPutBlockCol(aBuf[1] + hdr.szBlkCol, &blockCol); hdr.szBlkCol += tPutBlockCol(aBuf[1] + hdr.szBlkCol, &blockCol);
} }
aBufN[1] = 0; // SBlockCol
if (hdr.szBlkCol > 0) { aBufN[1] = hdr.szBlkCol;
aBufN[1] = hdr.szBlkCol + sizeof(TSCKSUM);
code = tRealloc(&aBuf[1], aBufN[1]);
if (code) goto _exit;
taosCalcChecksumAppend(0, aBuf[1], aBufN[1]);
}
// uid + version + tskey // uid + version + tskey
aBufN[2] = 0; aBufN[2] = 0;
@ -1585,16 +1578,11 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
if (code) goto _exit; if (code) goto _exit;
aBufN[2] += hdr.szKey; aBufN[2] += hdr.szKey;
aBufN[2] += sizeof(TSCKSUM);
code = tRealloc(&aBuf[2], aBufN[2]);
if (code) goto _exit;
// hdr // hdr
aBufN[3] = tPutDiskDataHdr(NULL, &hdr); aBufN[3] = tPutDiskDataHdr(NULL, &hdr);
code = tRealloc(&aBuf[3], aBufN[3]); code = tRealloc(&aBuf[3], aBufN[3]);
if (code) goto _exit; if (code) goto _exit;
tPutDiskDataHdr(aBuf[3], &hdr); tPutDiskDataHdr(aBuf[3], &hdr);
taosCalcChecksumAppend(taosCalcChecksum(0, aBuf[3], aBufN[3]), aBuf[2], aBufN[2]);
// aggragate // aggragate
if (ppOut) { if (ppOut) {
@ -1626,10 +1614,6 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
// SDiskDataHdr // SDiskDataHdr
n += tGetDiskDataHdr(pIn + n, &hdr); n += tGetDiskDataHdr(pIn + n, &hdr);
if (!taosCheckChecksumWhole(pIn, n + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
ASSERT(hdr.delimiter == TSDB_FILE_DLMT); ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
pBlockData->suid = hdr.suid; pBlockData->suid = hdr.suid;
@ -1657,7 +1641,7 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
code = tsdbDecmprData(pIn + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, code = tsdbDecmprData(pIn + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
sizeof(TSKEY) * hdr.nRow, &aBuf[0]); sizeof(TSKEY) * hdr.nRow, &aBuf[0]);
if (code) goto _exit; if (code) goto _exit;
n = n + hdr.szKey + sizeof(TSCKSUM); n += hdr.szKey;
// loop to decode each column data // loop to decode each column data
if (hdr.szBlkCol == 0) goto _exit; if (hdr.szBlkCol == 0) goto _exit;
@ -1679,8 +1663,8 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
if (code) goto _exit; if (code) goto _exit;
} }
} else { } else {
code = tsdbDecmprColData(pIn + n + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &blockCol, hdr.cmprAlg, code = tsdbDecmprColData(pIn + n + hdr.szBlkCol + blockCol.offset, &blockCol, hdr.cmprAlg, hdr.nRow, pColData,
hdr.nRow, pColData, &aBuf[0]); &aBuf[0]);
if (code) goto _exit; if (code) goto _exit;
} }
} }
@ -2062,12 +2046,6 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
} }
size += pBlockCol->szValue; size += pBlockCol->szValue;
// checksum
size += sizeof(TSCKSUM);
code = tRealloc(ppOut, nOut + size);
if (code) goto _exit;
taosCalcChecksumAppend(0, *ppOut + nOut, size);
_exit: _exit:
return code; return code;
} }
@ -2076,12 +2054,6 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t **ppBuf) { uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole(pIn, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
ASSERT(pColData->cid == pBlockCol->cid); ASSERT(pColData->cid == pBlockCol->cid);
ASSERT(pColData->type == pBlockCol->type); ASSERT(pColData->type == pBlockCol->type);
pColData->smaOn = pBlockCol->smaOn; pColData->smaOn = pBlockCol->smaOn;