more work

This commit is contained in:
Hongze Cheng 2022-08-08 06:43:51 +00:00
parent 8ae2ab1c6e
commit 6baae7d50f
3 changed files with 57 additions and 32 deletions

View File

@ -191,7 +191,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
uint8_t **ppBuf); uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf); uint8_t **ppBuf);
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck); int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);

View File

@ -727,27 +727,24 @@ _err:
return code; return code;
} }
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast, int16_t *aColId, static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast,
int32_t nColId, SBlockData *pBlockData) { SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
ASSERT(pBlockData->suid || pBlockData->uid);
tBlockDataClear(pBlockData); tBlockDataClear(pBlockData);
TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD;
// uid + version + tskey // uid + version + tskey
code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey, 1); code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey, 1);
if (code) goto _err; if (code) goto _err;
SDiskDataHdr hdr; SDiskDataHdr hdr;
uint8_t *p = pReader->pBuf1 + tGetDiskDataHdr(pReader->pBuf1, &hdr); uint8_t *p = pReader->pBuf1 + tGetDiskDataHdr(pReader->pBuf1, &hdr);
ASSERT(hdr.delimiter == TSDB_FILE_DLMT); ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
ASSERT(hdr.suid || hdr.uid); ASSERT(pBlockData->suid == hdr.suid);
ASSERT(pBlockData->uid == hdr.uid);
pBlockData->suid = hdr.suid;
pBlockData->uid = hdr.uid;
pBlockData->nRow = hdr.nRow; pBlockData->nRow = hdr.nRow;
// uid // uid
@ -776,36 +773,61 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
ASSERT(p - pReader->pBuf1 == sizeof(TSCKSUM)); ASSERT(p - pReader->pBuf1 == sizeof(TSCKSUM));
// read and decode columns // read and decode columns
if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;
if (hdr.szBlkCol > 0) { if (hdr.szBlkCol > 0) {
code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1, int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
hdr.szBlkCol + sizeof(TSCKSUM), 1); code = tsdbReadAndCheck(pFD, offset, &pReader->pBuf1, hdr.szBlkCol + sizeof(TSCKSUM), 1);
if (code) goto _err; if (code) goto _err;
}
SBlockCol blockCol = {.cid = 0};
SBlockCol *pBlockCol = &blockCol;
int32_t n = 0; int32_t n = 0;
while (n < hdr.szBlkCol) {
SBlockCol blockCol;
n += tGetBlockCol(pReader->pBuf1 + n, &blockCol); for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
ASSERT(blockCol.flag && blockCol.flag != HAS_NONE); while (pBlockCol && pBlockCol->cid < pColData->cid) {
if (n < hdr.szBlkCol) {
// TODO: merge with the column IDs n += tGetBlockCol(pReader->pBuf1 + n, pBlockCol);
SColData *pColData = NULL; // (todo)
if (blockCol.flag == HAS_NULL) {
// TODO: make a hdr.nRow COL_VAL_NULL();
} else { } else {
code = tsdbReadAndCheckFile( ASSERT(n == hdr.szBlkCol);
pFD, pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &pReader->pBuf2, pBlockCol = NULL;
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM), 1); }
}
code = tsdbDecmprColData(pReader->pBuf2, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf3); if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) {
// add a lot of NONE
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
ASSERT(pBlockCol->type == pColData->type);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (pBlockCol->flag == HAS_NULL) {
// add a lot of NULL
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
// decode from binary
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + pBlockCol->offset;
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
code = tsdbReadAndCheck(pFD, offset, &pReader->pBuf2, size, 0);
if (code) goto _err;
code = tsdbDecmprColData(pReader->pBuf2, pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf3);
if (code) goto _err; if (code) goto _err;
} }
} }
} }
_exit:
return code; return code;
_err: _err:
@ -817,7 +839,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl
int32_t nColId) { int32_t nColId) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, aColId, nColId, pBlockData); code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, pBlockData);
if (code) goto _err; if (code) goto _err;
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
@ -836,7 +858,7 @@ int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *p
int32_t nColId) { int32_t nColId) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, aColId, nColId, pBlockData); code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData);
if (code) goto _err; if (code) goto _err;
return code; return code;

View File

@ -1172,6 +1172,7 @@ _exit:
int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId) { int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId) {
int32_t code = 0; int32_t code = 0;
ASSERT(0);
ASSERT(suid || uid); ASSERT(suid || uid);
pBlockData->suid = suid; pBlockData->suid = suid;
@ -1208,6 +1209,8 @@ void tBlockDataReset(SBlockData *pBlockData) {
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData) {
ASSERT(pBlockData->suid || pBlockData->uid);
pBlockData->nRow = 0; pBlockData->nRow = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
@ -1864,8 +1867,8 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
goto _exit; goto _exit;
} }
pColData->cid = pBlockCol->cid; ASSERT(pColData->cid == pBlockCol->cid);
pColData->type = pBlockCol->type; ASSERT(pColData->type == pBlockCol->type);
pColData->smaOn = pBlockCol->smaOn; pColData->smaOn = pBlockCol->smaOn;
pColData->flag = pBlockCol->flag; pColData->flag = pBlockCol->flag;
pColData->nVal = nVal; pColData->nVal = nVal;
@ -1899,7 +1902,7 @@ _exit:
return code; return code;
} }
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) { int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
int32_t code = 0; int32_t code = 0;
// alloc // alloc