Merge pull request #18064 from taosdata/perf/insert_optimize
perf: insert optimize
This commit is contained in:
commit
93447107e1
|
@ -61,14 +61,14 @@ struct SVBufPoolNode {
|
|||
};
|
||||
|
||||
struct SVBufPool {
|
||||
SVBufPool* next;
|
||||
SVnode* pVnode;
|
||||
volatile int32_t nRef;
|
||||
TdThreadSpinlock lock;
|
||||
int64_t size;
|
||||
uint8_t* ptr;
|
||||
SVBufPoolNode* pTail;
|
||||
SVBufPoolNode node;
|
||||
SVBufPool* next;
|
||||
SVnode* pVnode;
|
||||
TdThreadSpinlock* lock;
|
||||
volatile int32_t nRef;
|
||||
int64_t size;
|
||||
uint8_t* ptr;
|
||||
SVBufPoolNode* pTail;
|
||||
SVBufPoolNode node;
|
||||
};
|
||||
|
||||
int32_t vnodeOpenBufPool(SVnode* pVnode);
|
||||
|
|
|
@ -458,11 +458,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
|||
}
|
||||
|
||||
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
|
||||
int8_t level = 1;
|
||||
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
|
||||
const uint32_t factor = 4;
|
||||
int8_t level = 1;
|
||||
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
|
||||
|
||||
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
|
||||
while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) {
|
||||
level++;
|
||||
}
|
||||
|
||||
|
|
|
@ -1015,6 +1015,191 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
SColVal cv = {0};
|
||||
int32_t iColDataFrom = 0;
|
||||
SColData *pColDataFrom =
|
||||
(iColDataFrom < pBlockDataFrom->nColData) ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] : NULL;
|
||||
|
||||
for (int32_t iColDataTo = 0; iColDataTo < pBlockData->nColData; iColDataTo++) {
|
||||
SColData *pColDataTo = &((SColData *)pBlockData->aColData->pData)[iColDataTo];
|
||||
|
||||
while (pColDataFrom && pColDataFrom->cid < pColDataTo->cid) {
|
||||
iColDataFrom++;
|
||||
pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData)
|
||||
? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom]
|
||||
: NULL;
|
||||
}
|
||||
|
||||
if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) {
|
||||
code = tColDataAppendValue(pColDataTo, &COL_VAL_NONE(pColDataTo->cid, pColDataTo->type));
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
tColDataGetValue(pColDataFrom, iRow, &cv);
|
||||
|
||||
code = tColDataAppendValue(pColDataTo, &cv);
|
||||
if (code) goto _exit;
|
||||
|
||||
iColDataFrom++;
|
||||
pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData)
|
||||
? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom]
|
||||
: NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t iTColumn = 1;
|
||||
STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
|
||||
void *pBitmap = pRow->statis ? tdGetBitmapAddrTp(pRow, pTSchema->flen) : NULL;
|
||||
|
||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
||||
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
|
||||
|
||||
while (pTColumn && pTColumn->colId < pColData->cid) {
|
||||
iTColumn++;
|
||||
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
|
||||
}
|
||||
|
||||
if (pTColumn == NULL || pTColumn->colId > pColData->cid) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
ASSERT(pTColumn->type == pColData->type);
|
||||
|
||||
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
|
||||
|
||||
if (pRow->statis) {
|
||||
TDRowValT vt = TD_VTYPE_MAX;
|
||||
tdGetBitmapValTypeII(pBitmap, iTColumn - 1, &vt);
|
||||
|
||||
if (vt == TD_VTYPE_NORM) {
|
||||
cv.flag = CV_FLAG_VALUE;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
|
||||
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
|
||||
cv.value.nData = varDataLen(pData);
|
||||
cv.value.pData = varDataVal(pData);
|
||||
} else {
|
||||
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes);
|
||||
}
|
||||
|
||||
code = tColDataAppendValue(pColData, &cv);
|
||||
if (code) goto _exit;
|
||||
} else if (vt == TD_VTYPE_NONE) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
|
||||
if (code) goto _exit;
|
||||
} else if (vt == TD_VTYPE_NULL) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type));
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
} else {
|
||||
cv.flag = CV_FLAG_VALUE;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
|
||||
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
|
||||
cv.value.nData = varDataLen(pData);
|
||||
cv.value.pData = varDataVal(pData);
|
||||
} else {
|
||||
memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes);
|
||||
}
|
||||
|
||||
code = tColDataAppendValue(pColData, &cv);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
iTColumn++;
|
||||
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) {
|
||||
int32_t code = 0;
|
||||
|
||||
col_id_t kvIter = 0;
|
||||
col_id_t nKvCols = tdRowGetNCols(pRow) - 1;
|
||||
void *pColIdx = TD_ROW_COL_IDX(pRow);
|
||||
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
|
||||
int32_t iTColumn = 1;
|
||||
STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
|
||||
|
||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
||||
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
|
||||
|
||||
while (pTColumn && pTColumn->colId < pColData->cid) {
|
||||
iTColumn++;
|
||||
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
|
||||
}
|
||||
|
||||
if (pTColumn == NULL || pTColumn->colId > pColData->cid) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
ASSERT(pTColumn->type == pColData->type);
|
||||
|
||||
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
|
||||
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
|
||||
SKvRowIdx *pKvIdx = NULL;
|
||||
|
||||
while (kvIter < nKvCols) {
|
||||
pKvIdx = (SKvRowIdx *)POINTER_SHIFT(pColIdx, kvIter * sizeof(SKvRowIdx));
|
||||
if (pKvIdx->colId == pTColumn->colId) {
|
||||
tdGetBitmapValTypeII(pBitmap, kvIter, &vt);
|
||||
++kvIter;
|
||||
break;
|
||||
} else if (pKvIdx->colId > pTColumn->colId) {
|
||||
vt = TD_VTYPE_NONE;
|
||||
break;
|
||||
} else {
|
||||
++kvIter;
|
||||
}
|
||||
}
|
||||
|
||||
if (vt == TD_VTYPE_NORM) {
|
||||
cv.flag = CV_FLAG_VALUE;
|
||||
|
||||
void *pData = POINTER_SHIFT(pRow, pKvIdx->offset);
|
||||
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
|
||||
cv.value.nData = varDataLen(pData);
|
||||
cv.value.pData = varDataVal(pData);
|
||||
} else {
|
||||
memcpy(&cv.value.val, pData, pTColumn->bytes);
|
||||
}
|
||||
|
||||
code = tColDataAppendValue(pColData, &cv);
|
||||
if (code) goto _exit;
|
||||
} else if (vt == TD_VTYPE_NONE) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
|
||||
if (code) goto _exit;
|
||||
} else if (vt == TD_VTYPE_NULL) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type));
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
iTColumn++;
|
||||
pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -1036,27 +1221,20 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
|
|||
if (code) goto _err;
|
||||
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
|
||||
|
||||
// OTHER
|
||||
SRowIter rIter = {0};
|
||||
SColVal *pColVal;
|
||||
|
||||
tRowIterInit(&rIter, pRow, pTSchema);
|
||||
pColVal = tRowIterNext(&rIter);
|
||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
||||
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
|
||||
|
||||
while (pColVal && pColVal->cid < pColData->cid) {
|
||||
pColVal = tRowIterNext(&rIter);
|
||||
}
|
||||
|
||||
if (pColVal == NULL || pColVal->cid > pColData->cid) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
|
||||
SColVal cv = {0};
|
||||
if (pRow->type == 0) {
|
||||
if (TD_IS_TP_ROW(pRow->pTSRow)) {
|
||||
code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema);
|
||||
if (code) goto _err;
|
||||
} else if (TD_IS_KV_ROW(pRow->pTSRow)) {
|
||||
code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
code = tColDataAppendValue(pColData, pColVal);
|
||||
if (code) goto _err;
|
||||
pColVal = tRowIterNext(&rIter);
|
||||
ASSERT(0);
|
||||
}
|
||||
} else {
|
||||
code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow);
|
||||
if (code) goto _err;
|
||||
}
|
||||
pBlockData->nRow++;
|
||||
|
||||
|
|
|
@ -27,10 +27,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
|
||||
taosMemoryFree(pPool);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
|
||||
if (!pPool->lock) {
|
||||
taosMemoryFree(pPool);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
if (taosThreadSpinInit(pPool->lock, 0) != 0) {
|
||||
taosMemoryFree((void*)pPool->lock);
|
||||
taosMemoryFree(pPool);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
pPool->lock = NULL;
|
||||
}
|
||||
|
||||
pPool->next = NULL;
|
||||
|
@ -49,7 +60,10 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
|
|||
|
||||
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
||||
vnodeBufPoolReset(pPool);
|
||||
taosThreadSpinDestroy(&pPool->lock);
|
||||
if (pPool->lock) {
|
||||
taosThreadSpinDestroy(pPool->lock);
|
||||
taosMemoryFree((void*)pPool->lock);
|
||||
}
|
||||
taosMemoryFree(pPool);
|
||||
return 0;
|
||||
}
|
||||
|
@ -114,7 +128,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
|||
void *p = NULL;
|
||||
ASSERT(pPool != NULL);
|
||||
|
||||
taosThreadSpinLock(&pPool->lock);
|
||||
if (pPool->lock) taosThreadSpinLock(pPool->lock);
|
||||
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
|
||||
// allocate from the anchor node
|
||||
p = pPool->ptr;
|
||||
|
@ -125,7 +139,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
|||
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosThreadSpinUnlock(&pPool->lock);
|
||||
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -138,7 +152,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
|||
|
||||
pPool->size = pPool->size + sizeof(*pNode) + size;
|
||||
}
|
||||
taosThreadSpinUnlock(&pPool->lock);
|
||||
if (pPool->lock) taosThreadSpinUnlock(pPool->lock);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue