Merge pull request #17988 from taosdata/perf/default_buffer_size
perf: optimize write by changing default buffer size
This commit is contained in:
commit
b882e45505
|
@ -290,7 +290,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_DEFAULT_VN_PER_DB 2
|
#define TSDB_DEFAULT_VN_PER_DB 2
|
||||||
#define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB
|
#define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB
|
||||||
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
|
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
|
||||||
#define TSDB_DEFAULT_BUFFER_PER_VNODE 96
|
#define TSDB_DEFAULT_BUFFER_PER_VNODE 256
|
||||||
#define TSDB_MIN_PAGES_PER_VNODE 64
|
#define TSDB_MIN_PAGES_PER_VNODE 64
|
||||||
#define TSDB_MAX_PAGES_PER_VNODE (INT32_MAX - 1)
|
#define TSDB_MAX_PAGES_PER_VNODE (INT32_MAX - 1)
|
||||||
#define TSDB_DEFAULT_PAGES_PER_VNODE 256
|
#define TSDB_DEFAULT_PAGES_PER_VNODE 256
|
||||||
|
|
|
@ -110,7 +110,6 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
|
||||||
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
|
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
|
||||||
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
||||||
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||||
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
|
|
||||||
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
||||||
// SRowIter
|
// SRowIter
|
||||||
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
@ -210,11 +209,10 @@ void tsdbRefMemTable(SMemTable *pMemTable);
|
||||||
void tsdbUnrefMemTable(SMemTable *pMemTable);
|
void tsdbUnrefMemTable(SMemTable *pMemTable);
|
||||||
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
||||||
// STbDataIter
|
// STbDataIter
|
||||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||||
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
||||||
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
|
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
|
||||||
// STbData
|
// STbData
|
||||||
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
||||||
// tsdbFile.c ==============================================================================================
|
// tsdbFile.c ==============================================================================================
|
||||||
|
@ -772,6 +770,40 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||||
|
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||||
|
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||||
|
int32_t n = tGetI64(p, &pRow->version);
|
||||||
|
pRow->pTSRow = (STSRow *)(p + n);
|
||||||
|
n += pRow->pTSRow->len;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
||||||
|
if (pIter == NULL) return NULL;
|
||||||
|
|
||||||
|
if (pIter->pRow) {
|
||||||
|
return pIter->pRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->backward) {
|
||||||
|
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
|
||||||
|
pIter->pRow = &pIter->row;
|
||||||
|
|
||||||
|
return pIter->pRow;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -294,31 +294,6 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
|
||||||
// we add here for commit usage
|
|
||||||
if (pIter == NULL) return NULL;
|
|
||||||
|
|
||||||
if (pIter->pRow) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pIter->backward) {
|
|
||||||
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
|
|
||||||
pIter->pRow = &pIter->row;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return pIter->pRow;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
|
|
@ -575,16 +575,6 @@ int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
|
|
||||||
int32_t n = 0;
|
|
||||||
|
|
||||||
n += tGetI64(p, &pRow->version);
|
|
||||||
pRow->pTSRow = (STSRow *)(p + n);
|
|
||||||
n += pRow->pTSRow->len;
|
|
||||||
|
|
||||||
return n;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
|
int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
|
||||||
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
|
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
|
||||||
}
|
}
|
||||||
|
@ -1053,7 +1043,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
|
||||||
tRowIterInit(&rIter, pRow, pTSchema);
|
tRowIterInit(&rIter, pRow, pTSchema);
|
||||||
pColVal = tRowIterNext(&rIter);
|
pColVal = tRowIterNext(&rIter);
|
||||||
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
|
||||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData];
|
||||||
|
|
||||||
while (pColVal && pColVal->cid < pColData->cid) {
|
while (pColVal && pColVal->cid < pColData->cid) {
|
||||||
pColVal = tRowIterNext(&rIter);
|
pColVal = tRowIterNext(&rIter);
|
||||||
|
|
|
@ -37,6 +37,12 @@ struct SVnodeGlobal vnodeGlobal;
|
||||||
|
|
||||||
static void* loop(void* arg);
|
static void* loop(void* arg);
|
||||||
|
|
||||||
|
static tsem_t canCommit = {0};
|
||||||
|
|
||||||
|
static void vnodeInitCommit() { tsem_init(&canCommit, 0, 4); };
|
||||||
|
void vnode_wait_commit() { tsem_wait(&canCommit); }
|
||||||
|
void vnode_done_commit() { tsem_wait(&canCommit); }
|
||||||
|
|
||||||
int vnodeInit(int nthreads) {
|
int vnodeInit(int nthreads) {
|
||||||
int8_t init;
|
int8_t init;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
|
@ -38,7 +38,7 @@ endi
|
||||||
|
|
||||||
print ============= create database
|
print ============= create database
|
||||||
#database_option: {
|
#database_option: {
|
||||||
# | BUFFER value [3~16384, default: 96]
|
# | BUFFER value [3~16384, default: 256]
|
||||||
# | PAGES value [64~16384, default: 256]
|
# | PAGES value [64~16384, default: 256]
|
||||||
# | CACHEMODEL value ['node', 'last_row', 'last_value', 'both']
|
# | CACHEMODEL value ['node', 'last_row', 'last_value', 'both']
|
||||||
# | WAL_FSYNC_PERIOD value [0 ~ 180000 ms]
|
# | WAL_FSYNC_PERIOD value [0 ~ 180000 ms]
|
||||||
|
@ -78,7 +78,7 @@ endi
|
||||||
if $data7_db != 1440000m,1440000m,1440000m then # keep
|
if $data7_db != 1440000m,1440000m,1440000m then # keep
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data8_db != 96 then # buffer
|
if $data8_db != 256 then # buffer
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data9_db != 4 then # pagesize
|
if $data9_db != 4 then # pagesize
|
||||||
|
|
|
@ -37,7 +37,7 @@ endi
|
||||||
|
|
||||||
print ============= create database with all options
|
print ============= create database with all options
|
||||||
#database_option: {
|
#database_option: {
|
||||||
# | BUFFER value [3~16384, default: 96]
|
# | BUFFER value [3~16384, default: 256]
|
||||||
# | PAGES value [64~16384, default: 256]
|
# | PAGES value [64~16384, default: 256]
|
||||||
# | PAGESIZE value [1~16384, default: 4]
|
# | PAGESIZE value [1~16384, default: 4]
|
||||||
# | CACHEMODEL value ['node', 'last_row', 'last_value', 'both', default: 'node']
|
# | CACHEMODEL value ['node', 'last_row', 'last_value', 'both', default: 'node']
|
||||||
|
@ -98,7 +98,7 @@ endi
|
||||||
if $data7_db != 5256000m,5256000m,5256000m then # keep
|
if $data7_db != 5256000m,5256000m,5256000m then # keep
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data8_db != 96 then # buffer
|
if $data8_db != 256 then # buffer
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data9_db != 4 then # pagesize
|
if $data9_db != 4 then # pagesize
|
||||||
|
|
|
@ -10,37 +10,43 @@ from util.sql import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
from util.dnodes import *
|
from util.dnodes import *
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor(),logSql)
|
tdSql.init(conn.cursor(), logSql)
|
||||||
self.buffer_boundary = [3,4097,8193,12289,16384]
|
self.buffer_boundary = [3, 4097, 8193, 12289, 16384]
|
||||||
self.buffer_error = [self.buffer_boundary[0]-1,self.buffer_boundary[-1]+1,12289,96]
|
self.buffer_error = [self.buffer_boundary[0] -
|
||||||
|
1, self.buffer_boundary[-1]+1, 12289, 256]
|
||||||
# pages_boundary >= 64
|
# pages_boundary >= 64
|
||||||
self.pages_boundary = [64,128,512]
|
self.pages_boundary = [64, 128, 512]
|
||||||
self.pages_error = [self.pages_boundary[0]-1]
|
self.pages_error = [self.pages_boundary[0]-1]
|
||||||
|
|
||||||
def alter_buffer(self):
|
def alter_buffer(self):
|
||||||
tdSql.execute('create database db')
|
tdSql.execute('create database db')
|
||||||
for buffer in self.buffer_boundary:
|
for buffer in self.buffer_boundary:
|
||||||
tdSql.execute(f'alter database db buffer {buffer}')
|
tdSql.execute(f'alter database db buffer {buffer}')
|
||||||
tdSql.query('select * from information_schema.ins_databases where name = "db"')
|
tdSql.query(
|
||||||
tdSql.checkEqual(tdSql.queryResult[0][8],buffer)
|
'select * from information_schema.ins_databases where name = "db"')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][8], buffer)
|
||||||
tdSql.execute('drop database db')
|
tdSql.execute('drop database db')
|
||||||
tdSql.execute('create database db vgroups 10')
|
tdSql.execute('create database db vgroups 10')
|
||||||
for buffer in self.buffer_error:
|
for buffer in self.buffer_error:
|
||||||
tdSql.error(f'alter database db buffer {buffer}')
|
tdSql.error(f'alter database db buffer {buffer}')
|
||||||
tdSql.execute('drop database db')
|
tdSql.execute('drop database db')
|
||||||
|
|
||||||
def alter_pages(self):
|
def alter_pages(self):
|
||||||
tdSql.execute('create database db')
|
tdSql.execute('create database db')
|
||||||
for pages in self.pages_boundary:
|
for pages in self.pages_boundary:
|
||||||
tdSql.execute(f'alter database db pages {pages}')
|
tdSql.execute(f'alter database db pages {pages}')
|
||||||
tdSql.query('select * from information_schema.ins_databases where name = "db"')
|
tdSql.query(
|
||||||
tdSql.checkEqual(tdSql.queryResult[0][10],pages)
|
'select * from information_schema.ins_databases where name = "db"')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][10], pages)
|
||||||
tdSql.execute('drop database db')
|
tdSql.execute('drop database db')
|
||||||
tdSql.execute('create database db')
|
tdSql.execute('create database db')
|
||||||
tdSql.query('select * from information_schema.ins_databases where name = "db"')
|
tdSql.query(
|
||||||
|
'select * from information_schema.ins_databases where name = "db"')
|
||||||
self.pages_error.append(tdSql.queryResult[0][10])
|
self.pages_error.append(tdSql.queryResult[0][10])
|
||||||
for pages in self.pages_error:
|
for pages in self.pages_error:
|
||||||
tdSql.error(f'alter database db pages {pages}')
|
tdSql.error(f'alter database db pages {pages}')
|
||||||
|
@ -55,5 +61,6 @@ class TDTestCase:
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success(f"{__file__} successfully executed")
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
|
Loading…
Reference in New Issue