more code

This commit is contained in:
Hongze Cheng 2023-02-08 16:11:47 +08:00
parent b5a9cbf71f
commit c398f9d299
1 changed files with 45 additions and 29 deletions

View File

@ -145,7 +145,7 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI
}
}
// reader and write (TODO)
// writer
code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm);
TSDB_CHECK_CODE(code, lino, _exit);
@ -154,13 +154,14 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI
code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
if (!TABLE_SAME_SCHEMA(pCompactor->tbid.suid, pCompactor->tbid.uid, pId->suid, pId->uid)) {
if (!TABLE_SAME_SCHEMA(pCompactor->sData.suid, pCompactor->sData.uid, pId->suid, pId->uid)) {
if (pCompactor->sData.nRow > 0) {
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataInit(&pCompactor->sData, pId /* TODO */, pCompactor->tbSkm.pTSchema, NULL, 0);
TABLEID tbid = {.suid = pId->suid, .uid = pId->suid ? 0 : pId->uid};
code = tBlockDataInit(&pCompactor->sData, &tbid, pCompactor->tbSkm.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -191,7 +192,6 @@ static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) {
TSDB_CHECK_CODE(code, lino, _exit);
}
}
tBlockDataClear(&pCompactor->bData);
} else {
code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
@ -199,7 +199,7 @@ static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) {
}
}
if (pCompactor->mDataBlk.nItem) {
if (pCompactor->mDataBlk.nItem > 0) {
SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
if (pBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -268,21 +268,21 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p
int32_t code = 0;
int32_t lino = 0;
SRowInfo rInfo;
if (pRowInfo == NULL) {
rInfo.suid = INT64_MAX;
rInfo.uid = INT64_MAX;
// rInfo.row = TSDBORW_V;
pRowInfo = &rInfo;
}
// start a new table data write if need
if (pRowInfo->uid != pCompactor->tbid.uid) {
if (pRowInfo == NULL || pRowInfo->uid != pCompactor->tbid.uid) {
if (pCompactor->tbid.uid) {
code = tsdbCompactWriteTableDataEnd(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pRowInfo == NULL) {
if (pCompactor->sData.nRow > 0) {
code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
return code;
}
code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -290,14 +290,14 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p
// check if row is deleted
if (pCompactor->pDKey && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit;
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->bData.nRow >= pCompactor->maxRows) {
if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) {
code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
@ -367,7 +367,6 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS
/* tombstone */
pCompactor->iDelIdx = 0;
pCompactor->iSkyLine = 0;
/* reader */
code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet);
@ -376,9 +375,16 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS
code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn);
if (pCompactor->pIter) {
pCompactor->pIter->next = pCompactor->iterList;
pCompactor->iterList = pCompactor->pIter;
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
}
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
@ -388,11 +394,15 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS
if (pCompactor->pIter) {
pCompactor->pIter->next = pCompactor->iterList;
pCompactor->iterList = pCompactor->pIter;
}
}
code = tsdbDataIterNext2(pCompactor->pIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
}
}
pCompactor->pIter = NULL;
tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn);
/* writer */
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pCompactor->pTsdb,
@ -438,7 +448,8 @@ static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
/* finish remaining data (TODO) */
ASSERT(pCompactor->bData.nRow == 0);
ASSERT(pCompactor->sData.nRow == 0);
/* update files */
code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
@ -488,28 +499,33 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) {
// do compact
SRowInfo *pRowInfo;
for (;;) {
do {
code = tsdbCompactNextRow(pCompactor, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteTableData(pCompactor, pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
if (pRowInfo == NULL) break;
}
} while (pRowInfo);
// end compact
code = tsdbCompactFileSetEnd(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code), pCompactor->fid);
if (pCompactor->pWriter) tsdbDataFWriterClose(&pCompactor->pWriter, 0);
while ((pCompactor->pIter = pCompactor->iterList)) {
pCompactor->iterList = pCompactor->pIter->next;
tsdbCloseDataIter2(pCompactor->pIter);
}
if (pCompactor->pReader) tsdbDataFReaderClose(&pCompactor->pReader);
}
return code;
}
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
// writer
tBlockDataDestroy(&pCompactor->sData);
tBlockDataDestroy(&pCompactor->bData);