refact
This commit is contained in:
parent
b9230fd364
commit
80bd6409ab
|
@ -61,25 +61,6 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
|
|||
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
|
||||
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
||||
|
||||
// tsdbMemTable2.c ==============================================================================================
|
||||
// typedef struct SMemTable2 SMemTable2;
|
||||
// typedef struct SMemData SMemData;
|
||||
// typedef struct SMemDataIter SMemDataIter;
|
||||
|
||||
// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable);
|
||||
// void tsdbMemTableDestroy2(SMemTable2 *pMemTable);
|
||||
// int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
|
||||
// int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
|
||||
// /* SMemDataIter */
|
||||
// void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
|
||||
// bool tsdbMemDataIterNext(SMemDataIter *pIter);
|
||||
// void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
|
||||
|
||||
// // tsdbCommit2.c ==============================================================================================
|
||||
// int32_t tsdbBegin2(STsdb *pTsdb);
|
||||
// int32_t tsdbCommit2(STsdb *pTsdb);
|
||||
|
||||
// tsdbFile.c ==============================================================================================
|
||||
typedef int32_t TSDB_FILE_T;
|
||||
typedef struct SDFInfo SDFInfo;
|
||||
|
@ -700,17 +681,6 @@ typedef struct {
|
|||
TSKEY eKey;
|
||||
} SDelInfo;
|
||||
|
||||
struct SMemTable2 {
|
||||
STsdb *pTsdb;
|
||||
int32_t nRef;
|
||||
TSDBKEY minKey;
|
||||
TSDBKEY maxKey;
|
||||
int64_t nRows;
|
||||
int64_t nDelOp;
|
||||
SArray *aSkmInfo;
|
||||
SArray *aMemData;
|
||||
};
|
||||
|
||||
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
||||
TSDBKEY *pKey2 = (TSDBKEY *)p2;
|
||||
|
@ -730,24 +700,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
struct SMemData {
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
TSDBKEY minKey;
|
||||
TSDBKEY maxKey;
|
||||
SDelOp *delOpHead;
|
||||
SDelOp *delOpTail;
|
||||
SMemSkipList sl;
|
||||
};
|
||||
|
||||
struct SMemDataIter {
|
||||
STbData *pMemData;
|
||||
int8_t backward;
|
||||
TSDBROW *pRow;
|
||||
SMemSkipListNode *pNode; // current node
|
||||
TSDBROW row;
|
||||
};
|
||||
|
||||
struct STbDataIter {
|
||||
STbData *pTbData;
|
||||
int8_t backward;
|
||||
|
|
|
@ -112,7 +112,7 @@ int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
|||
// tsdb
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||
int tsdbClose(STsdb** pTsdb);
|
||||
int tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
|
@ -161,18 +161,6 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
|
|||
void tdUidStoreDestory(STbUidStore* pStore);
|
||||
void* tdUidStoreFree(STbUidStore* pStore);
|
||||
|
||||
#if 0
|
||||
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
|
||||
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
|
||||
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
|
||||
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
|
||||
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
|
||||
void tsdbUidStoreDestory(STbUidStore* pStore);
|
||||
void* tsdbUidStoreFree(STbUidStore* pStore);
|
||||
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
int8_t streamType; // sma or other
|
||||
int8_t dstType;
|
||||
|
|
|
@ -58,6 +58,9 @@ typedef struct {
|
|||
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
|
||||
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
||||
|
||||
static int32_t tsdbCommitData(SCommitH *pCommith);
|
||||
static int32_t tsdbCommitDel(SCommitH *pCommith);
|
||||
static int32_t tsdbCommitCache(SCommitH *pCommith);
|
||||
static void tsdbStartCommit(STsdb *pRepo);
|
||||
static void tsdbEndCommit(STsdb *pTsdb, int eno);
|
||||
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
|
||||
|
@ -89,7 +92,7 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
|
|||
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
||||
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
||||
|
||||
int tsdbBegin(STsdb *pTsdb) {
|
||||
int32_t tsdbBegin(STsdb *pTsdb) {
|
||||
if (!pTsdb) return 0;
|
||||
|
||||
SMemTable *pMem;
|
||||
|
@ -117,10 +120,42 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// commit impl
|
||||
code = tsdbCommitData(&commith);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbCommitDel(&commith);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbCommitCache(&commith);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// end commit
|
||||
tsdbDestroyCommitH(&commith);
|
||||
tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitData(SCommitH *pCommith) {
|
||||
int32_t fid;
|
||||
SDFileSet *pSet;
|
||||
int32_t code = 0;
|
||||
STsdb *pTsdb = TSDB_COMMIT_REPO(pCommith);
|
||||
|
||||
// Skip expired memory data and expired FSET
|
||||
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
|
||||
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
|
||||
if (pSet->fid < commith.rtn.minFid) {
|
||||
tsdbSeekCommitIter(pCommith, pCommith->rtn.minKey);
|
||||
while ((pSet = tsdbFSIterNext(&(pCommith->fsIter)))) {
|
||||
if (pSet->fid < pCommith->rtn.minFid) {
|
||||
tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pSet->fid,
|
||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
||||
} else {
|
||||
|
@ -129,7 +164,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
|||
}
|
||||
|
||||
// commit
|
||||
fid = tsdbNextCommitFid(&(commith));
|
||||
fid = tsdbNextCommitFid(pCommith);
|
||||
while (true) {
|
||||
// Loop over both on disk and memory
|
||||
if (pSet == NULL && fid == TSDB_IVLD_FID) break;
|
||||
|
@ -137,12 +172,12 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
|||
if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
|
||||
// Only has existing FSET but no memory data to commit in this
|
||||
// existing FSET, only check if file in correct retention
|
||||
if (tsdbApplyRtnOnFSet(pTsdb, pSet, &(commith.rtn)) < 0) {
|
||||
tsdbDestroyCommitH(&commith);
|
||||
if (tsdbApplyRtnOnFSet(TSDB_COMMIT_REPO(pCommith), pSet, &(pCommith->rtn)) < 0) {
|
||||
tsdbDestroyCommitH(pCommith);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pSet = tsdbFSIterNext(&(commith.fsIter));
|
||||
pSet = tsdbFSIterNext(&(pCommith->fsIter));
|
||||
} else {
|
||||
// Has memory data to commit
|
||||
SDFileSet *pCSet;
|
||||
|
@ -156,22 +191,30 @@ int32_t tsdbCommit(STsdb *pTsdb) {
|
|||
// Commit to an existing FSET
|
||||
pCSet = pSet;
|
||||
cfid = pSet->fid;
|
||||
pSet = tsdbFSIterNext(&(commith.fsIter));
|
||||
pSet = tsdbFSIterNext(&(pCommith->fsIter));
|
||||
}
|
||||
|
||||
if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) {
|
||||
tsdbDestroyCommitH(&commith);
|
||||
if (tsdbCommitToFile(pCommith, pCSet, cfid) < 0) {
|
||||
tsdbDestroyCommitH(pCommith);
|
||||
return -1;
|
||||
}
|
||||
|
||||
fid = tsdbNextCommitFid(&commith);
|
||||
fid = tsdbNextCommitFid(pCommith);
|
||||
}
|
||||
}
|
||||
|
||||
// end commit
|
||||
tsdbDestroyCommitH(&commith);
|
||||
tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitDel(SCommitH *pCommith) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitCache(SCommitH *pCommith) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -216,16 +259,6 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// int tsdbPrepareCommit(STsdb *pTsdb) {
|
||||
// if (pTsdb->mem == NULL) return 0;
|
||||
|
||||
// ASSERT(pTsdb->imem == NULL);
|
||||
|
||||
// pTsdb->imem = pTsdb->mem;
|
||||
// pTsdb->mem = NULL;
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
|
||||
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
|
||||
TSKEY minKey, midKey, maxKey, now;
|
||||
|
|
|
@ -1,530 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
|
||||
typedef struct {
|
||||
tb_uid_t uid;
|
||||
STSchema *pTSchema;
|
||||
} SSkmInfo;
|
||||
|
||||
#define SL_MAX_LEVEL 5
|
||||
|
||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
|
||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
|
||||
|
||||
#define SL_MOVE_BACKWARD 0x1
|
||||
#define SL_MOVE_FROM_POS 0x2
|
||||
|
||||
static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
|
||||
static int memDataPCmprFn(const void *p1, const void *p2);
|
||||
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
|
||||
static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version,
|
||||
SVSubmitBlk *pSubmitBlk);
|
||||
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
|
||||
|
||||
// SMemTable ==============================================
|
||||
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable) {
|
||||
int32_t code = 0;
|
||||
SMemTable2 *pMemTable = NULL;
|
||||
|
||||
pMemTable = (SMemTable2 *)taosMemoryCalloc(1, sizeof(*pMemTable));
|
||||
if (pMemTable == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pMemTable->pTsdb = pTsdb;
|
||||
pMemTable->nRef = 1;
|
||||
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
|
||||
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
|
||||
pMemTable->nRows = 0;
|
||||
pMemTable->nDelOp = 0;
|
||||
pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *));
|
||||
if (pMemTable->aMemData == NULL) {
|
||||
taosMemoryFree(pMemTable);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
*ppMemTable = pMemTable;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
*ppMemTable = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbMemTableDestroy2(SMemTable2 *pMemTable) {
|
||||
taosArrayDestroyEx(pMemTable->aMemData, NULL /*TODO*/);
|
||||
taosMemoryFree(pMemTable);
|
||||
}
|
||||
|
||||
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk) {
|
||||
int32_t code = 0;
|
||||
SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO
|
||||
SMemData *pMemData;
|
||||
TSDBROW row = {.version = version};
|
||||
|
||||
ASSERT(pMemTable);
|
||||
ASSERT(pSubmitBlk->nData > 0);
|
||||
|
||||
{
|
||||
// check if table exists (todo)
|
||||
}
|
||||
|
||||
code = tsdbGetOrCreateMemData(pMemTable, pSubmitBlk->suid, pSubmitBlk->uid, &pMemData);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, failed to create/get table data since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// do insert
|
||||
code = tsdbInsertTableDataImpl(pMemTable, pMemData, version, pSubmitBlk);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO
|
||||
SMemData *pMemData;
|
||||
SVBufPool *pPool = pTsdb->pVnode->inUse;
|
||||
|
||||
ASSERT(pMemTable);
|
||||
|
||||
{
|
||||
// check if table exists (todo)
|
||||
}
|
||||
|
||||
code = tsdbGetOrCreateMemData(pMemTable, suid, uid, &pMemData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// do delete
|
||||
SDelOp *pDelOp = (SDelOp *)vnodeBufPoolMalloc(pPool, sizeof(*pDelOp));
|
||||
if (pDelOp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pDelOp->version = version;
|
||||
pDelOp->sKey = sKey;
|
||||
pDelOp->eKey = eKey;
|
||||
pDelOp->pNext = NULL;
|
||||
if (pMemData->delOpHead == NULL) {
|
||||
ASSERT(pMemData->delOpTail == NULL);
|
||||
pMemData->delOpHead = pMemData->delOpTail = pDelOp;
|
||||
} else {
|
||||
pMemData->delOpTail->pNext = pDelOp;
|
||||
pMemData->delOpTail = pDelOp;
|
||||
}
|
||||
|
||||
{
|
||||
// update the state of pMemTable, pMemData, last and lastrow (todo)
|
||||
}
|
||||
|
||||
pMemTable->nDelOp++;
|
||||
|
||||
tsdbDebug("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
|
||||
" since %s",
|
||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
|
||||
" since %s",
|
||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter) {
|
||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||
|
||||
pIter->pMemData = pMemData;
|
||||
pIter->backward = backward;
|
||||
pIter->pRow = NULL;
|
||||
if (pKey == NULL) {
|
||||
// create from head or tail
|
||||
if (backward) {
|
||||
pIter->pNode = SL_NODE_BACKWARD(pMemData->sl.pTail, 0);
|
||||
} else {
|
||||
pIter->pNode = SL_NODE_FORWARD(pMemData->sl.pHead, 0);
|
||||
}
|
||||
} else {
|
||||
// create from a key
|
||||
if (backward) {
|
||||
memDataMovePosTo(pMemData, pos, pKey, SL_MOVE_BACKWARD);
|
||||
pIter->pNode = SL_NODE_BACKWARD(pos[0], 0);
|
||||
} else {
|
||||
memDataMovePosTo(pMemData, pos, pKey, 0);
|
||||
pIter->pNode = SL_NODE_FORWARD(pos[0], 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool tsdbMemDataIterNext(SMemDataIter *pIter) {
|
||||
SMemSkipListNode *pHead = pIter->pMemData->sl.pHead;
|
||||
SMemSkipListNode *pTail = pIter->pMemData->sl.pTail;
|
||||
|
||||
pIter->pRow = NULL;
|
||||
if (pIter->backward) {
|
||||
ASSERT(pIter->pNode != pTail);
|
||||
|
||||
if (pIter->pNode == pHead) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
|
||||
if (pIter->pNode == pHead) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pIter->pNode != pHead);
|
||||
|
||||
if (pIter->pNode == pTail) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
|
||||
if (pIter->pNode == pTail) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) {
|
||||
if (pIter->pRow) {
|
||||
*ppRow = pIter->pRow;
|
||||
} else {
|
||||
SMemSkipListNode *pHead = pIter->pMemData->sl.pHead;
|
||||
SMemSkipListNode *pTail = pIter->pMemData->sl.pTail;
|
||||
|
||||
if (pIter->backward) {
|
||||
ASSERT(pIter->pNode != pTail);
|
||||
|
||||
if (pIter->pNode == pHead) {
|
||||
*ppRow = NULL;
|
||||
} else {
|
||||
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
|
||||
*ppRow = &pIter->row;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pIter->pNode != pHead);
|
||||
|
||||
if (pIter->pNode == pTail) {
|
||||
*ppRow = NULL;
|
||||
} else {
|
||||
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
|
||||
*ppRow = &pIter->row;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) {
|
||||
int32_t code = 0;
|
||||
int32_t idx = 0;
|
||||
SMemData *pMemDataT = &(SMemData){.suid = suid, .uid = uid};
|
||||
SMemData *pMemData = NULL;
|
||||
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
||||
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
|
||||
|
||||
// get
|
||||
idx = taosArraySearchIdx(pMemTable->aMemData, &pMemDataT, memDataPCmprFn, TD_GE);
|
||||
if (idx >= 0) {
|
||||
pMemData = (SMemData *)taosArrayGet(pMemTable->aMemData, idx);
|
||||
if (memDataPCmprFn(&pMemDataT, &pMemData) == 0) goto _exit;
|
||||
}
|
||||
|
||||
// create
|
||||
pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData) + SL_NODE_SIZE(maxLevel) * 2);
|
||||
if (pMemData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pMemData->suid = suid;
|
||||
pMemData->uid = uid;
|
||||
pMemData->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
|
||||
pMemData->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
|
||||
pMemData->delOpHead = pMemData->delOpTail = NULL;
|
||||
pMemData->sl.seed = taosRand();
|
||||
pMemData->sl.size = 0;
|
||||
pMemData->sl.maxLevel = maxLevel;
|
||||
pMemData->sl.level = 0;
|
||||
pMemData->sl.pHead = (SMemSkipListNode *)&pMemData[1];
|
||||
pMemData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pMemData->sl.pHead, SL_NODE_SIZE(maxLevel));
|
||||
pMemData->sl.pHead->level = maxLevel;
|
||||
pMemData->sl.pTail->level = maxLevel;
|
||||
|
||||
for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) {
|
||||
SL_NODE_FORWARD(pMemData->sl.pHead, iLevel) = pMemData->sl.pTail;
|
||||
SL_NODE_BACKWARD(pMemData->sl.pHead, iLevel) = NULL;
|
||||
SL_NODE_BACKWARD(pMemData->sl.pTail, iLevel) = pMemData->sl.pHead;
|
||||
SL_NODE_FORWARD(pMemData->sl.pTail, iLevel) = NULL;
|
||||
}
|
||||
|
||||
if (idx < 0) idx = 0;
|
||||
if (taosArrayInsert(pMemTable->aMemData, idx, &pMemData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_exit:
|
||||
*ppMemData = pMemData;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
*ppMemData = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int memDataPCmprFn(const void *p1, const void *p2) {
|
||||
SMemData *pMemData1 = *(SMemData **)p1;
|
||||
SMemData *pMemData2 = *(SMemData **)p2;
|
||||
|
||||
if (pMemData1->suid < pMemData2->suid) {
|
||||
return -1;
|
||||
} else if (pMemData1->suid > pMemData2->suid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (pMemData1->uid < pMemData2->uid) {
|
||||
return -1;
|
||||
} else if (pMemData1->uid > pMemData2->uid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI64(p ? p + n : p, pRow->version);
|
||||
n += tPutTSRow(p ? p + n : p, &pRow->tsRow);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetI64(p + n, &pRow->version);
|
||||
n += tGetTSRow(p + n, &pRow->tsRow);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
|
||||
int8_t level = 1;
|
||||
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
|
||||
const uint32_t factor = 4;
|
||||
|
||||
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
|
||||
level++;
|
||||
}
|
||||
|
||||
return level;
|
||||
}
|
||||
|
||||
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
|
||||
SMemSkipListNode *px;
|
||||
SMemSkipListNode *pn;
|
||||
TSDBKEY *pTKey;
|
||||
int c;
|
||||
int backward = flags & SL_MOVE_BACKWARD;
|
||||
int fromPos = flags & SL_MOVE_FROM_POS;
|
||||
|
||||
if (backward) {
|
||||
px = pMemData->sl.pTail;
|
||||
|
||||
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= pMemData->sl.level; iLevel--) {
|
||||
pos[iLevel] = px;
|
||||
}
|
||||
|
||||
if (pMemData->sl.level) {
|
||||
if (fromPos) px = pos[pMemData->sl.level - 1];
|
||||
|
||||
for (int8_t iLevel = pMemData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||
pn = SL_NODE_BACKWARD(px, iLevel);
|
||||
while (pn != pMemData->sl.pHead) {
|
||||
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
|
||||
|
||||
c = tsdbKeyCmprFn(pTKey, pKey);
|
||||
if (c <= 0) {
|
||||
break;
|
||||
} else {
|
||||
px = pn;
|
||||
pn = SL_NODE_BACKWARD(px, iLevel);
|
||||
}
|
||||
}
|
||||
|
||||
pos[iLevel] = px;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
px = pMemData->sl.pHead;
|
||||
|
||||
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= pMemData->sl.level; iLevel--) {
|
||||
pos[iLevel] = px;
|
||||
}
|
||||
|
||||
if (pMemData->sl.level) {
|
||||
if (fromPos) px = pos[pMemData->sl.level - 1];
|
||||
|
||||
for (int8_t iLevel = pMemData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||
pn = SL_NODE_FORWARD(px, iLevel);
|
||||
while (pn != pMemData->sl.pHead) {
|
||||
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
|
||||
|
||||
c = tsdbKeyCmprFn(pTKey, pKey);
|
||||
if (c >= 0) {
|
||||
break;
|
||||
} else {
|
||||
px = pn;
|
||||
pn = SL_NODE_FORWARD(px, iLevel);
|
||||
}
|
||||
}
|
||||
|
||||
pos[iLevel] = px;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t memDataDoPut(SMemTable2 *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow,
|
||||
int8_t forward) {
|
||||
int32_t code = 0;
|
||||
int8_t level;
|
||||
SMemSkipListNode *pNode;
|
||||
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
||||
|
||||
// node
|
||||
level = tsdbMemSkipListRandLevel(&pMemData->sl);
|
||||
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
|
||||
if (pNode == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
pNode->level = level;
|
||||
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
|
||||
SL_NODE_FORWARD(pNode, iLevel) = NULL;
|
||||
SL_NODE_BACKWARD(pNode, iLevel) = NULL;
|
||||
}
|
||||
|
||||
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);
|
||||
|
||||
// put
|
||||
for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
|
||||
SMemSkipListNode *px = pos[iLevel];
|
||||
|
||||
if (forward) {
|
||||
SMemSkipListNode *pNext = SL_NODE_FORWARD(px, iLevel);
|
||||
|
||||
SL_NODE_FORWARD(pNode, iLevel) = pNext;
|
||||
SL_NODE_BACKWARD(pNode, iLevel) = px;
|
||||
|
||||
SL_NODE_BACKWARD(pNext, iLevel) = pNode;
|
||||
SL_NODE_FORWARD(px, iLevel) = pNode;
|
||||
} else {
|
||||
SMemSkipListNode *pPrev = SL_NODE_BACKWARD(px, iLevel);
|
||||
|
||||
SL_NODE_FORWARD(pNode, iLevel) = px;
|
||||
SL_NODE_BACKWARD(pNode, iLevel) = pPrev;
|
||||
|
||||
SL_NODE_FORWARD(pPrev, iLevel) = pNode;
|
||||
SL_NODE_BACKWARD(px, iLevel) = pNode;
|
||||
}
|
||||
}
|
||||
|
||||
pMemData->sl.size++;
|
||||
if (pMemData->sl.level < pNode->level) {
|
||||
pMemData->sl.level = pNode->level;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version,
|
||||
SVSubmitBlk *pSubmitBlk) {
|
||||
int32_t code = 0;
|
||||
int32_t n = 0;
|
||||
uint8_t *p = pSubmitBlk->pData;
|
||||
int32_t nRow = 0;
|
||||
TSDBROW row = {.version = version};
|
||||
|
||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||
|
||||
ASSERT(pSubmitBlk->nData);
|
||||
|
||||
// backward put first data
|
||||
n += tGetTSRow(p + n, &row.tsRow);
|
||||
ASSERT(n <= pSubmitBlk->nData);
|
||||
|
||||
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD);
|
||||
code = memDataDoPut(pMemTable, pMemData, pos, &row, 0);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
nRow++;
|
||||
|
||||
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemData->minKey) < 0) {
|
||||
pMemData->minKey = *(TSDBKEY *)&row;
|
||||
}
|
||||
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemTable->minKey) < 0) {
|
||||
pMemTable->minKey = *(TSDBKEY *)&row;
|
||||
}
|
||||
|
||||
// forward put rest
|
||||
for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) {
|
||||
pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
|
||||
}
|
||||
while (n < pSubmitBlk->nData) {
|
||||
n += tGetTSRow(p + n, &row.tsRow);
|
||||
ASSERT(n <= pSubmitBlk->nData);
|
||||
|
||||
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS);
|
||||
code = memDataDoPut(pMemTable, pMemData, pos, &row, 1);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
nRow++;
|
||||
}
|
||||
|
||||
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemData->maxKey) > 0) {
|
||||
pMemData->maxKey = *(TSDBKEY *)&row;
|
||||
}
|
||||
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemTable->maxKey) > 0) {
|
||||
pMemTable->maxKey = *(TSDBKEY *)&row;
|
||||
}
|
||||
pMemTable->nRows += nRow;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
Loading…
Reference in New Issue