more code
This commit is contained in:
parent
46aa6d4896
commit
9caf359ee1
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "tsdbDef.h"
|
||||||
#include "tsdbFSet.h"
|
#include "tsdbFSet.h"
|
||||||
|
#include "tsdbSttFileRW.h"
|
||||||
#include "tsdbUtil.h"
|
#include "tsdbUtil.h"
|
||||||
|
|
||||||
#ifndef _TSDB_DATA_FILE_RW_H
|
#ifndef _TSDB_DATA_FILE_RW_H
|
||||||
|
@ -46,6 +47,8 @@ int32_t tsdbDataFileReaderClose(SDataFileReader **reader);
|
||||||
int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray);
|
int32_t tsdbDataFileReadBlockIdx(SDataFileReader *reader, const TBlockIdxArray **blockIdxArray);
|
||||||
int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray);
|
int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockIdx, const TDataBlkArray **dataBlkArray);
|
||||||
int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData);
|
int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData);
|
||||||
|
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray);
|
||||||
|
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData);
|
||||||
|
|
||||||
// SDataFileWriter =============================================
|
// SDataFileWriter =============================================
|
||||||
typedef struct SDataFileWriter SDataFileWriter;
|
typedef struct SDataFileWriter SDataFileWriter;
|
||||||
|
|
|
@ -33,17 +33,20 @@ typedef enum {
|
||||||
TSDB_ITER_TYPE_STT = 1,
|
TSDB_ITER_TYPE_STT = 1,
|
||||||
TSDB_ITER_TYPE_DATA,
|
TSDB_ITER_TYPE_DATA,
|
||||||
TSDB_ITER_TYPE_MEMT,
|
TSDB_ITER_TYPE_MEMT,
|
||||||
|
TSDB_ITER_TYPE_STT_TOMB,
|
||||||
|
TSDB_ITER_TYPE_DATA_TOMB,
|
||||||
|
TSDB_ITER_TYPE_MEMT_TOMB,
|
||||||
} EIterType;
|
} EIterType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
EIterType type;
|
EIterType type;
|
||||||
union {
|
union {
|
||||||
SSttSegReader *sttReader;
|
SSttSegReader *sttReader; // TSDB_ITER_TYPE_STT || TSDB_ITER_TYPE_STT_TOMB
|
||||||
SDataFileReader *dataReader;
|
SDataFileReader *dataReader; // TSDB_ITER_TYPE_DATA || TSDB_ITER_TYPE_DATA_TOMB
|
||||||
struct {
|
struct {
|
||||||
SMemTable *memt;
|
SMemTable *memt;
|
||||||
TSDBKEY from[1];
|
TSDBKEY from[1];
|
||||||
};
|
}; // TSDB_ITER_TYPE_MEMT || TSDB_ITER_TYPE_MEMT_TOMB
|
||||||
};
|
};
|
||||||
} STsdbIterConfig;
|
} STsdbIterConfig;
|
||||||
|
|
||||||
|
@ -53,10 +56,11 @@ int32_t tsdbIterClose(STsdbIter **iter);
|
||||||
int32_t tsdbIterNext(STsdbIter *iter);
|
int32_t tsdbIterNext(STsdbIter *iter);
|
||||||
|
|
||||||
// SIterMerger ===============
|
// SIterMerger ===============
|
||||||
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger);
|
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb);
|
||||||
int32_t tsdbIterMergerClose(SIterMerger **merger);
|
int32_t tsdbIterMergerClose(SIterMerger **merger);
|
||||||
int32_t tsdbIterMergerNext(SIterMerger *merger);
|
int32_t tsdbIterMergerNext(SIterMerger *merger);
|
||||||
SRowInfo *tsdbIterMergerGet(SIterMerger *merger);
|
SRowInfo *tsdbIterMergerGet(SIterMerger *merger);
|
||||||
|
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger);
|
||||||
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
|
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -46,7 +46,7 @@ int32_t tsdbSttFileReadTombBlk(SSttSegReader *reader, const TTombBlkArray **delB
|
||||||
|
|
||||||
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
|
int32_t tsdbSttFileReadDataBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData);
|
||||||
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData);
|
int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData);
|
||||||
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData);
|
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *delBlk, STombBlock *dData);
|
||||||
|
|
||||||
struct SSttFileReaderConfig {
|
struct SSttFileReaderConfig {
|
||||||
STsdb *tsdb;
|
STsdb *tsdb;
|
||||||
|
|
|
@ -24,9 +24,9 @@ extern "C" {
|
||||||
|
|
||||||
// SDelBlock ----------
|
// SDelBlock ----------
|
||||||
|
|
||||||
#define DEL_RECORD_NUM_ELEM 5
|
#define TOMB_RECORD_NUM_ELEM 5
|
||||||
typedef union {
|
typedef union {
|
||||||
int64_t aData[DEL_RECORD_NUM_ELEM];
|
int64_t aData[TOMB_RECORD_NUM_ELEM];
|
||||||
struct {
|
struct {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
|
@ -37,7 +37,7 @@ typedef union {
|
||||||
} STombRecord;
|
} STombRecord;
|
||||||
|
|
||||||
typedef union {
|
typedef union {
|
||||||
TARRAY2(int64_t) dataArr[DEL_RECORD_NUM_ELEM];
|
TARRAY2(int64_t) dataArr[TOMB_RECORD_NUM_ELEM];
|
||||||
struct {
|
struct {
|
||||||
TARRAY2(int64_t) suid[1];
|
TARRAY2(int64_t) suid[1];
|
||||||
TARRAY2(int64_t) uid[1];
|
TARRAY2(int64_t) uid[1];
|
||||||
|
@ -49,7 +49,7 @@ typedef union {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t numRec;
|
int32_t numRec;
|
||||||
int32_t size[DEL_RECORD_NUM_ELEM];
|
int32_t size[TOMB_RECORD_NUM_ELEM];
|
||||||
TABLEID minTid;
|
TABLEID minTid;
|
||||||
TABLEID maxTid;
|
TABLEID maxTid;
|
||||||
int64_t minVer;
|
int64_t minVer;
|
||||||
|
|
|
@ -174,7 +174,7 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
||||||
code = TARRAY2_APPEND(committer->iterArray, iter);
|
code = TARRAY2_APPEND(committer->iterArray, iter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger);
|
code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger, false);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// loop iter
|
// loop iter
|
||||||
|
|
|
@ -217,6 +217,28 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **tombBlkArray) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombBlk, STombBlock *tData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// SDataFileWriter =============================================
|
// SDataFileWriter =============================================
|
||||||
struct SDataFileWriter {
|
struct SDataFileWriter {
|
||||||
SDataFileWriterConfig config[1];
|
SDataFileWriterConfig config[1];
|
||||||
|
|
|
@ -20,7 +20,10 @@ struct STsdbIter {
|
||||||
struct {
|
struct {
|
||||||
bool noMoreData;
|
bool noMoreData;
|
||||||
} ctx[1];
|
} ctx[1];
|
||||||
|
union {
|
||||||
SRowInfo row[1];
|
SRowInfo row[1];
|
||||||
|
STombRecord record[1];
|
||||||
|
};
|
||||||
SRBTreeNode node[1];
|
SRBTreeNode node[1];
|
||||||
EIterType type;
|
EIterType type;
|
||||||
union {
|
union {
|
||||||
|
@ -47,6 +50,26 @@ struct STsdbIter {
|
||||||
STbData *tbData;
|
STbData *tbData;
|
||||||
STbDataIter tbIter[1];
|
STbDataIter tbIter[1];
|
||||||
} memt[1];
|
} memt[1];
|
||||||
|
struct {
|
||||||
|
SSttSegReader *reader;
|
||||||
|
const TTombBlkArray *tombBlkArray;
|
||||||
|
int32_t tombBlkArrayIdx;
|
||||||
|
STombBlock tData[1];
|
||||||
|
int32_t iRow;
|
||||||
|
} sttTomb[1];
|
||||||
|
struct {
|
||||||
|
SDataFileReader *reader;
|
||||||
|
const TTombBlkArray *tombBlkArray;
|
||||||
|
int32_t tombBlkArrayIdx;
|
||||||
|
STombBlock tData[1];
|
||||||
|
int32_t iRow;
|
||||||
|
} dataTomb[1];
|
||||||
|
struct {
|
||||||
|
SMemTable *memt;
|
||||||
|
SRBTreeIter iter[1];
|
||||||
|
STbData *tbData;
|
||||||
|
STbDataIter tbIter[1];
|
||||||
|
} memtTomb[1];
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -197,6 +220,55 @@ _exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
|
while (!iter->ctx->noMoreData) {
|
||||||
|
for (; iter->dataTomb->iRow < TOMB_BLOCK_SIZE(iter->dataTomb->tData); iter->dataTomb->iRow++) {
|
||||||
|
iter->record->suid = TARRAY2_GET(iter->dataTomb->tData->suid, iter->dataTomb->iRow);
|
||||||
|
iter->record->uid = TARRAY2_GET(iter->dataTomb->tData->uid, iter->dataTomb->iRow);
|
||||||
|
|
||||||
|
if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->record->version = TARRAY2_GET(iter->dataTomb->tData->version, iter->dataTomb->iRow);
|
||||||
|
iter->record->skey = TARRAY2_GET(iter->dataTomb->tData->skey, iter->dataTomb->iRow);
|
||||||
|
iter->record->ekey = TARRAY2_GET(iter->dataTomb->tData->ekey, iter->dataTomb->iRow);
|
||||||
|
iter->dataTomb->iRow++;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter->dataTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->dataTomb->tombBlkArray)) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iter->dataTomb->tombBlkArrayIdx < TARRAY2_SIZE(iter->dataTomb->tombBlkArray);
|
||||||
|
iter->dataTomb->tombBlkArrayIdx++) {
|
||||||
|
const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->dataTomb->tombBlkArray, iter->dataTomb->tombBlkArrayIdx);
|
||||||
|
|
||||||
|
if (tbid && tbid->suid == tombBlk->minTid.suid && tbid->uid == tombBlk->minTid.uid &&
|
||||||
|
tbid->suid == tombBlk->maxTid.suid && tbid->uid == tombBlk->maxTid.uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbDataFileReadTombBlock(iter->dataTomb->reader, tombBlk, iter->dataTomb->tData);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
iter->dataTomb->iRow = 0;
|
||||||
|
iter->dataTomb->tombBlkArrayIdx++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMemTableTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
|
ASSERTS(0, "Not implemented yet!");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSttIterOpen(STsdbIter *iter) {
|
static int32_t tsdbSttIterOpen(STsdbIter *iter) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
|
@ -249,6 +321,24 @@ static int32_t tsdbSttIterClose(STsdbIter *iter) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbDataTombIterOpen(STsdbIter *iter) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
code = tsdbDataFileReadTombBlk(iter->dataTomb->reader, &iter->dataTomb->tombBlkArray);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
if (TARRAY2_SIZE(iter->dataTomb->tombBlkArray) == 0) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
iter->data->blockIdxArrayIdx = 0;
|
||||||
|
|
||||||
|
tTombBlockInit(iter->dataTomb->tData);
|
||||||
|
iter->dataTomb->iRow = 0;
|
||||||
|
|
||||||
|
return tsdbDataTombIterNext(iter, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbDataIterClose(STsdbIter *iter) {
|
static int32_t tsdbDataIterClose(STsdbIter *iter) {
|
||||||
tBlockDataDestroy(iter->data->bData);
|
tBlockDataDestroy(iter->data->bData);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -256,6 +346,68 @@ static int32_t tsdbDataIterClose(STsdbIter *iter) {
|
||||||
|
|
||||||
static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; }
|
static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; }
|
||||||
|
|
||||||
|
static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
|
while (!iter->ctx->noMoreData) {
|
||||||
|
for (; iter->sttTomb->iRow < TOMB_BLOCK_SIZE(iter->sttTomb->tData); iter->sttTomb->iRow++) {
|
||||||
|
iter->record->suid = TARRAY2_GET(iter->sttTomb->tData->suid, iter->sttTomb->iRow);
|
||||||
|
iter->record->uid = TARRAY2_GET(iter->sttTomb->tData->uid, iter->sttTomb->iRow);
|
||||||
|
|
||||||
|
if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->record->version = TARRAY2_GET(iter->sttTomb->tData->version, iter->sttTomb->iRow);
|
||||||
|
iter->record->skey = TARRAY2_GET(iter->sttTomb->tData->skey, iter->sttTomb->iRow);
|
||||||
|
iter->record->ekey = TARRAY2_GET(iter->sttTomb->tData->ekey, iter->sttTomb->iRow);
|
||||||
|
iter->sttTomb->iRow++;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter->sttTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->sttTomb->tombBlkArray)) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iter->sttTomb->tombBlkArrayIdx < TARRAY2_SIZE(iter->sttTomb->tombBlkArray);
|
||||||
|
iter->sttTomb->tombBlkArrayIdx++) {
|
||||||
|
const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->sttTomb->tombBlkArray, iter->sttTomb->tombBlkArrayIdx);
|
||||||
|
|
||||||
|
if (tbid && tbid->suid == tombBlk->minTid.suid && tbid->uid == tombBlk->minTid.uid &&
|
||||||
|
tbid->suid == tombBlk->maxTid.suid && tbid->uid == tombBlk->maxTid.uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbSttFileReadTombBlock(iter->sttTomb->reader, tombBlk, iter->sttTomb->tData);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
iter->sttTomb->iRow = 0;
|
||||||
|
iter->sttTomb->tombBlkArrayIdx++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSttTombIterOpen(STsdbIter *iter) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
code = tsdbSttFileReadTombBlk(iter->sttTomb->reader, &iter->sttTomb->tombBlkArray);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
if (TARRAY2_SIZE(iter->sttTomb->tombBlkArray) == 0) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->sttTomb->tombBlkArrayIdx = 0;
|
||||||
|
tTombBlockInit(iter->sttTomb->tData);
|
||||||
|
iter->sttTomb->iRow = 0;
|
||||||
|
|
||||||
|
return tsdbSttTombIterNext(iter, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
|
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
|
@ -278,8 +430,19 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
|
||||||
iter[0]->memt->from[0] = config->from[0];
|
iter[0]->memt->from[0] = config->from[0];
|
||||||
code = tsdbMemTableIterOpen(iter[0]);
|
code = tsdbMemTableIterOpen(iter[0]);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_ITER_TYPE_STT_TOMB:
|
||||||
|
iter[0]->sttTomb->reader = config->sttReader;
|
||||||
|
code = tsdbSttTombIterOpen(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_DATA_TOMB:
|
||||||
|
iter[0]->dataTomb->reader = config->dataReader;
|
||||||
|
code = tsdbDataTombIterOpen(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_MEMT_TOMB:
|
||||||
|
ASSERTS(0, "Not implemented");
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(false);
|
ASSERTS(false, "Not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -289,6 +452,16 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSttTombIterClose(STsdbIter *iter) {
|
||||||
|
tTombBlockFree(iter->sttTomb->tData);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbDataTombIterClose(STsdbIter *iter) {
|
||||||
|
tTombBlockFree(iter->dataTomb->tData);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbIterClose(STsdbIter **iter) {
|
int32_t tsdbIterClose(STsdbIter **iter) {
|
||||||
switch (iter[0]->type) {
|
switch (iter[0]->type) {
|
||||||
case TSDB_ITER_TYPE_STT:
|
case TSDB_ITER_TYPE_STT:
|
||||||
|
@ -300,6 +473,15 @@ int32_t tsdbIterClose(STsdbIter **iter) {
|
||||||
case TSDB_ITER_TYPE_MEMT:
|
case TSDB_ITER_TYPE_MEMT:
|
||||||
tsdbMemTableIterClose(iter[0]);
|
tsdbMemTableIterClose(iter[0]);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_ITER_TYPE_STT_TOMB:
|
||||||
|
tsdbSttTombIterClose(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_DATA_TOMB:
|
||||||
|
tsdbDataTombIterClose(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_MEMT_TOMB:
|
||||||
|
ASSERTS(false, "Not implemented");
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
|
@ -316,6 +498,12 @@ int32_t tsdbIterNext(STsdbIter *iter) {
|
||||||
return tsdbDataIterNext(iter, NULL);
|
return tsdbDataIterNext(iter, NULL);
|
||||||
case TSDB_ITER_TYPE_MEMT:
|
case TSDB_ITER_TYPE_MEMT:
|
||||||
return tsdbMemTableIterNext(iter, NULL);
|
return tsdbMemTableIterNext(iter, NULL);
|
||||||
|
case TSDB_ITER_TYPE_STT_TOMB:
|
||||||
|
return tsdbSttTombIterNext(iter, NULL);
|
||||||
|
case TSDB_ITER_TYPE_DATA_TOMB:
|
||||||
|
return tsdbDataTombIterNext(iter, NULL);
|
||||||
|
case TSDB_ITER_TYPE_MEMT_TOMB:
|
||||||
|
return tsdbMemTableTombIterNext(iter, NULL);
|
||||||
default:
|
default:
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
|
@ -342,20 +530,49 @@ static int32_t tsdbIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
|
||||||
return tRowInfoCmprFn(&iter1->row, &iter2->row);
|
return tRowInfoCmprFn(&iter1->row, &iter2->row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbTombIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
|
||||||
|
STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node);
|
||||||
|
STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node);
|
||||||
|
|
||||||
|
if (iter1->record->suid < iter2->record->suid) {
|
||||||
|
return -1;
|
||||||
|
} else if (iter1->record->suid > iter2->record->suid) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter1->record->uid < iter2->record->uid) {
|
||||||
|
return -1;
|
||||||
|
} else if (iter1->record->uid > iter2->record->uid) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter1->record->version < iter2->record->version) {
|
||||||
|
return -1;
|
||||||
|
} else if (iter1->record->version > iter2->record->version) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// SIterMerger ================
|
// SIterMerger ================
|
||||||
struct SIterMerger {
|
struct SIterMerger {
|
||||||
STsdbIter *iter;
|
STsdbIter *iter;
|
||||||
SRBTree iterTree[1];
|
SRBTree iterTree[1];
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger) {
|
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb) {
|
||||||
STsdbIter *iter;
|
STsdbIter *iter;
|
||||||
SRBTreeNode *node;
|
SRBTreeNode *node;
|
||||||
|
|
||||||
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
|
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
|
||||||
if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
if (isTomb) {
|
||||||
|
tRBTreeCreate(merger[0]->iterTree, tsdbTombIterCmprFn);
|
||||||
|
} else {
|
||||||
tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn);
|
tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn);
|
||||||
|
}
|
||||||
TARRAY2_FOREACH(iterArray, iter) {
|
TARRAY2_FOREACH(iterArray, iter) {
|
||||||
if (iter->ctx->noMoreData) continue;
|
if (iter->ctx->noMoreData) continue;
|
||||||
node = tRBTreePut(merger[0]->iterTree, iter->node);
|
node = tRBTreePut(merger[0]->iterTree, iter->node);
|
||||||
|
@ -385,7 +602,7 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) {
|
||||||
if (merger->iter->ctx->noMoreData) {
|
if (merger->iter->ctx->noMoreData) {
|
||||||
merger->iter = NULL;
|
merger->iter = NULL;
|
||||||
} else if ((node = tRBTreeMin(merger->iterTree))) {
|
} else if ((node = tRBTreeMin(merger->iterTree))) {
|
||||||
c = tsdbIterCmprFn(merger->iter->node, node);
|
c = merger->iterTree->cmprFn(merger->iter->node, node);
|
||||||
ASSERT(c);
|
ASSERT(c);
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
||||||
|
@ -403,6 +620,7 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; }
|
SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; }
|
||||||
|
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger) { return merger->iter ? merger->iter->record : NULL; }
|
||||||
|
|
||||||
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
@ -416,7 +634,7 @@ int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
||||||
if (merger->iter->ctx->noMoreData) {
|
if (merger->iter->ctx->noMoreData) {
|
||||||
merger->iter = NULL;
|
merger->iter = NULL;
|
||||||
} else if ((node = tRBTreeMin(merger->iterTree))) {
|
} else if ((node = tRBTreeMin(merger->iterTree))) {
|
||||||
c = tsdbIterCmprFn(merger->iter->node, node);
|
c = merger->iterTree->cmprFn(merger->iter->node, node);
|
||||||
ASSERT(c);
|
ASSERT(c);
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
||||||
|
|
|
@ -360,7 +360,7 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger);
|
code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger, false);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
|
@ -243,7 +243,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) {
|
int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, STombBlock *dData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue