From 68fc2acc9caad5f1587f83d3767db1c5b529d0c4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 6 May 2022 11:16:53 +0000 Subject: [PATCH] refact --- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 114 ++++++++++++++------ 1 file changed, 81 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index 1c263b7d00..e7b733369a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -15,10 +15,11 @@ #include "tsdb.h" -typedef struct SMemTable SMemTable; -typedef struct SMemData SMemData; -typedef struct SMemSkipList SMemSkipList; -typedef struct SMemSkipListNode SMemSkipListNode; +typedef struct SMemTable SMemTable; +typedef struct SMemData SMemData; +typedef struct SMemSkipList SMemSkipList; +typedef struct SMemSkipListNode SMemSkipListNode; +typedef struct SMemSkipListCurosr SMemSkipListCurosr; struct SMemTable { STsdb *pTsdb; @@ -57,6 +58,20 @@ struct SMemData { SMemSkipList sl; }; +struct SMemSkipListCurosr { + SMemSkipList *pSl; + SMemSkipListNode *pNodeC; +}; + +#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) +#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) +#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) +#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) +#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) + +#define SL_HEAD_NODE(sl) ((sl)->pHead) +#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) + // SMemTable int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { SMemTable *pMemTb = NULL; @@ -96,17 +111,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) { } int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { - SMemData *pMemData; - STsdb *pTsdb = pMemTb->pTsdb; - SVnode *pVnode = pTsdb->pVnode; - SVBufPool *pPool = pVnode->inUse; - int32_t hash; - int32_t tlen; - uint8_t buf[16]; - int32_t rlen; - const uint8_t *p; - SMemSkipListNode *pSlNode; - const STSRow *pTSRow; + SMemData *pMemData; + STsdb *pTsdb = pMemTb->pTsdb; + SVnode *pVnode = pTsdb->pVnode; + SVBufPool *pPool = pVnode->inUse; + int32_t hash; + int32_t tlen; + uint8_t buf[16]; + int32_t rlen; + const uint8_t *p; + SMemSkipListNode *pSlNode; + const STSRow *pTSRow; + SMemSkipListCurosr slc = {0}; // search hash hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket; @@ -116,7 +132,11 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p // create pMemData if need if (pMemData == NULL) { - pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData)); + int8_t maxLevel = pVnode->config.tsdbCfg.slLevel; + int32_t tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2; + SMemSkipListNode *pHead, *pTail; + + pMemData = vnodeBufPoolMalloc(pPool, tsize); if (pMemData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -131,46 +151,74 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p pMemData->maxVer = -1; pMemData->nRows = 0; pMemData->sl.seed = taosRand(); - pMemData->sl.maxLevel = pVnode->config.tsdbCfg.slLevel; + pMemData->sl.maxLevel = maxLevel; pMemData->sl.level = 0; pMemData->sl.size = 0; + pHead = SL_HEAD_NODE(&pMemData->sl); + pTail = SL_TAIL_NODE(&pMemData->sl); + pHead->level = maxLevel; + pTail->level = maxLevel; + for (int iLevel = 0; iLevel < maxLevel; iLevel++) { + SL_NODE_FORWARD(pHead, iLevel) = pTail; + SL_NODE_FORWARD(pTail, iLevel) = pHead; + } // add to MemTable hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket; pMemData->pHashNext = pMemTb->pBuckets[hash]; pMemTb->pBuckets[hash] = pMemData; + pMemTb->nHash++; } - // loop to insert data to skiplist +// loop to insert data to skiplist +#if 0 + tsdbMemSkipListCursorOpen(&slc, &pMemData->sl); p = pSubmitBlk->pData; for (;;) { if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break; - // p = tGetLen(p, &rlen); - pTSRow = (STSRow *)p; - p += rlen; - if (pTSRow == NULL) break; + const uint8_t *pt = p; + p = tGetBinary(p, &pTSRow, &rlen); // check the row (todo) // move the cursor to position to write (todo) + int32_t c; + tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); + ASSERT(c); - // insert the row - int8_t level = 1; - tlen = 0; // sizeof(int64_t) + tsdbPutLen(rlen) + rlen; - pSlNode = vnodeBufPoolMalloc(pPool, tlen); - if (pSlNode == NULL) { - ASSERT(0); - } + // encode row + int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); + int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); + pSlNode = vnodeBufPoolMalloc(pPool, tsize); + pSlNode->level = level; + + uint8_t *pData = SL_NODE_DATA(pSlNode); + *(int64_t *)pData = version; + pData += sizeof(version); + memcpy(pData, pt, p - pt); + + // insert row + tsdbMemSkipListCursorPut(&slc, pSlNode); + + // update status + if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts; + if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts; } + tsdbMemSkipListCursorClose(&slc); +#endif + + if (pMemData->minVer == -1) pMemData->minVer = version; + if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version; + + if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey; + if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey; + if (pMemTb->minVer == -1) pMemTb->minVer = version; + if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version; return 0; } -static void tsdbEncodeRow(int64_t version, int32_t rlen, const STSRow *pRow) {} - -static void tsdbDecodeRow(int64_t *version, int32_t *rlen, const STSRow **ppRow) {} - // SMemData // SMemSkipList \ No newline at end of file