more last file refact

This commit is contained in:
Hongze Cheng 2022-08-03 08:50:36 +00:00
parent e996bb4542
commit e827d0bb7a
4 changed files with 289 additions and 260 deletions

View File

@ -233,6 +233,12 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
/* new */
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1,
uint8_t **ppBuf2, int8_t cmprAlg);
int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1,
uint8_t **ppBuf2, int8_t cmprAlg);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
@ -244,6 +250,12 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf);
/* new */
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2);
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
@ -457,6 +469,7 @@ struct SColData {
};
struct SBlockData {
int64_t suid;
int32_t nRow;
int64_t *aUid;
int64_t *aVersion;

View File

@ -56,8 +56,8 @@ typedef struct {
SBlockL *pBlockL;
SBlockData bDatal;
int32_t iRow;
SRowInfo *pRow;
SRowInfo row;
SRowInfo *pRowInfo;
SRowInfo rowInfo;
} dReader;
struct {
SDataFWriter *pWriter;
@ -290,12 +290,46 @@ _err:
return code;
}
static int32_t tsdbCommitNextLastRow(SCommitter *pCommitter) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pReader);
ASSERT(pCommitter->dReader.pRowInfo);
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, &pCommitter->dReader.bDatal,
NULL, NULL);
if (code) goto _exit;
pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRowInfo->suid = pCommitter->dReader.pBlockL->suid;
pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.pRowInfo = NULL;
}
}
_exit:
return code;
}
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL;
// memory
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey);
pCommitter->nextKey = TSKEY_MAX;
// Reader
@ -325,25 +359,15 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL);
if (code) goto _err;
pCommitter->dReader.iBlockL = 0;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
// TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL,
// NULL);
if (code) goto _err;
pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRow = &pCommitter->dReader.row;
pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid;
pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.pRow = NULL;
}
pCommitter->dReader.iBlockL = -1;
pCommitter->dReader.bDatal.nRow = 0;
pCommitter->dReader.iRow = -1;
pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo;
code = tsdbCommitNextLastRow(pCommitter);
if (code) goto _err;
} else {
pCommitter->dReader.pBlockIdx = NULL;
pCommitter->dReader.pRow = NULL;
pCommitter->dReader.pRowInfo = NULL;
}
// Writer
@ -463,6 +487,46 @@ _err:
return code;
}
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
int32_t code = 0;
SBlock block;
ASSERT(pCommitter->dWriter.bData.nRow > 0);
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL,
pCommitter->cmprAlg);
if (code) goto _exit;
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock);
if (code) goto _exit;
tBlockDataClearData(&pCommitter->dWriter.bData);
_exit:
return code;
}
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
int32_t code = 0;
SBlockL blockL;
ASSERT(pCommitter->dWriter.bDatal.nRow > 0);
code = tsdbWriteLastBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, &blockL, NULL, NULL,
pCommitter->cmprAlg);
if (code) goto _exit;
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
tBlockDataClearData(&pCommitter->dWriter.bDatal);
_exit:
return code;
}
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
int8_t toDataOnly) {
int32_t code = 0;
@ -675,22 +739,16 @@ _err:
// return code;
// }
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
int32_t nRow = 0;
TSDBROW *pRow;
TSDBKEY key;
int32_t c = 0;
static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) {
int32_t nRow = 0;
STbDataIter iter = *pIter;
iter.pRow = NULL;
while (true) {
pRow = tsdbTbDataIterGet(&iter);
TSDBROW *pRow = tsdbTbDataIterGet(&iter);
if (pRow == NULL) break;
key = TSDBROW_KEY(pRow);
c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
if (c == 0) {
int32_t c = tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &key);
if (c < 0) {
nRow++;
tsdbTbDataIterNext(&iter);
} else if (c > 0) {
@ -750,11 +808,106 @@ _err:
return code;
}
static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter, int32_t *nRow) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo;
while (pRow && pRowInfo) {
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
if (c < 0) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL);
if (code) goto _err;
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
// todo
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
// todo
} else {
// todo
}
}
pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) {
pRowInfo = NULL;
}
} else {
ASSERT(0);
}
(*nRow)--;
ASSERT(*nRow >= 0);
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data;
}
while (pRow) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
(*nRow)--;
ASSERT(*nRow >= 0);
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data;
}
while (pRowInfo) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL);
if (code) goto _err;
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
// todo
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
// todo
} else {
// todo
}
}
pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) {
pRowInfo = NULL;
}
(*nRow)--;
ASSERT(*nRow >= 0);
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data;
}
SBlock block;
_write_block_data:
return code;
_err:
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0);
ASSERT(pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, pTbData) >= 0);
ASSERT(pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, pTbData) >= 0);
// end last if need
if (pTbData->suid == 0 || pTbData->suid != 0 /*todo*/) {
@ -816,7 +969,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
pRow = NULL;
}
} else { // merge
int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey);
ASSERT(nOvlp > 0);
@ -854,13 +1007,20 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
}
}
if (pRow) {
code =
tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
// merge with last
int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
if (pCommitter->dReader.pRowInfo) {
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
int64_t uid = pCommitter->dReader.bDatal.aUid[iRow];
if (uid == pTbData->uid) {
nRowLeft++;
}
}
}
pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey);
while (nRowLeft) {
code = tsdbMergeCommitLast(pCommitter, pIter, &nRowLeft);
if (code) goto _err;
}
// end
@ -888,174 +1048,6 @@ _err:
return code;
}
#if 0
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
STbDataIter iter = {0};
STbDataIter *pIter = &iter;
TSDBROW *pRow;
int32_t iBlock;
int32_t nBlock;
int64_t suid;
int64_t uid;
SBlockIdx *pBlockIdx = NULL;
if (pTbData) {
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
suid = pTbData->suid;
uid = pTbData->uid;
} else {
pIter = NULL;
pRow = NULL;
}
if (pBlockIdx) {
code = tsdbReadBlock(pCommitter->dReader.pReader, pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _err;
nBlock = pCommitter->dReader.mBlock.nItem;
ASSERT(nBlock > 0);
suid = pBlockIdx->suid;
uid = pBlockIdx->uid;
} else {
nBlock = 0;
}
if (pRow == NULL && nBlock == 0) goto _exit;
// start ===========
tMapDataReset(&pCommitter->dWriter.mBlock);
SBlock block;
SBlock *pBlock = &block;
iBlock = 0;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
if (pRow) {
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
if (code) goto _err;
}
// merge ===========
while (true) {
if (pRow == NULL && pBlock == NULL) break;
if (pRow && pBlock) {
if (pBlock->last) {
code = tsdbMergeTableData(pCommitter, pIter, pBlock,
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
ASSERT(pRow == NULL && pBlock == NULL);
} else {
int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
if (c > 0) {
// only disk data
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} else if (c < 0) {
// only memory data
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
} else {
// merge memory and disk
int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
ASSERT(nOvlp);
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
if (code) goto _err;
} else {
TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
int8_t toDataOnly = 0;
if (iBlock < nBlock - 1) {
toDataOnly = 1;
SBlock nextBlock = {0};
tBlockReset(&nextBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock + 1, &nextBlock, tGetBlock);
toKey = nextBlock.minKey;
}
code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
if (code) goto _err;
}
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
}
}
} else if (pBlock) {
code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
if (code) goto _err;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} else {
code =
tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
ASSERT(pRow == NULL);
}
}
// end =====================
code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
if (code) goto _err;
_exit:
if (pIter) {
pRow = tsdbTbDataIterGet(pIter);
if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
}
return code;
_err:
tsdbError("vgId:%d, tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
#endif
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
@ -1122,52 +1114,53 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
}
// last
SBlockL blockL;
while (true) {
if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break;
if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break;
// check if same suid
if (pCommitter->dReader.pRow->suid == 0) {
if (pCommitter->dReader.pRow->uid != 0 /*todo*/) {
// code = tsdbCommitBlockDataL(pCommitter);
// commit if not same schema
if (pCommitter->dWriter.bDatal.nRow > 0) {
if (pCommitter->dWriter.bDatal.suid != pCommitter->dReader.pRowInfo->suid ||
pCommitter->dWriter.bDatal.suid == 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
} else if (pCommitter->dReader.pRow->suid != 0 /*todo*/) {
// code = tsdbCommitBlockDataL(pCommitter);
if (code) goto _err;
}
// append
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRow->row, NULL);
if (code) goto _err;
// next
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL);
// TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL,
// NULL);
if (code) goto _err;
pCommitter->dReader.iRow = 0;
pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid;
pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow];
pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow);
} else {
pCommitter->dReader.pRow = NULL;
}
}
// write
if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
// code = tsdbCommitBlockDataL(pCommitter);
if (pCommitter->dWriter.bDatal.nRow == 0) {
code = tsdbCommitterUpdateTableSchema(pCommitter, pCommitter->dReader.pRowInfo->suid,
pCommitter->dReader.pRowInfo->suid, 1 /*TODO*/);
if (code) goto _err;
pCommitter->dWriter.bDatal.suid = pCommitter->dReader.pRowInfo->suid;
code = tBlockDataSetSchema(&pCommitter->dWriter.bDatal, pCommitter->skmTable.pTSchema);
if (code) goto _err;
}
// check if it can make sure that one table data in one block
int64_t uid = pCommitter->dReader.pRowInfo->uid;
int32_t nRow = 0;
for (int32_t iRow = pCommitter->dReader.iRow;
(iRow < pCommitter->dReader.bDatal.nRow) && (pCommitter->dReader.bDatal.aUid[iRow] == uid); iRow++) {
nRow++;
}
ASSERT(nRow > 0 && nRow < pCommitter->minRow);
if (pCommitter->dWriter.bDatal.nRow + nRow > pCommitter->maxRow) {
ASSERT(pCommitter->dWriter.bDatal.nRow > 0);
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
while (nRow > 0) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRowInfo->row, NULL);
if (code) goto _err;
code = tsdbCommitNextLastRow(pCommitter);
if (code) goto _err;
nRow--;
}
}
@ -1192,7 +1185,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
// move commit until current (suid, uid)
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid});
code = tsdbMoveCommitData(pCommitter, *(TABLEID *)pTbData);
if (code) goto _err;
// commit current table data
@ -1204,7 +1197,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) {
// code = tsdbCommitBlockDataL(pCommitter);
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
@ -1261,15 +1254,15 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
goto _exit;
}
code = tBlockDataInit(&pCommitter->dReader.bData);
if (code) goto _exit;
pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pCommitter->dReader.aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataInit(&pCommitter->dReader.bData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->dReader.bDatal);
if (code) goto _exit;
@ -1299,9 +1292,9 @@ _exit:
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
// Reader
taosArrayDestroy(pCommitter->dReader.aBlockIdx);
taosArrayDestroy(pCommitter->dReader.aBlockL);
tMapDataClear(&pCommitter->dReader.mBlock);
tBlockDataClear(&pCommitter->dReader.bData, 1);
taosArrayDestroy(pCommitter->dReader.aBlockL);
tBlockDataClear(&pCommitter->dReader.bDatal, 1);
// Writer
@ -1329,9 +1322,6 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
// impl ====================
pCommitter->nextKey = pMemTable->minKey;
while (pCommitter->nextKey < TSKEY_MAX) {
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey);
code = tsdbCommitFileData(pCommitter);
if (code) goto _err;
}

View File

@ -543,9 +543,13 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
uint32_t delimiter;
SBlockIdx blockIdx;
if (!ppBuf) ppBuf = &pBuf;
taosArrayClear(aBlockIdx);
if (size == 0) {
goto _exit;
}
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
if (code) goto _err;
@ -576,7 +580,6 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
n = tGetU32(*ppBuf + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
taosArrayClear(aBlockIdx);
while (n < size - sizeof(TSCKSUM)) {
n += tGetBlockIdx(*ppBuf + n, &blockIdx);
@ -588,6 +591,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
ASSERT(n + sizeof(TSCKSUM) == size);
_exit:
tFree(pBuf);
return code;
@ -606,9 +610,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
uint8_t *pBuf = NULL;
SBlockL blockl;
if (!ppBuf) ppBuf = &pBuf;
taosArrayClear(aBlockL);
if (size == 0) {
goto _exit;
}
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
if (code) goto _err;
@ -639,7 +647,6 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
n = tGetU32(*ppBuf + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
taosArrayClear(aBlockL);
while (n < size - sizeof(TSCKSUM)) {
n += tGetBlockL(*ppBuf + n, &blockl);
@ -651,11 +658,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
ASSERT(n + sizeof(TSCKSUM) == size);
_exit:
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
@ -1562,7 +1571,11 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
int64_t size;
int64_t n;
if (!ppBuf) ppBuf = &pBuf;
// check
if (taosArrayGetSize(aBlockIdx) == 0) {
pHeadFile->offset = pHeadFile->size;
goto _exit;
}
// prepare
size = tPutU32(NULL, TSDB_FILE_DLMT);
@ -1572,6 +1585,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
size += sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
if (code) goto _err;
@ -1596,6 +1610,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
pHeadFile->offset = pHeadFile->size;
pHeadFile->size += size;
_exit:
tFree(pBuf);
return code;
@ -1662,6 +1677,12 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf)
int64_t size;
int64_t n;
// check
if (taosArrayGetSize(aBlockL) == 0) {
pHeadFile->loffset = pHeadFile->size;
goto _exit;
}
// size
size = sizeof(uint32_t);
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) {
@ -1695,10 +1716,13 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf)
pHeadFile->loffset = pHeadFile->size;
pHeadFile->size += size;
_exit:
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}

View File

@ -1122,6 +1122,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData) {
int32_t code = 0;
pBlockData->nRow = 0;
pBlockData->aUid = NULL;
pBlockData->aVersion = NULL;
pBlockData->aTSKEY = NULL;
pBlockData->aIdx = taosArrayInit(0, sizeof(int32_t));
@ -1146,6 +1147,7 @@ void tBlockDataReset(SBlockData *pBlockData) {
}
void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aUid);
tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx);