mroe code
This commit is contained in:
parent
53226eae7c
commit
5abaf4409d
|
@ -50,6 +50,8 @@ typedef struct STsdbDataIter {
|
|||
char handle[];
|
||||
} STsdbDataIter;
|
||||
|
||||
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n)))
|
||||
|
||||
typedef struct {
|
||||
STsdb *pTsdb;
|
||||
STsdbFS fs;
|
||||
|
@ -59,6 +61,7 @@ typedef struct {
|
|||
SDataFReader *pReader;
|
||||
STsdbDataIter *iterList; // list of iterators
|
||||
SRBTree rtree;
|
||||
STsdbDataIter *pIter;
|
||||
SBlockData bData;
|
||||
} STsdbCompactor;
|
||||
|
||||
|
@ -215,18 +218,22 @@ static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
|
|||
|
||||
pSttDIter->iRow++;
|
||||
if (pSttDIter->iRow < pSttDIter->bData.nRow) {
|
||||
ASSERT(0);
|
||||
pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
|
||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
|
||||
} else {
|
||||
pSttDIter->iSttBlk++;
|
||||
if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
|
||||
code = tsdbReadSttBlock(pSttDIter->pReader, pSttDIter->iStt,
|
||||
taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
|
||||
code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt,
|
||||
taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pSttDIter->iRow = 0;
|
||||
pIter->rowInfo.suid = pSttDIter->bData.suid;
|
||||
pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
|
||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
|
||||
} else {
|
||||
// code = TSDB_CODE_TDB_NO_DATA;
|
||||
// goto _exit;
|
||||
pIter->rowInfo.suid = 0;
|
||||
pIter->rowInfo.uid = 0;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -315,10 +322,59 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) {
|
||||
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
// TODO
|
||||
|
||||
if (pCompactor->pIter) {
|
||||
code = tsdbDataIterNext(pCompactor->pIter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
|
||||
pCompactor->pIter = NULL;
|
||||
} else {
|
||||
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
|
||||
if (pNode) {
|
||||
STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
|
||||
|
||||
int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo);
|
||||
ASSERT(c);
|
||||
|
||||
if (c > 0) {
|
||||
tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n);
|
||||
pCompactor->pIter = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pCompactor->pIter == NULL) {
|
||||
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
|
||||
if (pNode) {
|
||||
pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
|
||||
tRBTreeDrop(&pCompactor->rtree, pNode);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pCompactor->pIter == NULL) {
|
||||
code = tsdbCompactNextRow(pCompactor);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (pCompactor->pIter) {
|
||||
*ppRow = &pCompactor->pIter->rowInfo.row;
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
@ -364,6 +420,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
|
|||
tRBTreePut(&pCompactor->rtree, &pIter->n);
|
||||
}
|
||||
}
|
||||
pCompactor->pIter = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -412,13 +469,16 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
|
|||
TSDBROW *pRow = NULL;
|
||||
int64_t nRow = 0;
|
||||
for (;;) {
|
||||
code = tsdbCompactNextRow(&compactor, &pRow);
|
||||
code = tsdbCompactGetRow(&compactor, &pRow);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pRow == NULL) break;
|
||||
|
||||
nRow++;
|
||||
|
||||
code = tsdbCompactNextRow(&compactor);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0);
|
||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
|
|
Loading…
Reference in New Issue