feat: vnode multi-version
This commit is contained in:
parent
6f94998e06
commit
ebc3236c83
|
@ -36,7 +36,6 @@ target_sources(
|
||||||
|
|
||||||
# tsdb
|
# tsdb
|
||||||
"src/tsdb/tsdbCommit.c"
|
"src/tsdb/tsdbCommit.c"
|
||||||
"src/tsdb/tsdbCommit2.c"
|
|
||||||
"src/tsdb/tsdbFile.c"
|
"src/tsdb/tsdbFile.c"
|
||||||
"src/tsdb/tsdbFS.c"
|
"src/tsdb/tsdbFS.c"
|
||||||
"src/tsdb/tsdbOpen.c"
|
"src/tsdb/tsdbOpen.c"
|
||||||
|
|
|
@ -88,6 +88,18 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
|
||||||
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
|
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
|
||||||
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
||||||
|
|
||||||
|
int tsdbBegin(STsdb *pTsdb) {
|
||||||
|
if (!pTsdb) return 0;
|
||||||
|
|
||||||
|
STsdbMemTable *pMem;
|
||||||
|
|
||||||
|
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
SDFileSet nSet = {0};
|
SDFileSet nSet = {0};
|
||||||
|
|
|
@ -1,28 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
|
|
||||||
int tsdbBegin(STsdb *pTsdb) {
|
|
||||||
if (!pTsdb) return 0;
|
|
||||||
|
|
||||||
STsdbMemTable *pMem;
|
|
||||||
|
|
||||||
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -52,6 +52,8 @@ struct SMemTable {
|
||||||
SArray *pArray; // SArray<SMemData>
|
SArray *pArray; // SArray<SMemData>
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define SL_MAX_LEVEL 5
|
||||||
|
|
||||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
|
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
|
||||||
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
|
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
|
||||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||||
|
@ -66,6 +68,9 @@ static int memDataPCmprFn(const void *p1, const void *p2);
|
||||||
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||||
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
|
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||||
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
|
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
|
||||||
|
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos);
|
||||||
|
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
|
||||||
|
SMemSkipListNode **pos);
|
||||||
|
|
||||||
// SMemTable ==============================================
|
// SMemTable ==============================================
|
||||||
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
|
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
|
||||||
|
@ -109,6 +114,7 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
|
||||||
TSDBROW row = {.version = version};
|
TSDBROW row = {.version = version};
|
||||||
|
|
||||||
ASSERT(pMemTable);
|
ASSERT(pMemTable);
|
||||||
|
ASSERT(pSubmitBlk->nData > 0);
|
||||||
|
|
||||||
{
|
{
|
||||||
// check if table exists (todo)
|
// check if table exists (todo)
|
||||||
|
@ -122,38 +128,29 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
|
||||||
|
|
||||||
// do insert
|
// do insert
|
||||||
int32_t nt;
|
int32_t nt;
|
||||||
uint8_t *pt;
|
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
uint8_t *p = pSubmitBlk->pData;
|
uint8_t *p = pSubmitBlk->pData;
|
||||||
SVBufPool *pPool = pTsdb->pVnode->inUse;
|
int32_t nRow = 0;
|
||||||
int8_t level;
|
SMemSkipListNode *pos[SL_MAX_LEVEL] = {0};
|
||||||
SMemSkipListNode *pNode;
|
|
||||||
|
for (int8_t iLevel = 0; iLevel < SL_MAX_LEVEL; iLevel++) {
|
||||||
|
pos[iLevel] = pMemData->sl.pTail;
|
||||||
|
}
|
||||||
while (n < pSubmitBlk->nData) {
|
while (n < pSubmitBlk->nData) {
|
||||||
nt = tGetTSRow(p + n, &row.tsRow);
|
nt = tGetTSRow(p + n, &row.tsRow);
|
||||||
n += nt;
|
n += nt;
|
||||||
|
|
||||||
ASSERT(n <= pSubmitBlk->nData);
|
ASSERT(n <= pSubmitBlk->nData);
|
||||||
|
|
||||||
// build the node
|
memDataMovePos(pMemData, &row, nRow ? 1 : 0, pos);
|
||||||
level = tsdbMemSkipListRandLevel(&pMemData->sl);
|
code = memDataPutRow(pTsdb->pVnode->inUse, pMemData, &row, nRow ? 1 : 0, pos);
|
||||||
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + nt + sizeof(version));
|
if (code) {
|
||||||
if (pNode == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pNode->level = level;
|
|
||||||
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), &row);
|
|
||||||
|
|
||||||
// put the node (todo)
|
nRow++;
|
||||||
|
|
||||||
// set info
|
|
||||||
if (tsdbKeyCmprFn(&row, &pMemData->minKey) < 0) pMemData->minKey = *(TSDBKEY *)&row;
|
|
||||||
if (tsdbKeyCmprFn(&row, &pMemData->maxKey) > 0) pMemData->maxKey = *(TSDBKEY *)&row;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbKeyCmprFn(&pMemTable->minKey, &pMemData->minKey) < 0) pMemTable->minKey = pMemData->minKey;
|
|
||||||
if (tsdbKeyCmprFn(&pMemTable->maxKey, &pMemData->maxKey) > 0) pMemTable->maxKey = pMemData->maxKey;
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -313,41 +310,81 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
|
||||||
return level;
|
return level;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos) {
|
||||||
|
TSDBKEY *pKey;
|
||||||
|
int c;
|
||||||
|
|
||||||
|
if (isForward) {
|
||||||
|
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
|
||||||
|
if (iLevel < pMemData->sl.level) {
|
||||||
|
SMemSkipListNode *px = pos[iLevel];
|
||||||
|
SMemSkipListNode *p = SL_NODE_FORWARD(px, iLevel);
|
||||||
|
|
||||||
|
while (p != pMemData->sl.pTail) {
|
||||||
|
pKey = (TSDBKEY *)SL_NODE_DATA(p);
|
||||||
|
|
||||||
|
c = tsdbKeyCmprFn(pKey, pRow);
|
||||||
|
if (c >= 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
px = p;
|
||||||
|
p = SL_NODE_FORWARD(px, iLevel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pos[iLevel] = px;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
|
||||||
|
if (iLevel < pMemData->sl.level) {
|
||||||
|
SMemSkipListNode *px = pos[iLevel];
|
||||||
|
SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel);
|
||||||
|
|
||||||
|
while (p != pMemData->sl.pHead) {
|
||||||
|
pKey = (TSDBKEY *)SL_NODE_DATA(p);
|
||||||
|
|
||||||
|
c = tsdbKeyCmprFn(pKey, pRow);
|
||||||
|
if (c <= 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
px = p;
|
||||||
|
p = SL_NODE_BACKWARD(px, iLevel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pos[iLevel] = px;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
|
||||||
|
SMemSkipListNode **pos) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int8_t level;
|
||||||
|
SMemSkipListNode *pNode;
|
||||||
|
|
||||||
|
level = tsdbMemSkipListRandLevel(&pMemData->sl);
|
||||||
|
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
|
||||||
|
if (pNode == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do the read put
|
||||||
|
if (isForward) {
|
||||||
|
// TODO
|
||||||
|
} else {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
#if 0 //====================================================================================
|
#if 0 //====================================================================================
|
||||||
|
|
||||||
#define SL_MAX_LEVEL 5
|
|
||||||
|
|
||||||
struct SMemSkipListCurosr {
|
|
||||||
SMemSkipList *pSl;
|
|
||||||
SMemSkipListNode *pNodes[SL_MAX_LEVEL];
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t version;
|
|
||||||
uint32_t szRow;
|
|
||||||
const STSRow *pRow;
|
|
||||||
} STsdbRow;
|
|
||||||
|
|
||||||
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))
|
|
||||||
|
|
||||||
#define SL_HEAD_NODE(sl) ((sl)->pHead)
|
|
||||||
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
|
|
||||||
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
|
|
||||||
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
|
|
||||||
|
|
||||||
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
|
|
||||||
static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow);
|
|
||||||
static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow);
|
|
||||||
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc);
|
|
||||||
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc);
|
|
||||||
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl);
|
|
||||||
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode);
|
|
||||||
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags);
|
|
||||||
static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc);
|
|
||||||
static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc);
|
|
||||||
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc);
|
|
||||||
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc);
|
|
||||||
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow);
|
|
||||||
|
|
||||||
// SMemTable ========================
|
// SMemTable ========================
|
||||||
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
|
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
|
||||||
|
@ -460,28 +497,6 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) {
|
|
||||||
if (tEncodeI64(pEncoder, pRow->version) < 0) return -1;
|
|
||||||
if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) {
|
|
||||||
if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1;
|
|
||||||
if (tDecodeBinary(pDecoder, (uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) {
|
|
||||||
*ppSlc = (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(**ppSlc) + sizeof(SMemSkipListNode *) * maxLevel);
|
|
||||||
if (*ppSlc == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) { taosMemoryFree(pSlc); }
|
|
||||||
|
|
||||||
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) {
|
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) {
|
||||||
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
|
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
|
||||||
pSlc->pSl = pSl;
|
pSlc->pSl = pSl;
|
||||||
|
@ -490,87 +505,4 @@ static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pS
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) {
|
|
||||||
SMemSkipList *pSl = pSlc->pSl;
|
|
||||||
SMemSkipListNode *pNodeNext;
|
|
||||||
|
|
||||||
for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
|
|
||||||
// todo
|
|
||||||
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pSl->level < pNode->level) {
|
|
||||||
pSl->level = pNode->level;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSl->size += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags) {
|
|
||||||
SMemSkipListNode **pForwards = NULL;
|
|
||||||
SMemSkipList *pSl = pSlc->pSl;
|
|
||||||
int8_t maxLevel = pSl->maxLevel;
|
|
||||||
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
|
|
||||||
SMemSkipListNode *pTail = SL_TAIL_NODE(pSl);
|
|
||||||
|
|
||||||
if (pSl->size == 0) {
|
|
||||||
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
|
|
||||||
pForwards[iLevel] = pHead;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc) {
|
|
||||||
SMemSkipList *pSl = pSlc->pSl;
|
|
||||||
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
|
|
||||||
|
|
||||||
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
|
|
||||||
pSlc->pNodes[iLevel] = pHead;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbMemSkipListCursorMoveToNext(pSlc);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc) {
|
|
||||||
SMemSkipList *pSl = pSlc->pSl;
|
|
||||||
SMemSkipListNode *pTail = SL_TAIL_NODE(pSl);
|
|
||||||
|
|
||||||
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
|
|
||||||
pSlc->pNodes[iLevel] = pTail;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbMemSkipListCursorMoveToPrev(pSlc);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) {
|
|
||||||
int32_t tsize;
|
|
||||||
int32_t ret;
|
|
||||||
int8_t level = tsdbMemSkipListRandLevel(pSl);
|
|
||||||
SMemSkipListNode *pNode = NULL;
|
|
||||||
SEncoder ec = {0};
|
|
||||||
|
|
||||||
tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret);
|
|
||||||
pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
|
|
||||||
if (pNode) {
|
|
||||||
pNode->level = level;
|
|
||||||
tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
|
|
||||||
tsdbEncodeRow(&ec, pTRow);
|
|
||||||
tEncoderClear(&ec);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pNode;
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
Loading…
Reference in New Issue