feat: vnode multi-version

This commit is contained in:
Hongze Cheng 2022-05-31 12:53:23 +00:00
parent f0045e3f38
commit 73cbc6f717
2 changed files with 114 additions and 46 deletions

View File

@ -36,6 +36,8 @@ typedef struct TSDBROW TSDBROW;
typedef struct TSDBKEY TSDBKEY;
typedef struct SDelOp SDelOp;
static int tsdbKeyCmprFn(const void *p1, const void *p2);
// tsdbMemTable2.c ==============================================================================================
typedef struct SMemTable SMemTable;
@ -873,6 +875,25 @@ struct SDelOp {
SDelOp *pNext;
};
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
if (pKey1->ts < pKey2->ts) {
return -1;
} else if (pKey1->ts > pKey2->ts) {
return 1;
}
if (pKey1->version < pKey2->version) {
return -1;
} else if (pKey1->version > pKey2->version) {
return 1;
}
return 0;
}
#endif
#ifdef __cplusplus

View File

@ -15,19 +15,33 @@
#include "tsdb.h"
typedef struct SMemData SMemData;
// typedef struct SMemSkipList SMemSkipList;
// typedef struct SMemSkipListNode SMemSkipListNode;
// typedef struct SMemSkipListCurosr SMemSkipListCurosr;
typedef struct SMemData SMemData;
typedef struct SMemSkipList SMemSkipList;
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
SMemSkipListNode *forwards[0];
};
struct SMemSkipList {
uint32_t seed;
int32_t size;
int8_t maxLevel;
int8_t level;
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
};
struct SMemData {
tb_uid_t suid;
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRows;
SDelOp *delOpHead;
SDelOp *delOpTail;
tb_uid_t suid;
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRows;
SDelOp *delOpHead;
SDelOp *delOpTail;
SMemSkipList sl;
};
struct SMemTable {
@ -39,8 +53,17 @@ struct SMemTable {
SArray *pArray; // SArray<SMemData>
};
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
static int memDataPCmprFn(const void *p1, const void *p2);
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
// SMemTable ==============================================
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
@ -96,16 +119,41 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
}
// do insert
uint32_t n = 0;
uint8_t *p = pSubmitBlk->pData;
int32_t nt;
uint8_t *pt;
int32_t n = 0;
uint8_t *p = pSubmitBlk->pData;
SVBufPool *pPool = pTsdb->pVnode->inUse;
int8_t level;
SMemSkipListNode *pNode;
while (n < pSubmitBlk->nData) {
n += tGetTSRow(p + n, &row.tsRow);
nt = tGetTSRow(p + n, &row.tsRow);
n += nt;
ASSERT(n <= pSubmitBlk->nData);
// TODO
// build the node
level = tsdbMemSkipListRandLevel(&pMemData->sl);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + nt + sizeof(version));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pNode->level = level;
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), &row);
// put the node (todo)
pMemData->nRows++;
// set info
if (tsdbKeyCmprFn(&row, &pMemData->minKey) < 0) pMemData->minKey = *(TSDBKEY *)&row;
if (tsdbKeyCmprFn(&row, &pMemData->maxKey) > 0) pMemData->maxKey = *(TSDBKEY *)&row;
}
if (tsdbKeyCmprFn(&pMemTable->minKey, &pMemData->minKey) < 0) pMemTable->minKey = pMemData->minKey;
if (tsdbKeyCmprFn(&pMemTable->maxKey, &pMemData->maxKey) > 0) pMemTable->maxKey = pMemData->maxKey;
return code;
_err:
@ -224,23 +272,40 @@ static int memDataPCmprFn(const void *p1, const void *p2) {
return 0;
}
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0;
n += tPutI64(p ? p + n : p, pRow->version);
n += tPutTSRow(p ? p + n : p, &pRow->tsRow);
return n;
}
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0;
n += tGetI64(p + n, &pRow->version);
n += tGetTSRow(p + n, &pRow->tsRow);
return n;
}
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
const uint32_t factor = 4;
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
level++;
}
return level;
}
#if 0 //====================================================================================
#define SL_MAX_LEVEL 5
struct SMemSkipListNode {
int8_t level;
SMemSkipListNode *forwards[1]; // Windows does not allow 0
};
struct SMemSkipList {
uint32_t seed;
int8_t maxLevel;
int8_t level;
int32_t size;
SMemSkipListNode pHead[1]; // Windows does not allow 0
};
struct SMemSkipListCurosr {
SMemSkipList *pSl;
SMemSkipListNode *pNodes[SL_MAX_LEVEL];
@ -254,12 +319,6 @@ typedef struct {
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_HEAD_NODE(sl) ((sl)->pHead)
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
@ -390,18 +449,6 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
return 0;
}
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
const uint32_t factor = 4;
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
level++;
}
return level;
}
static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) {
if (tEncodeI64(pEncoder, pRow->version) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1;