more code

This commit is contained in:
Hongze Cheng 2022-12-28 14:21:06 +08:00
parent a0fa31954c
commit e0e7cc4f87
1 changed files with 65 additions and 9 deletions

View File

@ -53,19 +53,27 @@ typedef struct STsdbDataIter {
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n))) #define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n)))
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
int64_t cid; int64_t cid;
int32_t maxRows; int32_t maxRows;
int32_t minRows; int32_t minRows;
STsdbFS fs; STsdbFS fs;
int32_t fid; int32_t fid;
SDFileSet *pDFileSet; SDFileSet *pDFileSet;
// Reader
SDataFReader *pReader; SDataFReader *pReader;
STsdbDataIter *iterList; // list of iterators STsdbDataIter *iterList; // list of iterators
SRBTree rtree; SRBTree rtree;
STsdbDataIter *pIter; STsdbDataIter *pIter;
SBlockData bData; SBlockData bData;
SSkmInfo tbSkm; SSkmInfo tbSkm;
// Writer
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk; // SMapData<SDataBlk>
SArray *aSttBlk; // SArray<SSttBlk>
} STsdbCompactor; } STsdbCompactor;
#define TSDB_FLG_DEEP_COMPACT 0x1 #define TSDB_FLG_DEEP_COMPACT 0x1
@ -141,7 +149,7 @@ _exit:
_clear_exit: _clear_exit:
*ppIter = NULL; *ppIter = NULL;
if (pIter) { if (pIter) {
tBlockDataDestroy(&pDataDIter->bData, 1); tBlockDataDestroy(&pDataDIter->bData);
tMapDataClear(&pDataDIter->mDataBlk); tMapDataClear(&pDataDIter->mDataBlk);
taosArrayDestroy(pDataDIter->aBlockIdx); taosArrayDestroy(pDataDIter->aBlockIdx);
taosMemoryFree(pIter); taosMemoryFree(pIter);
@ -191,7 +199,7 @@ _exit:
_clear_exit: _clear_exit:
*ppIter = NULL; *ppIter = NULL;
if (pIter) { if (pIter) {
tBlockDataDestroy(&pSttDIter->bData, 1); tBlockDataDestroy(&pSttDIter->bData);
taosArrayDestroy(pSttDIter->aSttBlk); taosArrayDestroy(pSttDIter->aSttBlk);
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
@ -559,6 +567,33 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
pCompactor->pIter = NULL; pCompactor->pIter = NULL;
tBlockDataReset(&pCompactor->bData); tBlockDataReset(&pCompactor->bData);
// open writers
SDFileSet fSet = {0}; // TODO
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->aBlockIdx == NULL) {
pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCompactor->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
taosArrayClear(pCompactor->aBlockIdx);
}
tMapDataReset(&pCompactor->mDataBlk);
if (pCompactor->aSttBlk == NULL) {
pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pCompactor->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
taosArrayClear(pCompactor->aSttBlk);
}
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -666,6 +701,27 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (pCompactor->bData.nRow > 0) {
// write again
}
code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->mDataBlk.nItem > 0) {
SBlockIdx *pBlockIdx = taosArrayReserve(pCompactor->aBlockIdx, 1);
if (pBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseCompactor(pCompactor); tsdbCloseCompactor(pCompactor);
} }