enh: let create database SST_TRIGGER 2 worker
This commit is contained in:
parent
ae54c4a50b
commit
acab522494
|
@ -1192,6 +1192,7 @@ typedef struct {
|
||||||
int64_t walRetentionSize;
|
int64_t walRetentionSize;
|
||||||
int32_t walRollPeriod;
|
int32_t walRollPeriod;
|
||||||
int64_t walSegmentSize;
|
int64_t walSegmentSize;
|
||||||
|
int16_t sstTrigger;
|
||||||
} SCreateVnodeReq;
|
} SCreateVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
|
|
|
@ -102,6 +102,7 @@ static const SSysDbTableSchema userDBSchema[] = {
|
||||||
{.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true},
|
{.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true},
|
||||||
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
{.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true},
|
{.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true},
|
||||||
|
{.name = "sst_trigger", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userFuncSchema[] = {
|
static const SSysDbTableSchema userFuncSchema[] = {
|
||||||
|
|
|
@ -3766,6 +3766,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
||||||
if (tEncodeI64(&encoder, pReq->walRetentionSize) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
|
||||||
|
if (tEncodeI16(&encoder, pReq->sstTrigger) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -3838,6 +3839,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
||||||
if (tDecodeI64(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
|
||||||
|
if (tDecodeI16(&decoder, &pReq->sstTrigger) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -167,6 +167,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pCfg->walCfg.segSize = pCreate->walSegmentSize;
|
pCfg->walCfg.segSize = pCreate->walSegmentSize;
|
||||||
pCfg->walCfg.level = pCreate->walLevel;
|
pCfg->walCfg.level = pCreate->walLevel;
|
||||||
|
|
||||||
|
pCfg->sstTrigger = pCreate->sstTrigger;
|
||||||
pCfg->hashBegin = pCreate->hashBegin;
|
pCfg->hashBegin = pCreate->hashBegin;
|
||||||
pCfg->hashEnd = pCreate->hashEnd;
|
pCfg->hashEnd = pCreate->hashEnd;
|
||||||
pCfg->hashMethod = pCreate->hashMethod;
|
pCfg->hashMethod = pCreate->hashMethod;
|
||||||
|
@ -219,8 +220,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d", createReq.vgId,
|
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d",
|
||||||
createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize);
|
createReq.vgId, createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize,
|
||||||
|
createReq.sstTrigger);
|
||||||
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
||||||
|
|
||||||
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
||||||
|
|
|
@ -311,6 +311,7 @@ typedef struct {
|
||||||
int64_t walRetentionSize;
|
int64_t walRetentionSize;
|
||||||
int32_t walRollPeriod;
|
int32_t walRollPeriod;
|
||||||
int64_t walSegmentSize;
|
int64_t walSegmentSize;
|
||||||
|
int16_t sstTrigger;
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
|
|
||||||
#define DB_VER_NUMBER 1
|
#define DB_VER_NUMBER 1
|
||||||
#define DB_RESERVE_SIZE 64
|
#define DB_RESERVE_SIZE 62
|
||||||
|
|
||||||
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
||||||
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||||
|
@ -124,6 +124,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
||||||
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
|
||||||
|
SDB_SET_INT16(pRaw, dataPos, pDb->cfg.sstTrigger, _OVER)
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||||
|
@ -207,6 +208,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
|
||||||
|
SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.sstTrigger, _OVER)
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||||
taosInitRWLatch(&pDb->lock);
|
taosInitRWLatch(&pDb->lock);
|
||||||
|
@ -254,6 +256,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
||||||
pOld->cfg.strict = pNew->cfg.strict;
|
pOld->cfg.strict = pNew->cfg.strict;
|
||||||
pOld->cfg.cacheLast = pNew->cfg.cacheLast;
|
pOld->cfg.cacheLast = pNew->cfg.cacheLast;
|
||||||
pOld->cfg.replications = pNew->cfg.replications;
|
pOld->cfg.replications = pNew->cfg.replications;
|
||||||
|
pOld->cfg.sstTrigger = pNew->cfg.sstTrigger;
|
||||||
taosWUnLockLatch(&pOld->lock);
|
taosWUnLockLatch(&pOld->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -330,6 +333,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
||||||
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1;
|
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1;
|
||||||
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
|
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
|
||||||
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
|
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
|
||||||
|
if (pCfg->sstTrigger < TSDB_MIN_SST_TRIGGER || pCfg->sstTrigger > TSDB_MAX_SST_TRIGGER) return -1;
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -363,6 +367,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
pCfg->walRetentionSize = TSDB_REPS_DEF_DB_WAL_RET_SIZE;
|
pCfg->walRetentionSize = TSDB_REPS_DEF_DB_WAL_RET_SIZE;
|
||||||
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_REPS_DEF_DB_WAL_ROLL_PERIOD;
|
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_REPS_DEF_DB_WAL_ROLL_PERIOD;
|
||||||
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
|
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
|
||||||
|
if (pCfg->sstTrigger <= 0) pCfg->sstTrigger = TSDB_DEFAULT_SST_TRIGGER;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
|
@ -479,6 +484,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
||||||
.walRetentionSize = pCreate->walRetentionSize,
|
.walRetentionSize = pCreate->walRetentionSize,
|
||||||
.walRollPeriod = pCreate->walRollPeriod,
|
.walRollPeriod = pCreate->walRollPeriod,
|
||||||
.walSegmentSize = pCreate->walSegmentSize,
|
.walSegmentSize = pCreate->walSegmentSize,
|
||||||
|
.sstTrigger = pCreate->sstTrigger,
|
||||||
};
|
};
|
||||||
|
|
||||||
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||||
|
@ -1682,6 +1688,9 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walSegmentSize, false);
|
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walSegmentSize, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.sstTrigger, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
|
|
@ -234,6 +234,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
createReq.walRetentionSize = pDb->cfg.walRetentionSize;
|
createReq.walRetentionSize = pDb->cfg.walRetentionSize;
|
||||||
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
|
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
|
||||||
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
|
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
|
||||||
|
createReq.sstTrigger = pDb->cfg.sstTrigger;
|
||||||
|
|
||||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||||
SReplica *pReplica = &createReq.replicas[v];
|
SReplica *pReplica = &createReq.replicas[v];
|
||||||
|
|
|
@ -287,6 +287,7 @@ struct SVnodeCfg {
|
||||||
SVnodeStats vndStats;
|
SVnodeStats vndStats;
|
||||||
uint32_t hashBegin;
|
uint32_t hashBegin;
|
||||||
uint32_t hashEnd;
|
uint32_t hashEnd;
|
||||||
|
int16_t sstTrigger;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -106,6 +106,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sstTrigger) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||||
|
@ -205,6 +206,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
|
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "sstTrigger", pCfg->sstTrigger, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code);
|
tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
|
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
|
||||||
|
|
Loading…
Reference in New Issue