more last file refact

This commit is contained in:
Hongze Cheng 2022-08-03 12:16:57 +00:00
parent e827d0bb7a
commit 7075244183
3 changed files with 240 additions and 252 deletions

View File

@ -469,7 +469,8 @@ struct SColData {
};
struct SBlockData {
int64_t suid;
int64_t suid; // 0 means normal table data block
int64_t uid; // 0 means block data in .last file, others in .data file
int32_t nRow;
int64_t *aUid;
int64_t *aVersion;

View File

@ -290,7 +290,7 @@ _err:
return code;
}
static int32_t tsdbCommitNextLastRow(SCommitter *pCommitter) {
static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pReader);
@ -363,7 +363,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->dReader.bDatal.nRow = 0;
pCommitter->dReader.iRow = -1;
pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo;
code = tsdbCommitNextLastRow(pCommitter);
code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err;
} else {
pCommitter->dReader.pBlockIdx = NULL;
@ -527,128 +527,94 @@ _exit:
return code;
}
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
int8_t toDataOnly) {
static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlockData *pBlockDataMerge = &pCommitter->dReader.bData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlock block;
SBlock *pBlock = █
TSDBROW *pRow1;
TSDBROW row2;
TSDBROW *pRow2 = &row2;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockDataR = &pCommitter->dReader.bData;
SBlockData *pBlockDataW = &pCommitter->dWriter.bData;
// read SBlockData
code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBlockDataR, NULL, NULL);
if (code) goto _err;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
tBlockDataClearData(pBlockDataW);
int32_t iRow = 0;
TSDBROW row;
TSDBROW *pRow1 = tsdbTbDataIterGet(pIter);
TSDBROW *pRow2 = &row;
*pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow);
while (pRow1 && pRow2) {
int32_t c = tsdbRowCmprFn(pRow1, pRow2);
// loop to merge
pRow1 = tsdbTbDataIterGet(pIter);
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
if (c < 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataClearData(pBlockData);
while (true) {
if (pRow1 == NULL && pRow2 == NULL) {
if (pBlockData->nRow == 0) {
break;
code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema);
if (code) goto _err;
// next
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL);
if (code) goto _err;
iRow++;
if (iRow < pBlockDataR->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow);
} else {
goto _write_block;
pRow2 = NULL;
}
}
if (pRow1 && pRow2) {
int32_t c = tsdbRowCmprFn(pRow1, pRow2);
if (c < 0) {
goto _append_mem_row;
} else if (c > 0) {
goto _append_block_row;
} else {
ASSERT(0);
}
} else if (pRow1) {
goto _append_mem_row;
} else {
goto _append_block_row;
ASSERT(0);
}
_append_mem_row:
code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema);
// check
if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
}
}
while (pRow2) {
code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
} else {
pRow1 = NULL;
}
}
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
goto _write_block;
} else {
continue;
}
_append_block_row:
code = tBlockDataAppendRow(pBlockData, pRow2, NULL);
if (code) goto _err;
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
iRow++;
if (iRow < pBlockDataR->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow);
} else {
pRow2 = NULL;
}
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
goto _write_block;
} else {
continue;
// check
if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
}
}
_write_block:
code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, pBlockIdx, toDataOnly);
// check
if (pBlockDataW->nRow > 0) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataClearData(pBlockData);
}
return code;
_err:
tsdbError("vgId:%d, tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d, tsdb merge commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
int32_t code = 0;
#if 0
TSDBROW *pRow;
SBlock block;
SBlock *pBlock = &block;
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataClearData(pBlockData);
pRow = tsdbTbDataIterGet(pIter);
ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL) {
if (pBlockData->nRow > 0) {
@ -659,7 +625,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
}
// update schema
code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
// append
@ -668,27 +634,20 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
// if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
// crash on CI, use the block following
if (pRow) {
TSDBKEY tmpKey = TSDBROW_KEY(pRow);
if (tsdbKeyCmprFn(&tmpKey, &toKey) >= 0) {
TSDBKEY rowKey = TSDBROW_KEY(pRow);
if (tsdbKeyCmprFn(&rowKey, &toKey) >= 0) {
pRow = NULL;
}
}
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
continue;
_write_block:
code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
if (code) goto _err;
tBlockReset(pBlock);
tBlockDataClearData(pBlockData);
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
_write_block:
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
}
}
#endif
return code;
_err:
@ -719,26 +678,6 @@ _err:
return code;
}
// static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
// int32_t code = 0;
// SBlockIdx blockIdx = {.suid = suid, .uid = uid};
// SBlockIdx *pBlockIdx = &blockIdx;
// code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx);
// if (code) goto _err;
// if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY;
// goto _err;
// }
// return code;
// _err:
// tsdbError("vgId:%d, commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
// return code;
// }
static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) {
int32_t nRow = 0;
@ -763,42 +702,35 @@ static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) {
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlock block;
TSDBROW *pRow;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
tBlockDataClearData(pBlockData);
while (true) {
if (pRow == NULL) break;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
TSDBKEY key = TSDBROW_KEY(pRow);
int32_t c = tBlockCmprFn(&(SBlock){.minKey = key, .maxKey = key}, pBlock);
if (c == 0) {
code =
tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
} else if (c > 0) {
TSDBKEY rowKey = TSDBROW_KEY(pRow);
if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) {
pRow = NULL;
} else {
ASSERT(0);
}
}
if (pRow == NULL) {
break;
}
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err;
}
block = *pBlock;
code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
SBlock block = *pBlock;
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &block, NULL, NULL, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock);
if (code) goto _err;
return code;
@ -808,16 +740,101 @@ _err:
return code;
}
static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter, int32_t *nRow) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo;
static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
while (pRow && pRowInfo) {
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
if (c < 0) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL);
if (pCommitter->dReader.pRowInfo) {
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break;
nRow++;
}
}
if (nRow == 0) goto _exit;
SBlockData *pBlockData;
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo;
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) {
pRowInfo = NULL;
}
while (nRow) {
if (nRow < pCommitter->minRow) { // to .last
pBlockData = &pCommitter->dWriter.bDatal;
// check if same schema
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
}
if (pBlockData->nRow + nRow > pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
} else { // to .data
pBlockData = &pCommitter->dWriter.bData;
}
while (pRow && pRowInfo) {
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
if (c < 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL);
if (code) goto _err;
code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err;
pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) {
pRowInfo = NULL;
}
} else {
ASSERT(0);
}
nRow--;
if (pBlockData->uid) { // .data block
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
goto _outer_break;
}
} else {
ASSERT(pBlockData->nRow <= pCommitter->maxRow);
}
}
while (pRow) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
@ -825,81 +842,54 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter, i
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
} else if (c > 0) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL);
nRow--;
if (pBlockData->uid) { // .data block
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
goto _outer_break;
}
} else {
ASSERT(pBlockData->nRow <= pCommitter->maxRow);
}
}
while (pRowInfo) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL);
if (code) goto _err;
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
// todo
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
// todo
} else {
// todo
}
}
code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err;
pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) {
pRowInfo = NULL;
}
} else {
ASSERT(0);
}
(*nRow)--;
ASSERT(*nRow >= 0);
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data;
}
nRow--;
if (pBlockData->uid) { // .data block
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
while (pRow) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
(*nRow)--;
ASSERT(*nRow >= 0);
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data;
}
while (pRowInfo) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL);
if (code) goto _err;
pCommitter->dReader.iRow++;
if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) {
// todo
} else {
pCommitter->dReader.iBlockL++;
if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) {
// todo
goto _outer_break;
}
} else {
// todo
ASSERT(pBlockData->nRow <= pCommitter->maxRow);
}
}
pRowInfo = pCommitter->dReader.pRowInfo;
if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) {
pRowInfo = NULL;
}
(*nRow)--;
ASSERT(*nRow >= 0);
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data;
_outer_break:
ASSERT(nRow >= 0);
}
SBlock block;
_write_block_data:
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
@ -909,14 +899,6 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0);
ASSERT(pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, pTbData) >= 0);
// end last if need
if (pTbData->suid == 0 || pTbData->suid != 0 /*todo*/) {
if (pCommitter->dWriter.bDatal.nRow > 0) {
// TODO: code = tsdbCommitBlockDataL(pCommitter);
if (code) goto _err;
}
}
// merge commit table data
STbDataIter iter = {0};
STbDataIter *pIter = &iter;
@ -930,22 +912,22 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
if (pRow == NULL) goto _exit;
SBlockIdx *pBlockIdx = NULL;
int32_t iBlock = 0;
SBlock block;
SBlock *pBlock = &block;
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) {
pBlockIdx = pCommitter->dReader.pBlockIdx;
}
if (pBlockIdx && iBlock < pCommitter->dReader.mBlock.nItem) {
int32_t iBlock = 0;
SBlock block;
SBlock *pBlock = &block;
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0 &&
iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
if (code) goto _err;
tMapDataReset(&pCommitter->dWriter.mBlock);
code = tBlockDataSetSchema(&pCommitter->dWriter.bData, pCommitter->skmTable.pTSchema);
if (code) goto _err;
while (pBlock && pRow) {
int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)});
if (c < 0) { // disk
@ -959,8 +941,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
} else {
pBlock = NULL;
}
} else if (c < 0) { // memory
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
} else if (c > 0) { // memory
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey);
if (code) goto _err;
// next
@ -977,7 +959,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
if (code) goto _err;
} else {
// code = tsdbMergeTableData(pCommitter, pIter, pBlock, NULL, 1);
code = tsdbMergeCommitData(pCommitter, pIter, pBlock);
if (code) goto _err;
}
@ -999,6 +981,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
// next
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
@ -1008,8 +991,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
}
// merge with last
int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
if (pCommitter->dReader.pRowInfo) {
code = tsdbMergeCommitLast(pCommitter, pIter);
if (code) goto _err;
#if 0
int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version =
VERSION_MIN}); if (pCommitter->dReader.pRowInfo) {
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
int64_t uid = pCommitter->dReader.bDatal.aUid[iRow];
if (uid == pTbData->uid) {
@ -1022,6 +1008,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
code = tsdbMergeCommitLast(pCommitter, pIter, &nRowLeft);
if (code) goto _err;
}
#endif
// end
if (pCommitter->dWriter.mBlock.nItem > 0) {
@ -1051,14 +1038,14 @@ _err:
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write aBlockL
code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL);
if (code) goto _err;
// write aBlockIdx
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL);
if (code) goto _err;
// write aBlockL
code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL);
if (code) goto _err;
// update file header
code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
if (code) goto _err;
@ -1157,7 +1144,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRowInfo->row, NULL);
if (code) goto _err;
code = tsdbCommitNextLastRow(pCommitter);
code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err;
nRow--;

View File

@ -537,7 +537,7 @@ _err:
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->offset;
int64_t size = pReader->pSet->pHeadF->size - offset;
int64_t size = pReader->pSet->pHeadF->loffset - offset;
uint8_t *pBuf = NULL;
int64_t n;
uint32_t delimiter;
@ -604,7 +604,7 @@ _err:
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->loffset;
int64_t size = pReader->pSet->pHeadF->offset - offset;
int64_t size = pReader->pSet->pHeadF->size - offset;
int64_t n;
uint32_t delimiter;
uint8_t *pBuf = NULL;