Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Minglei Jin 2022-06-29 16:54:05 +08:00
commit 8218febeae
2 changed files with 46 additions and 24 deletions

View File

@ -405,6 +405,28 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
int8_t toDataOnly) {
int32_t code = 0;
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
pBlock->last = 1;
} else {
pBlock->last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err;
return code;
_err:
return code;
}
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey, static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
int8_t toDataOnly) { int8_t toDataOnly) {
int32_t code = 0; int32_t code = 0;
@ -552,7 +574,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
continue; continue;
_write_block: _write_block:
if (!toDataOnly && pBlockData->nRow < pCommitter->minKey) { if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
pBlock->last = 1; pBlock->last = 1;
} else { } else {
pBlock->last = 0; pBlock->last = 0;

View File

@ -876,37 +876,37 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
goto _err; goto _err;
} }
// check // // check
p = *ppBuf1; // p = *ppBuf1;
SBlockDataHdr *pHdr = (SBlockDataHdr *)p; // SBlockDataHdr *pHdr = (SBlockDataHdr *)p;
ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); // ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
ASSERT(pHdr->suid == pBlockIdx->suid); // ASSERT(pHdr->suid == pBlockIdx->suid);
ASSERT(pHdr->uid == pBlockIdx->uid); // ASSERT(pHdr->uid == pBlockIdx->uid);
p += sizeof(*pHdr); // p += sizeof(*pHdr);
if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) { // if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED; // code = TSDB_CODE_FILE_CORRUPTED;
goto _err; // goto _err;
} // }
p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); // p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM));
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { // for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); // tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); // ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (pBlockCol->flag == HAS_NULL) continue; // if (pBlockCol->flag == HAS_NULL) continue;
if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) { // if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED; // code = TSDB_CODE_FILE_CORRUPTED;
goto _err; // goto _err;
} // }
p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); // p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
} // }
// recover // recover
pBlockData->nRow = pSubBlock->nRow; pBlockData->nRow = pSubBlock->nRow;
p = *ppBuf1 + sizeof(*pHdr); p = *ppBuf1 + sizeof(SBlockDataHdr);
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
if (code) goto _err; if (code) goto _err;