more
This commit is contained in:
parent
e0428e45a4
commit
a0aa003f89
|
@ -67,8 +67,6 @@ static int memDataPCmprFn(const void *p1, const void *p2);
|
|||
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
|
||||
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
|
||||
static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward);
|
||||
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
|
||||
SVSubmitBlk *pSubmitBlk);
|
||||
|
||||
|
@ -295,48 +293,6 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
|
|||
return level;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
|
||||
SVSubmitBlk *pSubmitBlk) {
|
||||
int32_t code = 0;
|
||||
int32_t n = 0;
|
||||
uint8_t *p = pSubmitBlk->pData;
|
||||
int32_t nRow = 0;
|
||||
TSDBROW row = {.version = version};
|
||||
|
||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||
|
||||
ASSERT(pSubmitBlk->nData);
|
||||
|
||||
// backward put first data
|
||||
n += tGetTSRow(p + n, &row.tsRow);
|
||||
ASSERT(n <= pSubmitBlk->nData);
|
||||
|
||||
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD);
|
||||
code = memDataDoPut(pMemData, pos, &row, 0);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
nRow++;
|
||||
|
||||
// forward put rest
|
||||
while (n < pSubmitBlk->nData) {
|
||||
n += tGetTSRow(p + n, &row.tsRow);
|
||||
ASSERT(n <= pSubmitBlk->nData);
|
||||
|
||||
// TODO
|
||||
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS);
|
||||
code = memDataDoPut(pMemData, pos, &row, 1);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
nRow++;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
|
||||
SMemSkipListNode *px;
|
||||
SMemSkipListNode *pn;
|
||||
|
@ -402,8 +358,82 @@ static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
static int32_t memDataDoPut(SMemTable *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow,
|
||||
int8_t forward) {
|
||||
int32_t code = 0;
|
||||
int8_t level;
|
||||
SMemSkipListNode *pNode;
|
||||
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
||||
|
||||
// node
|
||||
level = tsdbMemSkipListRandLevel(&pMemData->sl);
|
||||
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
|
||||
if (pNode == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
pNode->level = level;
|
||||
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
|
||||
SL_NODE_FORWARD(pNode, iLevel) = NULL;
|
||||
SL_NODE_BACKWARD(pNode, iLevel) = NULL;
|
||||
}
|
||||
|
||||
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);
|
||||
|
||||
// put
|
||||
for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
|
||||
/* code */
|
||||
}
|
||||
|
||||
pMemData->sl.size++;
|
||||
if (pMemData->sl.level < pNode->level) {
|
||||
pMemData->sl.level = pNode->level;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
|
||||
SVSubmitBlk *pSubmitBlk) {
|
||||
int32_t code = 0;
|
||||
int32_t n = 0;
|
||||
uint8_t *p = pSubmitBlk->pData;
|
||||
int32_t nRow = 0;
|
||||
TSDBROW row = {.version = version};
|
||||
|
||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||
|
||||
ASSERT(pSubmitBlk->nData);
|
||||
|
||||
// backward put first data
|
||||
n += tGetTSRow(p + n, &row.tsRow);
|
||||
ASSERT(n <= pSubmitBlk->nData);
|
||||
|
||||
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD);
|
||||
code = memDataDoPut(pMemTable, pMemData, pos, &row, 0);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
nRow++;
|
||||
|
||||
// forward put rest
|
||||
for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) {
|
||||
pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
|
||||
}
|
||||
while (n < pSubmitBlk->nData) {
|
||||
n += tGetTSRow(p + n, &row.tsRow);
|
||||
ASSERT(n <= pSubmitBlk->nData);
|
||||
|
||||
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS);
|
||||
code = memDataDoPut(pMemTable, pMemData, pos, &row, 1);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
nRow++;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
Loading…
Reference in New Issue