Merge pull request #18130 from taosdata/perf/insert_optimize
perf: insert optimize
This commit is contained in:
commit
f5f1bae376
|
@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
|
|||
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
|
||||
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
|
||||
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
|
||||
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
||||
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
||||
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
||||
// SRowIter
|
||||
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||
|
@ -333,6 +333,8 @@ struct SVersionRange {
|
|||
typedef struct SMemSkipListNode SMemSkipListNode;
|
||||
struct SMemSkipListNode {
|
||||
int8_t level;
|
||||
int64_t version;
|
||||
STSRow *pTSRow;
|
||||
SMemSkipListNode *forwards[0];
|
||||
};
|
||||
typedef struct SMemSkipList {
|
||||
|
@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
|||
|
||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
|
||||
|
||||
static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||
int32_t n = tGetI64(p, &pRow->version);
|
||||
pRow->pTSRow = (STSRow *)(p + n);
|
||||
n += pRow->pTSRow->len;
|
||||
return n;
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
||||
if (pIter == NULL) return NULL;
|
||||
|
@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
|||
}
|
||||
}
|
||||
|
||||
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
|
||||
pIter->pRow = &pIter->row;
|
||||
pIter->pRow->version = pIter->pNode->version;
|
||||
pIter->pRow->pTSRow = pIter->pNode->pTSRow;
|
||||
|
||||
return pIter->pRow;
|
||||
}
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
#define MEM_MIN_HASH 1024
|
||||
#define SL_MAX_LEVEL 5
|
||||
|
||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
|
||||
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
|
||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
|
||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
|
||||
|
||||
#define SL_MOVE_BACKWARD 0x1
|
||||
#define SL_MOVE_FROM_POS 0x2
|
||||
|
@ -263,30 +263,27 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
|
|||
}
|
||||
|
||||
bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
||||
SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
|
||||
SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;
|
||||
|
||||
pIter->pRow = NULL;
|
||||
if (pIter->backward) {
|
||||
ASSERT(pIter->pNode != pTail);
|
||||
ASSERT(pIter->pNode != pIter->pTbData->sl.pTail);
|
||||
|
||||
if (pIter->pNode == pHead) {
|
||||
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
|
||||
if (pIter->pNode == pHead) {
|
||||
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pIter->pNode != pHead);
|
||||
ASSERT(pIter->pNode != pIter->pTbData->sl.pHead);
|
||||
|
||||
if (pIter->pNode == pTail) {
|
||||
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
|
||||
if (pIter->pNode == pTail) {
|
||||
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -394,7 +391,7 @@ _err:
|
|||
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
|
||||
SMemSkipListNode *px;
|
||||
SMemSkipListNode *pn;
|
||||
TSDBKEY *pTKey;
|
||||
TSDBKEY tKey = {0};
|
||||
int32_t backward = flags & SL_MOVE_BACKWARD;
|
||||
int32_t fromPos = flags & SL_MOVE_FROM_POS;
|
||||
|
||||
|
@ -413,9 +410,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
|||
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||
pn = SL_NODE_BACKWARD(px, iLevel);
|
||||
while (pn != pTbData->sl.pHead) {
|
||||
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
|
||||
tKey.version = pn->version;
|
||||
tKey.ts = pn->pTSRow->ts;
|
||||
|
||||
int32_t c = tsdbKeyCmprFn(pTKey, pKey);
|
||||
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
|
||||
if (c <= 0) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -442,7 +440,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
|||
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||
pn = SL_NODE_FORWARD(px, iLevel);
|
||||
while (pn != pTbData->sl.pTail) {
|
||||
int32_t c = tsdbKeyCmprFn(SL_NODE_DATA(pn), pKey);
|
||||
tKey.version = pn->version;
|
||||
tKey.ts = pn->pTSRow->ts;
|
||||
|
||||
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
|
||||
if (c >= 0) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -467,8 +468,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
|
|||
|
||||
return level;
|
||||
}
|
||||
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
|
||||
int8_t forward) {
|
||||
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version,
|
||||
STSRow *pRow, int8_t forward) {
|
||||
int32_t code = 0;
|
||||
int8_t level;
|
||||
SMemSkipListNode *pNode;
|
||||
|
@ -477,13 +478,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
|
|||
// node
|
||||
level = tsdbMemSkipListRandLevel(&pTbData->sl);
|
||||
ASSERT(pPool != NULL);
|
||||
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
|
||||
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
|
||||
if (pNode == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
pNode->level = level;
|
||||
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);
|
||||
pNode->version = version;
|
||||
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
|
||||
if (NULL == pNode->pTSRow) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
memcpy(pNode->pTSRow, pRow, pRow->len);
|
||||
|
||||
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
|
||||
SMemSkipListNode *pn = pos[iLevel];
|
||||
|
@ -549,7 +556,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
key.ts = row.pTSRow->ts;
|
||||
nRow++;
|
||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
|
||||
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0);
|
||||
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -570,7 +577,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
|
||||
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
|
||||
}
|
||||
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1);
|
||||
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||
int32_t n = 0;
|
||||
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||
// int32_t n = 0;
|
||||
|
||||
n += tPutI64(p, pRow->version);
|
||||
if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
|
||||
n += pRow->pTSRow->len;
|
||||
// n += tPutI64(p, pRow->version);
|
||||
// if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
|
||||
// n += pRow->pTSRow->len;
|
||||
|
||||
return n;
|
||||
}
|
||||
// return n;
|
||||
// }
|
||||
|
||||
int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
|
||||
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
|
||||
|
@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
|
|||
cv.flag = CV_FLAG_VALUE;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
|
||||
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
|
||||
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
|
||||
cv.value.nData = varDataLen(pData);
|
||||
cv.value.pData = varDataVal(pData);
|
||||
} else {
|
||||
|
@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
|
|||
cv.flag = CV_FLAG_VALUE;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
|
||||
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
|
||||
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
|
||||
cv.value.nData = varDataLen(pData);
|
||||
cv.value.pData = varDataVal(pData);
|
||||
} else {
|
||||
|
@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch
|
|||
ASSERT(pTColumn->type == pColData->type);
|
||||
|
||||
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
|
||||
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
|
||||
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
|
||||
SKvRowIdx *pKvIdx = NULL;
|
||||
|
||||
while (kvIter < nKvCols) {
|
||||
|
|
Loading…
Reference in New Issue