refact code

This commit is contained in:
Hongze Cheng 2022-08-28 16:30:37 +08:00
parent 2d3762a546
commit 1852c21b40
1 changed files with 0 additions and 417 deletions

View File

@ -654,394 +654,6 @@ _err:
return code;
}
#if 0
static int32_t tsdbMergeCommitDataBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockDataR = &pCommitter->dReader.bData;
SBlockData *pBlockDataW = &pCommitter->dWriter.bData;
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBlockDataR);
if (code) goto _err;
tBlockDataClear(pBlockDataW);
int32_t iRow = 0;
TSDBROW row;
TSDBROW *pRow1 = tsdbTbDataIterGet(pIter);
TSDBROW *pRow2 = &row;
*pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow);
while (pRow1 && pRow2) {
int32_t c = tsdbRowCmprFn(pRow1, pRow2);
if (c < 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err;
// next
tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter);
} else if (c > 0) {
code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid);
if (code) goto _err;
iRow++;
if (iRow < pBlockDataR->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow);
} else {
pRow2 = NULL;
}
} else {
ASSERT(0);
}
// check
if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
while (pRow2) {
code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid);
if (code) goto _err;
iRow++;
if (iRow < pBlockDataR->nRow) {
*pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow);
} else {
pRow2 = NULL;
}
// check
if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
// check
if (pBlockDataW->nRow > 0) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d, tsdb merge commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
tBlockDataClear(pBlockData);
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL) {
if (pBlockData->nRow > 0) {
goto _write_block;
} else {
break;
}
}
// update schema
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
// append
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
TSDBKEY rowKey = TSDBROW_KEY(pRow);
if (tsdbKeyCmprFn(&rowKey, &toKey) >= 0) {
pRow = NULL;
}
}
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
_write_block:
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
return code;
_err:
tsdbError("vgId:%d, tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) {
int32_t nRow = 0;
STbDataIter iter = *pIter;
while (true) {
TSDBROW *pRow = tsdbTbDataIterGet(&iter);
if (pRow == NULL) break;
int32_t c = tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &key);
if (c < 0) {
nRow++;
tsdbTbDataIterNext(&iter);
} else if (c > 0) {
break;
} else {
ASSERT(0);
}
}
return nRow;
}
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
tBlockDataClear(pBlockData);
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL) break;
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow) {
TSDBKEY rowKey = TSDBROW_KEY(pRow);
if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) {
pRow = NULL;
}
}
}
ASSERT(pBlockData->nRow > 0 && pBlock->nRow + pBlockData->nRow <= pCommitter->maxRow);
code = tsdbCommitDataBlock(pCommitter, pBlock);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d, tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitLastFile(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pRow == NULL) goto _exit;
if (pBlockData->suid || pBlockData->uid) {
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) {
if (pBlockData->nRow > 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
tBlockDataReset(pBlockData);
}
}
if (!pBlockData->suid && !pBlockData->uid) {
code = tBlockDataInit(pBlockData, pTbData->suid, 0, pCommitter->skmTable.pTSchema);
if (code) goto _err;
}
while (pRow) {
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
if (code) goto _err;
tsdbTbDataIterNext(pIter);
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pBlockData->nRow >= pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) {
int32_t code = 0;
STbData *pTbData = pIter->pTbData;
int32_t iBlock = 0;
SBlock block;
SBlock *pBlock = &block;
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
while (pBlock && pRow) {
SBlock tBlock = {.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)};
int32_t c = tBlockCmprFn(pBlock, &tBlock);
if (c < 0) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
} else if (c > 0) {
code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey);
if (code) goto _err;
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
} else {
int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey);
ASSERT(nOvlp > 0);
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
if (code) goto _err;
} else {
code = tsdbMergeCommitDataBlock(pCommitter, pIter, pBlock);
if (code) goto _err;
}
// next
pRow = tsdbTbDataIterGet(pIter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
}
}
while (pBlock) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
int32_t code = 0;
ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0);
// merge commit table data
STbDataIter iter = {0};
TSDBROW *pRow;
tMapDataReset(&pCommitter->dWriter.mBlock);
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter);
pRow = tsdbTbDataIterGet(&iter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pRow = NULL;
}
if (pRow == NULL) {
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) {
code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock);
if (code) goto _err;
}
goto _exit;
}
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
if (code) goto _err;
// commit data
code = tsdbMergeCommitData(pCommitter, &iter);
if (code) goto _err;
// commit last
code = tsdbCommitLastFile(pCommitter, &iter);
if (code) goto _err;
_exit:
if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
pRow = tsdbTbDataIterGet(&iter);
if (pRow) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
}
return code;
_err:
tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
#endif
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
@ -1112,38 +724,9 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
code = tsdbCommitFileDataStart(pCommitter);
if (code) goto _err;
#if 1
// impl
code = tsdbCommitFileDataImpl(pCommitter);
if (code) goto _err;
#else
// commit file data impl
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
// move commit until current (suid, uid)
code = tsdbMoveCommitData(pCommitter, *(TABLEID *)pTbData);
if (code) goto _err;
// commit current table data
code = tsdbCommitTableData(pCommitter, pTbData);
if (code) goto _err;
// move next reader table data if need
if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) {
code = tsdbCommitterNextTableData(pCommitter);
if (code) goto _err;
}
}
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX});
if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
#endif
// commit file data end
code = tsdbCommitFileDataEnd(pCommitter);