diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4f22ce768f..a62b4c4409 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -36,6 +36,8 @@ typedef struct TSDBROW TSDBROW; typedef struct TSDBKEY TSDBKEY; typedef struct SDelOp SDelOp; +static int tsdbKeyCmprFn(const void *p1, const void *p2); + // tsdbMemTable2.c ============================================================================================== typedef struct SMemTable SMemTable; @@ -873,6 +875,25 @@ struct SDelOp { SDelOp *pNext; }; +static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) { + TSDBKEY *pKey1 = (TSDBKEY *)p1; + TSDBKEY *pKey2 = (TSDBKEY *)p2; + + if (pKey1->ts < pKey2->ts) { + return -1; + } else if (pKey1->ts > pKey2->ts) { + return 1; + } + + if (pKey1->version < pKey2->version) { + return -1; + } else if (pKey1->version > pKey2->version) { + return 1; + } + + return 0; +} + #endif #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index aab190b0bb..cbc9fef686 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -15,19 +15,33 @@ #include "tsdb.h" -typedef struct SMemData SMemData; -// typedef struct SMemSkipList SMemSkipList; -// typedef struct SMemSkipListNode SMemSkipListNode; -// typedef struct SMemSkipListCurosr SMemSkipListCurosr; +typedef struct SMemData SMemData; +typedef struct SMemSkipList SMemSkipList; +typedef struct SMemSkipListNode SMemSkipListNode; + +struct SMemSkipListNode { + int8_t level; + SMemSkipListNode *forwards[0]; +}; + +struct SMemSkipList { + uint32_t seed; + int32_t size; + int8_t maxLevel; + int8_t level; + SMemSkipListNode *pHead; + SMemSkipListNode *pTail; +}; struct SMemData { - tb_uid_t suid; - tb_uid_t uid; - TSDBKEY minKey; - TSDBKEY maxKey; - int64_t nRows; - SDelOp *delOpHead; - SDelOp *delOpTail; + tb_uid_t suid; + tb_uid_t uid; + TSDBKEY minKey; + TSDBKEY maxKey; + int64_t nRows; + SDelOp *delOpHead; + SDelOp *delOpTail; + SMemSkipList sl; }; struct SMemTable { @@ -39,8 +53,17 @@ struct SMemTable { SArray *pArray; // SArray }; +#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) +#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) +#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) +#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) +#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) + static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); static int memDataPCmprFn(const void *p1, const void *p2); +static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); +static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); +static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); // SMemTable ============================================== int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { @@ -96,16 +119,41 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit } // do insert - uint32_t n = 0; - uint8_t *p = pSubmitBlk->pData; + int32_t nt; + uint8_t *pt; + int32_t n = 0; + uint8_t *p = pSubmitBlk->pData; + SVBufPool *pPool = pTsdb->pVnode->inUse; + int8_t level; + SMemSkipListNode *pNode; while (n < pSubmitBlk->nData) { - n += tGetTSRow(p + n, &row.tsRow); + nt = tGetTSRow(p + n, &row.tsRow); + n += nt; ASSERT(n <= pSubmitBlk->nData); - // TODO + // build the node + level = tsdbMemSkipListRandLevel(&pMemData->sl); + pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + nt + sizeof(version)); + if (pNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pNode->level = level; + tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), &row); + + // put the node (todo) + + pMemData->nRows++; + + // set info + if (tsdbKeyCmprFn(&row, &pMemData->minKey) < 0) pMemData->minKey = *(TSDBKEY *)&row; + if (tsdbKeyCmprFn(&row, &pMemData->maxKey) > 0) pMemData->maxKey = *(TSDBKEY *)&row; } + if (tsdbKeyCmprFn(&pMemTable->minKey, &pMemData->minKey) < 0) pMemTable->minKey = pMemData->minKey; + if (tsdbKeyCmprFn(&pMemTable->maxKey, &pMemData->maxKey) > 0) pMemTable->maxKey = pMemData->maxKey; + return code; _err: @@ -224,23 +272,40 @@ static int memDataPCmprFn(const void *p1, const void *p2) { return 0; } +static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { + int32_t n = 0; + + n += tPutI64(p ? p + n : p, pRow->version); + n += tPutTSRow(p ? p + n : p, &pRow->tsRow); + + return n; +} + +static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) { + int32_t n = 0; + + n += tGetI64(p + n, &pRow->version); + n += tGetTSRow(p + n, &pRow->tsRow); + + return n; +} + +static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { + int8_t level = 1; + int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); + const uint32_t factor = 4; + + while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { + level++; + } + + return level; +} + #if 0 //==================================================================================== #define SL_MAX_LEVEL 5 -struct SMemSkipListNode { - int8_t level; - SMemSkipListNode *forwards[1]; // Windows does not allow 0 -}; - -struct SMemSkipList { - uint32_t seed; - int8_t maxLevel; - int8_t level; - int32_t size; - SMemSkipListNode pHead[1]; // Windows does not allow 0 -}; - struct SMemSkipListCurosr { SMemSkipList *pSl; SMemSkipListNode *pNodes[SL_MAX_LEVEL]; @@ -254,12 +319,6 @@ typedef struct { #define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET)) -#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) -#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) -#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) -#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) -#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) - #define SL_HEAD_NODE(sl) ((sl)->pHead) #define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) #define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l) @@ -390,18 +449,6 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p return 0; } -static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { - int8_t level = 1; - int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - const uint32_t factor = 4; - - while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { - level++; - } - - return level; -} - static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) { if (tEncodeI64(pEncoder, pRow->version) < 0) return -1; if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1;