refact code
This commit is contained in:
parent
4813af0867
commit
5405a967fb
|
@ -55,12 +55,13 @@ int32_t tsdbIterClose(STsdbIter **iter);
|
||||||
int32_t tsdbIterNext(STsdbIter *iter);
|
int32_t tsdbIterNext(STsdbIter *iter);
|
||||||
|
|
||||||
// SIterMerger ===============
|
// SIterMerger ===============
|
||||||
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb);
|
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb);
|
||||||
int32_t tsdbIterMergerClose(SIterMerger **merger);
|
int32_t tsdbIterMergerClose(SIterMerger **merger);
|
||||||
int32_t tsdbIterMergerNext(SIterMerger *merger);
|
int32_t tsdbIterMergerNext(SIterMerger *merger);
|
||||||
SRowInfo *tsdbIterMergerGet(SIterMerger *merger);
|
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
|
||||||
|
|
||||||
|
SRowInfo *tsdbIterMergerGetData(SIterMerger *merger);
|
||||||
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger);
|
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger);
|
||||||
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,7 +178,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// loop iter
|
// loop iter
|
||||||
while ((row = tsdbIterMergerGet(committer->iterMerger)) != NULL) {
|
while ((row = tsdbIterMergerGetData(committer->iterMerger)) != NULL) {
|
||||||
if (row->uid != committer->ctx->tbid->uid) {
|
if (row->uid != committer->ctx->tbid->uid) {
|
||||||
committer->ctx->tbid->suid = row->suid;
|
committer->ctx->tbid->suid = row->suid;
|
||||||
committer->ctx->tbid->uid = row->uid;
|
committer->ctx->tbid->uid = row->uid;
|
||||||
|
|
|
@ -544,6 +544,7 @@ static int32_t tsdbTombIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2)
|
||||||
|
|
||||||
// SIterMerger ================
|
// SIterMerger ================
|
||||||
struct SIterMerger {
|
struct SIterMerger {
|
||||||
|
bool isTomb;
|
||||||
STsdbIter *iter;
|
STsdbIter *iter;
|
||||||
SRBTree iterTree[1];
|
SRBTree iterTree[1];
|
||||||
};
|
};
|
||||||
|
@ -553,8 +554,11 @@ int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger
|
||||||
SRBTreeNode *node;
|
SRBTreeNode *node;
|
||||||
|
|
||||||
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
|
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
|
||||||
if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
if (merger[0] == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
merger[0]->isTomb = isTomb;
|
||||||
if (isTomb) {
|
if (isTomb) {
|
||||||
tRBTreeCreate(merger[0]->iterTree, tsdbTombIterCmprFn);
|
tRBTreeCreate(merger[0]->iterTree, tsdbTombIterCmprFn);
|
||||||
} else {
|
} else {
|
||||||
|
@ -599,15 +603,22 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) {
|
if (merger->iter == NULL && (node = tRBTreeDropMin(merger->iterTree))) {
|
||||||
merger->iter = TCONTAINER_OF(node, STsdbIter, node);
|
merger->iter = TCONTAINER_OF(node, STsdbIter, node);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; }
|
SRowInfo *tsdbIterMergerGetData(SIterMerger *merger) {
|
||||||
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger) { return merger->iter ? merger->iter->record : NULL; }
|
ASSERT(!merger->isTomb);
|
||||||
|
return merger->iter ? merger->iter->row : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger) {
|
||||||
|
ASSERT(merger->isTomb);
|
||||||
|
return merger->iter ? merger->iter->record : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
|
@ -199,7 +199,7 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
// data
|
// data
|
||||||
for (SRowInfo *row; (row = tsdbIterMergerGet(merger->dataIterMerger)) != NULL;) {
|
for (SRowInfo *row; (row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL;) {
|
||||||
if (row->uid != merger->ctx->tbid->uid) {
|
if (row->uid != merger->ctx->tbid->uid) {
|
||||||
code = tsdbMergeToDataTableEnd(merger);
|
code = tsdbMergeToDataTableEnd(merger);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -273,7 +273,7 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
|
||||||
|
|
||||||
// data
|
// data
|
||||||
SRowInfo *row;
|
SRowInfo *row;
|
||||||
while ((row = tsdbIterMergerGet(merger->dataIterMerger))) {
|
while ((row = tsdbIterMergerGetData(merger->dataIterMerger))) {
|
||||||
code = tsdbSttFileWriteRow(merger->sttWriter, row);
|
code = tsdbSttFileWriteRow(merger->sttWriter, row);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue