more code

This commit is contained in:
Hongze Cheng 2022-12-29 09:59:28 +08:00
parent 6f30871fd6
commit de82f3a806
1 changed files with 31 additions and 23 deletions

View File

@ -223,7 +223,6 @@ static void tsdbDataIterClose(STsdbDataIter *pIter) {
tBlockDataDestroy(&pSttDIter->bData);
taosArrayDestroy(pSttDIter->aSttBlk);
tsdbDataFReaderClose(&pSttDIter->pReader);
} else {
ASSERT(0);
}
@ -322,13 +321,34 @@ _exit:
return code;
}
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
tsdbFSDestroy(&pCompactor->fs);
tBlockDataDestroy(&pCompactor->bData);
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
pCompactor->tbSkm.pTSchema = NULL;
taosArrayDestroy(pCompactor->aBlockIdx);
tMapDataClear(&pCompactor->mDataBlk);
taosArrayDestroy(pCompactor->aSttBlk);
}
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCompactor->pTsdb;
// TODO
code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
if (code) {
@ -691,8 +711,6 @@ static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) {
TSDB_CHECK_CODE(code, lino, _exit);
tMapDataReset(&pCompactor->mDataBlk);
pCompactor->tableId.suid = 0;
pCompactor->tableId.uid = 0;
_exit:
if (code) {
@ -737,7 +755,11 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (pCompactor->bData.suid != pRowInfo->suid) { // different super table
if ((pCompactor->tableId.suid != pRowInfo->suid) || // different super table
(pCompactor->tableId.uid != pRowInfo->uid &&
(pRowInfo->suid == 0 ||
pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)) // different table
) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
@ -747,24 +769,6 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (pCompactor->bData.uid != pRowInfo->uid) { // different table
if (pRowInfo->suid) {
if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
// different normal table
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid},
pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
@ -772,6 +776,9 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
// check if block data is full
if (pCompactor->bData.nRow >= pCompactor->maxRows) {
code = tsdbCompactWriteBlockData(pCompactor);
@ -814,5 +821,6 @@ _exit:
} else {
tsdbCommitCompact(pCompactor);
}
tsdbEndCompact(pCompactor);
return code;
}