more code
This commit is contained in:
parent
8626d3e691
commit
29c29871d2
|
@ -14,7 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "trbtree.h"
|
#include "trbtree.h"
|
||||||
|
#include "tsdbDataFileRW.h"
|
||||||
#include "tsdbDef.h"
|
#include "tsdbDef.h"
|
||||||
|
#include "tsdbSttFileRW.h"
|
||||||
|
|
||||||
#ifndef _TSDB_ITER_H_
|
#ifndef _TSDB_ITER_H_
|
||||||
#define _TSDB_ITER_H_
|
#define _TSDB_ITER_H_
|
||||||
|
@ -27,7 +29,25 @@ typedef struct SIterMerger SIterMerger;
|
||||||
typedef struct STsdbIter STsdbIter;
|
typedef struct STsdbIter STsdbIter;
|
||||||
typedef TARRAY2(STsdbIter *) TTsdbIterArray;
|
typedef TARRAY2(STsdbIter *) TTsdbIterArray;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_ITER_TYPE_STT = 1,
|
||||||
|
TSDB_ITER_TYPE_DATA,
|
||||||
|
TSDB_ITER_TYPE_MEMT,
|
||||||
|
} EIterType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
EIterType type;
|
||||||
|
union {
|
||||||
|
SSttSegReader *sttReader;
|
||||||
|
SDataFileReader *dataReader;
|
||||||
|
SMemTable *memt;
|
||||||
|
};
|
||||||
|
} STsdbIterConfig;
|
||||||
|
|
||||||
// STsdbIter ===============
|
// STsdbIter ===============
|
||||||
|
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter);
|
||||||
|
int32_t tsdbIterClose(STsdbIter **iter);
|
||||||
|
int32_t tsdbIterNext(STsdbIter *iter);
|
||||||
|
|
||||||
// SIterMerger ===============
|
// SIterMerger ===============
|
||||||
int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger);
|
int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger);
|
||||||
|
|
|
@ -22,18 +22,285 @@ struct STsdbIter {
|
||||||
} ctx[1];
|
} ctx[1];
|
||||||
SRowInfo row[1];
|
SRowInfo row[1];
|
||||||
SRBTreeNode node[1];
|
SRBTreeNode node[1];
|
||||||
SBlockData bData[1];
|
EIterType type;
|
||||||
int32_t iRow;
|
union {
|
||||||
// TODO
|
struct {
|
||||||
|
SSttSegReader *reader;
|
||||||
|
const TSttBlkArray *sttBlkArray;
|
||||||
|
int32_t sttBlkArrayIdx;
|
||||||
|
SBlockData bData[1];
|
||||||
|
int32_t iRow;
|
||||||
|
} stt[1];
|
||||||
|
struct {
|
||||||
|
SDataFileReader *reader;
|
||||||
|
const TBlockIdxArray *blockIdxArray;
|
||||||
|
int32_t blockIdxArrayIdx;
|
||||||
|
const TDataBlkArray *dataBlkArray;
|
||||||
|
int32_t dataBlkArrayIdx;
|
||||||
|
SBlockData bData[1];
|
||||||
|
int32_t iRow;
|
||||||
|
} data[1];
|
||||||
|
struct {
|
||||||
|
SMemTable *memt;
|
||||||
|
} memt[1];
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tsdbIterNext(STsdbIter *iter) {
|
static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
// TODO
|
while (!iter->ctx->noMoreData) {
|
||||||
|
for (; iter->stt->iRow < iter->stt->bData->nRow; iter->stt->iRow++) {
|
||||||
|
iter->row->suid = iter->stt->bData->suid;
|
||||||
|
iter->row->uid = iter->stt->bData->uid ? iter->stt->bData->uid : iter->stt->bData->aUid[iter->stt->iRow];
|
||||||
|
|
||||||
|
if (tbid && iter->row->suid == tbid->suid && iter->row->uid == tbid->uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->row->row = tsdbRowFromBlockData(iter->stt->bData, iter->stt->iRow);
|
||||||
|
iter->stt->iRow++;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter->stt->sttBlkArrayIdx >= TARRAY2_SIZE(iter->stt->sttBlkArray)) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iter->stt->sttBlkArrayIdx < TARRAY2_SIZE(iter->stt->sttBlkArray); iter->stt->sttBlkArrayIdx++) {
|
||||||
|
const SSttBlk *sttBlk = TARRAY2_GET_PTR(iter->stt->sttBlkArray, iter->stt->sttBlkArrayIdx);
|
||||||
|
|
||||||
|
if (tbid && tbid->suid == sttBlk->suid && tbid->uid == sttBlk->minUid && tbid->uid == sttBlk->maxUid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbSttFReadSttBlock(iter->stt->reader, sttBlk, iter->stt->bData);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
iter->stt->iRow = 0;
|
||||||
|
iter->stt->sttBlkArrayIdx++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbIterSkipTableData(STsdbIter *iter) {
|
static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
while (!iter->ctx->noMoreData) {
|
||||||
|
for (;;) {
|
||||||
|
for (; iter->data->iRow < iter->data->bData->nRow; iter->data->iRow++) {
|
||||||
|
if (tbid && tbid->suid == iter->data->bData->suid && tbid->uid == iter->data->bData->uid) {
|
||||||
|
iter->data->iRow = iter->data->bData->nRow;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->row->row = tsdbRowFromBlockData(iter->data->bData, iter->data->iRow);
|
||||||
|
iter->data->iRow++;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter->data->dataBlkArray == NULL || iter->data->dataBlkArrayIdx >= TARRAY2_SIZE(iter->data->dataBlkArray)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iter->data->dataBlkArray && iter->data->dataBlkArrayIdx < TARRAY2_SIZE(iter->data->dataBlkArray);
|
||||||
|
iter->data->dataBlkArrayIdx++) {
|
||||||
|
const SDataBlk *dataBlk = TARRAY2_GET_PTR(iter->data->dataBlkArray, iter->data->dataBlkArrayIdx);
|
||||||
|
|
||||||
|
if (tbid) {
|
||||||
|
const SBlockIdx *blockIdx = TARRAY2_GET_PTR(iter->data->blockIdxArray, iter->data->blockIdxArrayIdx - 1);
|
||||||
|
|
||||||
|
if (tbid->suid == blockIdx->suid && tbid->uid == blockIdx->uid) {
|
||||||
|
iter->data->dataBlkArrayIdx = TARRAY2_SIZE(iter->data->dataBlkArray);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbDataFileReadDataBlock(iter->data->reader, dataBlk, iter->data->bData);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
iter->data->iRow = 0;
|
||||||
|
iter->data->dataBlkArrayIdx++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter->data->blockIdxArrayIdx >= TARRAY2_SIZE(iter->data->blockIdxArray)) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iter->data->blockIdxArrayIdx < TARRAY2_SIZE(iter->data->blockIdxArray); iter->data->blockIdxArrayIdx++) {
|
||||||
|
const SBlockIdx *blockIdx = TARRAY2_GET_PTR(iter->data->blockIdxArray, iter->data->blockIdxArrayIdx);
|
||||||
|
|
||||||
|
if (tbid && tbid->suid == blockIdx->suid && tbid->uid == blockIdx->uid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbDataFileReadDataBlk(iter->data->reader, blockIdx, &iter->data->dataBlkArray);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
iter->row->suid = blockIdx->suid;
|
||||||
|
iter->row->uid = blockIdx->uid;
|
||||||
|
iter->data->dataBlkArrayIdx = 0;
|
||||||
|
iter->data->blockIdxArrayIdx++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
// TODO
|
// TODO
|
||||||
|
ASSERT(0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSttIterOpen(STsdbIter *iter) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
code = tsdbSttFReadSttBlk(iter->stt->reader, &iter->stt->sttBlkArray);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
if (TARRAY2_SIZE(iter->stt->sttBlkArray) == 0) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->stt->sttBlkArrayIdx = 0;
|
||||||
|
tBlockDataCreate(iter->stt->bData);
|
||||||
|
iter->stt->iRow = 0;
|
||||||
|
|
||||||
|
return tsdbSttIterNext(iter, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbDataIterOpen(STsdbIter *iter) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
code = tsdbDataFileReadBlockIdx(iter->data->reader, &iter->data->blockIdxArray);
|
||||||
|
if (code) return code;
|
||||||
|
|
||||||
|
// SBlockIdx
|
||||||
|
if (TARRAY2_SIZE(iter->data->blockIdxArray) == 0) {
|
||||||
|
iter->ctx->noMoreData = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
iter->data->blockIdxArrayIdx = 0;
|
||||||
|
|
||||||
|
// SDataBlk
|
||||||
|
iter->data->dataBlkArray = NULL;
|
||||||
|
iter->data->dataBlkArrayIdx = 0;
|
||||||
|
|
||||||
|
// SBlockData
|
||||||
|
tBlockDataCreate(iter->data->bData);
|
||||||
|
iter->data->iRow = 0;
|
||||||
|
|
||||||
|
return tsdbDataIterNext(iter, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMemTableIterOpen(STsdbIter *iter) {
|
||||||
|
// TODO
|
||||||
|
ASSERT(0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSttIterClose(STsdbIter *iter) {
|
||||||
|
tBlockDataDestroy(iter->stt->bData);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbDataIterClose(STsdbIter *iter) {
|
||||||
|
tBlockDataDestroy(iter->data->bData);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMemTableIterClose(STsdbIter *iter) {
|
||||||
|
// TODO
|
||||||
|
ASSERT(0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
iter[0] = taosMemoryCalloc(1, sizeof(*iter[0]));
|
||||||
|
if (iter[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
iter[0]->type = config->type;
|
||||||
|
iter[0]->ctx->noMoreData = false;
|
||||||
|
switch (config->type) {
|
||||||
|
case TSDB_ITER_TYPE_STT:
|
||||||
|
iter[0]->stt->reader = config->sttReader;
|
||||||
|
code = tsdbSttIterOpen(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_DATA:
|
||||||
|
iter[0]->data->reader = config->dataReader;
|
||||||
|
code = tsdbDataIterOpen(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_MEMT:
|
||||||
|
iter[0]->memt->memt = config->memt;
|
||||||
|
code = tsdbMemTableIterOpen(iter[0]);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(iter[0]);
|
||||||
|
iter[0] = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbIterClose(STsdbIter **iter) {
|
||||||
|
switch (iter[0]->type) {
|
||||||
|
case TSDB_ITER_TYPE_STT:
|
||||||
|
tsdbSttIterClose(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_DATA:
|
||||||
|
tsdbDataIterClose(iter[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_ITER_TYPE_MEMT:
|
||||||
|
tsdbMemTableIterClose(iter[0]);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
taosMemoryFree(iter[0]);
|
||||||
|
iter[0] = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbIterNext(STsdbIter *iter) {
|
||||||
|
switch (iter->type) {
|
||||||
|
case TSDB_ITER_TYPE_STT:
|
||||||
|
return tsdbSttIterNext(iter, NULL);
|
||||||
|
case TSDB_ITER_TYPE_DATA:
|
||||||
|
return tsdbDataIterNext(iter, NULL);
|
||||||
|
case TSDB_ITER_TYPE_MEMT:
|
||||||
|
return tsdbMemTableIterNext(iter, NULL);
|
||||||
|
default:
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbIterSkipTableData(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
|
switch (iter->type) {
|
||||||
|
case TSDB_ITER_TYPE_STT:
|
||||||
|
return tsdbSttIterNext(iter, tbid);
|
||||||
|
case TSDB_ITER_TYPE_DATA:
|
||||||
|
return tsdbDataIterNext(iter, tbid);
|
||||||
|
case TSDB_ITER_TYPE_MEMT:
|
||||||
|
return tsdbMemTableIterNext(iter, tbid);
|
||||||
|
default:
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +317,8 @@ struct SIterMerger {
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger) {
|
int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger) {
|
||||||
STsdbIter *iter;
|
STsdbIter *iter;
|
||||||
|
SRBTreeNode *node;
|
||||||
|
|
||||||
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
|
merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
|
||||||
if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -58,7 +326,7 @@ int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger
|
||||||
tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn);
|
tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn);
|
||||||
TARRAY2_FOREACH(iterArray, iter) {
|
TARRAY2_FOREACH(iterArray, iter) {
|
||||||
if (iter->ctx->noMoreData) continue;
|
if (iter->ctx->noMoreData) continue;
|
||||||
SRBTreeNode *node = tRBTreePut(merger[0]->iterTree, iter->node);
|
node = tRBTreePut(merger[0]->iterTree, iter->node);
|
||||||
ASSERT(node);
|
ASSERT(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,8 +354,8 @@ int32_t tsdbIterMergerNext(SIterMerger *merger) {
|
||||||
ASSERT(c);
|
ASSERT(c);
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
||||||
merger->iter = NULL;
|
|
||||||
ASSERT(node);
|
ASSERT(node);
|
||||||
|
merger->iter = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,17 +375,18 @@ int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
||||||
SRBTreeNode *node;
|
SRBTreeNode *node;
|
||||||
|
|
||||||
while (merger->iter && tbid->suid == merger->iter->row->suid && tbid->uid == merger->iter->row->uid) {
|
while (merger->iter && tbid->suid == merger->iter->row->suid && tbid->uid == merger->iter->row->uid) {
|
||||||
int32_t code = tsdbIterSkipTableData(merger->iter);
|
int32_t code = tsdbIterSkipTableData(merger->iter, tbid);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
|
||||||
if (merger->iter->ctx->noMoreData) {
|
if (merger->iter->ctx->noMoreData) {
|
||||||
merger->iter = NULL;
|
merger->iter = NULL;
|
||||||
|
} else if ((node = tRBTreeMin(merger->iterTree))) {
|
||||||
c = tsdbIterCmprFn(merger->iter->node, node);
|
c = tsdbIterCmprFn(merger->iter->node, node);
|
||||||
ASSERT(c);
|
ASSERT(c);
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
node = tRBTreePut(merger->iterTree, merger->iter->node);
|
||||||
merger->iter = NULL;
|
|
||||||
ASSERT(node);
|
ASSERT(node);
|
||||||
|
merger->iter = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,5 +394,6 @@ int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
|
||||||
merger->iter = TCONTAINER_OF(node, STsdbIter, node);
|
merger->iter = TCONTAINER_OF(node, STsdbIter, node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue