From e0889b404d5f27379f0a5e2195e8eb54d57a4aae Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 30 May 2023 15:31:11 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 2 + .../dnode/vnode/src/tsdb/dev/inc/tsdbIter.h | 5 +- source/dnode/vnode/src/tsdb/dev/tsdbIter.c | 52 +++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 13 +++++ 4 files changed, 60 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4f9648ef6d..3ee4980012 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -409,6 +409,7 @@ struct STbData { SDelData *pTail; SMemSkipList sl; STbData *next; + SRBTreeNode rbtn[1]; }; struct SMemTable { @@ -425,6 +426,7 @@ struct SMemTable { int32_t nTbData; int32_t nBucket; STbData **aBucket; + SRBTree tbDataTree[1]; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h index 84779962c0..6e17c386b6 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h @@ -40,7 +40,10 @@ typedef struct { union { SSttSegReader *sttReader; SDataFileReader *dataReader; - SMemTable *memt; + struct { + SMemTable *memt; + TSDBKEY from[1]; + }; }; } STsdbIterConfig; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c index 49aad7ea81..7d4e6a9624 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c @@ -41,7 +41,11 @@ struct STsdbIter { int32_t iRow; } data[1]; struct { - SMemTable *memt; + SMemTable *memt; + TSDBKEY from[1]; + SRBTreeIter iter[1]; + STbData *tbData; + STbDataIter tbIter[1]; } memt[1]; }; }; @@ -156,8 +160,38 @@ _exit: } static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) { - // TODO - ASSERT(0); + SRBTreeNode *node; + + while (!iter->ctx->noMoreData) { + while (iter->memt->tbData && tsdbTbDataIterNext(iter->memt->tbIter)) { + if (tbid && tbid->suid == iter->memt->tbData->suid && tbid->uid == iter->memt->tbData->uid) { + iter->memt->tbData = NULL; + break; + } + iter->row->row = *tsdbTbDataIterGet(iter->memt->tbIter); + goto _exit; + } + + for (;;) { + node = tRBTreeIterNext(iter->memt->iter); + if (!node) { + iter->ctx->noMoreData = true; + break; + } + + iter->memt->tbData = TCONTAINER_OF(node, STbData, rbtn); + if (tbid && tbid->suid == iter->memt->tbData->suid && tbid->uid == iter->memt->tbData->uid) { + continue; + } else { + iter->row->suid = iter->memt->tbData->suid; + iter->row->uid = iter->memt->tbData->uid; + tsdbTbDataIterOpen(iter->memt->tbData, iter->memt->from, 0, iter->memt->tbIter); + break; + } + } + } + +_exit: return 0; } @@ -204,9 +238,8 @@ static int32_t tsdbDataIterOpen(STsdbIter *iter) { } static int32_t tsdbMemTableIterOpen(STsdbIter *iter) { - // TODO - ASSERT(0); - return 0; + iter->memt->iter[0] = tRBTreeIterCreate(iter->memt->memt->tbDataTree, 1); + return tsdbMemTableIterNext(iter, NULL); } static int32_t tsdbSttIterClose(STsdbIter *iter) { @@ -219,11 +252,7 @@ static int32_t tsdbDataIterClose(STsdbIter *iter) { return 0; } -static int32_t tsdbMemTableIterClose(STsdbIter *iter) { - // TODO - ASSERT(0); - return 0; -} +static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; } int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { int32_t code; @@ -244,6 +273,7 @@ int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) { break; case TSDB_ITER_TYPE_MEMT: iter[0]->memt->memt = config->memt; + iter[0]->memt->from[0] = config->from[0]; code = tsdbMemTableIterOpen(iter[0]); break; default: diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 6d223e00c5..ee3abf7559 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -38,6 +38,16 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows); +static int32_t tTbDataCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { + STbData *tbData1 = TCONTAINER_OF(n1, STbData, rbtn); + STbData *tbData2 = TCONTAINER_OF(n2, STbData, rbtn); + if (tbData1->suid < tbData2->suid) return -1; + if (tbData1->suid > tbData2->suid) return 1; + if (tbData1->uid < tbData2->uid) return -1; + if (tbData1->uid > tbData2->uid) return 1; + return 0; +} + int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t code = 0; SMemTable *pMemTable = NULL; @@ -66,6 +76,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { goto _err; } vnodeBufPoolRef(pMemTable->pPool); + tRBTreeCreate(pMemTable->tbDataTree, tTbDataCmprFn); *ppMemTable = pMemTable; return code; @@ -406,6 +417,8 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid pMemTable->aBucket[idx] = pTbData; pMemTable->nTbData++; + tRBTreePut(pMemTable->tbDataTree, pTbData->rbtn); + taosWUnLockLatch(&pMemTable->latch); _exit: