more code

This commit is contained in:
Hongze Cheng 2023-05-30 15:31:11 +08:00
parent 9222d9651e
commit e0889b404d
4 changed files with 60 additions and 12 deletions

View File

@ -409,6 +409,7 @@ struct STbData {
SDelData *pTail; SDelData *pTail;
SMemSkipList sl; SMemSkipList sl;
STbData *next; STbData *next;
SRBTreeNode rbtn[1];
}; };
struct SMemTable { struct SMemTable {
@ -425,6 +426,7 @@ struct SMemTable {
int32_t nTbData; int32_t nTbData;
int32_t nBucket; int32_t nBucket;
STbData **aBucket; STbData **aBucket;
SRBTree tbDataTree[1];
}; };
struct TSDBROW { struct TSDBROW {

View File

@ -40,7 +40,10 @@ typedef struct {
union { union {
SSttSegReader *sttReader; SSttSegReader *sttReader;
SDataFileReader *dataReader; SDataFileReader *dataReader;
struct {
SMemTable *memt; SMemTable *memt;
TSDBKEY from[1];
};
}; };
} STsdbIterConfig; } STsdbIterConfig;

View File

@ -42,6 +42,10 @@ struct STsdbIter {
} data[1]; } data[1];
struct { struct {
SMemTable *memt; SMemTable *memt;
TSDBKEY from[1];
SRBTreeIter iter[1];
STbData *tbData;
STbDataIter tbIter[1];
} memt[1]; } memt[1];
}; };
}; };
@ -156,8 +160,38 @@ _exit:
} }
static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) { static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) {
// TODO SRBTreeNode *node;
ASSERT(0);
while (!iter->ctx->noMoreData) {
while (iter->memt->tbData && tsdbTbDataIterNext(iter->memt->tbIter)) {
if (tbid && tbid->suid == iter->memt->tbData->suid && tbid->uid == iter->memt->tbData->uid) {
iter->memt->tbData = NULL;
break;
}
iter->row->row = *tsdbTbDataIterGet(iter->memt->tbIter);
goto _exit;
}
for (;;) {
node = tRBTreeIterNext(iter->memt->iter);
if (!node) {
iter->ctx->noMoreData = true;
break;
}
iter->memt->tbData = TCONTAINER_OF(node, STbData, rbtn);
if (tbid && tbid->suid == iter->memt->tbData->suid && tbid->uid == iter->memt->tbData->uid) {
continue;
} else {
iter->row->suid = iter->memt->tbData->suid;
iter->row->uid = iter->memt->tbData->uid;
tsdbTbDataIterOpen(iter->memt->tbData, iter->memt->from, 0, iter->memt->tbIter);
break;
}
}
}
_exit:
return 0; return 0;
} }
@ -204,9 +238,8 @@ static int32_t tsdbDataIterOpen(STsdbIter *iter) {
} }
static int32_t tsdbMemTableIterOpen(STsdbIter *iter) { static int32_t tsdbMemTableIterOpen(STsdbIter *iter) {
// TODO iter->memt->iter[0] = tRBTreeIterCreate(iter->memt->memt->tbDataTree, 1);
ASSERT(0); return tsdbMemTableIterNext(iter, NULL);
return 0;
} }
static int32_t tsdbSttIterClose(STsdbIter *iter) { static int32_t tsdbSttIterClose(STsdbIter *iter) {
@ -219,11 +252,7 @@ static int32_t tsdbDataIterClose(STsdbIter *iter) {
return 0; return 0;
} }
static int32_t tsdbMemTableIterClose(STsdbIter *iter) { static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; }
// TODO
ASSERT(0);
return 0;
}
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
int32_t code; int32_t code;
@ -244,6 +273,7 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
break; break;
case TSDB_ITER_TYPE_MEMT: case TSDB_ITER_TYPE_MEMT:
iter[0]->memt->memt = config->memt; iter[0]->memt->memt = config->memt;
iter[0]->memt->from[0] = config->from[0];
code = tsdbMemTableIterOpen(iter[0]); code = tsdbMemTableIterOpen(iter[0]);
break; break;
default: default:

View File

@ -38,6 +38,16 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version, static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version,
SSubmitTbData *pSubmitTbData, int32_t *affectedRows); SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn);
STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn);
if (tbData1->suid < tbData2->suid) return -1;
if (tbData1->suid > tbData2->suid) return 1;
if (tbData1->uid < tbData2->uid) return -1;
if (tbData1->uid > tbData2->uid) return 1;
return 0;
}
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
int32_t code = 0; int32_t code = 0;
SMemTable *pMemTable = NULL; SMemTable *pMemTable = NULL;
@ -66,6 +76,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
goto _err; goto _err;
} }
vnodeBufPoolRef(pMemTable->pPool); vnodeBufPoolRef(pMemTable->pPool);
tRBTreeCreate(pMemTable->tbDataTree, tTbDataCmprFn);
*ppMemTable = pMemTable; *ppMemTable = pMemTable;
return code; return code;
@ -406,6 +417,8 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
pMemTable->aBucket[idx] = pTbData; pMemTable->aBucket[idx] = pTbData;
pMemTable->nTbData++; pMemTable->nTbData++;
tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn);
taosWUnLockLatch(&pMemTable->latch); taosWUnLockLatch(&pMemTable->latch);
_exit: _exit: