Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot

This commit is contained in:
Hongze Cheng 2022-07-11 09:48:43 +00:00
commit 61fb45af56
36 changed files with 305 additions and 217 deletions

View File

@ -32,7 +32,6 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
- cacheLast: [Description](/reference/config/#cachelast) - cacheLast: [Description](/reference/config/#cachelast)
- replica: [Description](/reference/config/#replica) - replica: [Description](/reference/config/#replica)
- quorum: [Description](/reference/config/#quorum) - quorum: [Description](/reference/config/#quorum)
- maxVgroupsPerDb: [Description](/reference/config/#maxvgroupsperdb)
- comp: [Description](/reference/config/#comp) - comp: [Description](/reference/config/#comp)
- precision: [Description](/reference/config/#precision) - precision: [Description](/reference/config/#precision)
6. Please note that all of the parameters mentioned in this section are configured in configuration file `taos.cfg` on the TDengine server. If not specified in the `create database` statement, the values from taos.cfg are used by default. To override default parameters, they must be specified in the `create database` statement. 6. Please note that all of the parameters mentioned in this section are configured in configuration file `taos.cfg` on the TDengine server. If not specified in the `create database` statement, the values from taos.cfg are used by default. To override default parameters, they must be specified in the `create database` statement.

View File

@ -32,7 +32,6 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
- cacheLast: [详细说明](/reference/config/#cachelast) - cacheLast: [详细说明](/reference/config/#cachelast)
- replica: [详细说明](/reference/config/#replica) - replica: [详细说明](/reference/config/#replica)
- quorum: [详细说明](/reference/config/#quorum) - quorum: [详细说明](/reference/config/#quorum)
- maxVgroupsPerDb: [详细说明](/reference/config/#maxvgroupsperdb)
- comp: [详细说明](/reference/config/#comp) - comp: [详细说明](/reference/config/#comp)
- precision: [详细说明](/reference/config/#precision) - precision: [详细说明](/reference/config/#precision)
6. 请注意上面列出的所有参数都可以配置在配置文件 `taosd.cfg` 中作为创建数据库时使用的默认配置, `create database` 的参数中明确指定的会覆盖配置文件中的设置。 6. 请注意上面列出的所有参数都可以配置在配置文件 `taosd.cfg` 中作为创建数据库时使用的默认配置, `create database` 的参数中明确指定的会覆盖配置文件中的设置。

View File

@ -67,7 +67,6 @@ extern int32_t tsNumOfVnodeQueryThreads;
extern int32_t tsNumOfVnodeFetchThreads; extern int32_t tsNumOfVnodeFetchThreads;
extern int32_t tsNumOfVnodeWriteThreads; extern int32_t tsNumOfVnodeWriteThreads;
extern int32_t tsNumOfVnodeSyncThreads; extern int32_t tsNumOfVnodeSyncThreads;
extern int32_t tsNumOfVnodeMergeThreads;
extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeQueryThreads;
extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfQnodeFetchThreads;
extern int32_t tsNumOfSnodeSharedThreads; extern int32_t tsNumOfSnodeSharedThreads;

View File

@ -723,7 +723,7 @@ typedef struct {
int32_t buffer; // MB int32_t buffer; // MB
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
int32_t lastRowMem; int32_t cacheLastSize;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
@ -736,7 +736,7 @@ typedef struct {
int8_t compression; int8_t compression;
int8_t replications; int8_t replications;
int8_t strict; int8_t strict;
int8_t cacheLastRow; int8_t cacheLast;
int8_t schemaless; int8_t schemaless;
int8_t ignoreExist; int8_t ignoreExist;
int32_t numOfRetensions; int32_t numOfRetensions;
@ -752,7 +752,7 @@ typedef struct {
int32_t buffer; int32_t buffer;
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
int32_t lastRowMem; int32_t cacheLastSize;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
@ -760,7 +760,7 @@ typedef struct {
int32_t fsyncPeriod; int32_t fsyncPeriod;
int8_t walLevel; int8_t walLevel;
int8_t strict; int8_t strict;
int8_t cacheLastRow; int8_t cacheLast;
int8_t replications; int8_t replications;
} SAlterDbReq; } SAlterDbReq;
@ -854,7 +854,7 @@ typedef struct {
int8_t compression; int8_t compression;
int8_t replications; int8_t replications;
int8_t strict; int8_t strict;
int8_t cacheLastRow; int8_t cacheLast;
int32_t numOfRetensions; int32_t numOfRetensions;
SArray* pRetensions; SArray* pRetensions;
int8_t schemaless; int8_t schemaless;
@ -1119,7 +1119,7 @@ typedef struct {
int32_t buffer; int32_t buffer;
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
int32_t lastRowMem; int32_t cacheLastSize;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
@ -1134,7 +1134,7 @@ typedef struct {
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t strict; int8_t strict;
int8_t cacheLastRow; int8_t cacheLast;
int8_t isTsma; int8_t isTsma;
int8_t standby; int8_t standby;
int8_t replica; int8_t replica;
@ -1172,7 +1172,7 @@ typedef struct {
int32_t buffer; int32_t buffer;
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
int32_t lastRowMem; int32_t cacheLastSize;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
@ -1180,7 +1180,7 @@ typedef struct {
int32_t fsyncPeriod; int32_t fsyncPeriod;
int8_t walLevel; int8_t walLevel;
int8_t strict; int8_t strict;
int8_t cacheLastRow; int8_t cacheLast;
int8_t selfIndex; int8_t selfIndex;
int8_t replica; int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];

View File

@ -45,7 +45,6 @@ extern "C" {
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDULL
typedef enum { typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2, TAOS_WAL_FSYNC = 2,
} EWalType; } EWalType;
@ -74,7 +73,7 @@ typedef struct {
int8_t isWeek; int8_t isWeek;
uint64_t seqNum; uint64_t seqNum;
uint64_t term; uint64_t term;
} SSyncLogMeta; } SWalSyncInfo;
typedef struct { typedef struct {
int8_t protoVer; int8_t protoVer;
@ -84,7 +83,7 @@ typedef struct {
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
// sync meta // sync meta
SSyncLogMeta syncMeta; SWalSyncInfo syncMeta;
char body[]; char body[];
} SWalCont; } SWalCont;
@ -149,11 +148,22 @@ SWal *walOpen(const char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *); void walClose(SWal *);
// write // write interfaces
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
int32_t bodyLen); // By assigning index by the caller, wal gurantees linearizability
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen);
// This interface assign version automatically and return to caller.
// When using this interface with concurrent writes,
// wal will write all logs atomically,
// but not sure which one will be actually write first,
// and then the unique index of successful writen is returned.
// -1 will be returned for failed writes
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management
int32_t walCommit(SWal *, int64_t ver); int32_t walCommit(SWal *, int64_t ver);

View File

@ -331,12 +331,12 @@ typedef enum ELogicConditionType {
#define TSDB_DB_STRICT_OFF 0 #define TSDB_DB_STRICT_OFF 0
#define TSDB_DB_STRICT_ON 1 #define TSDB_DB_STRICT_ON 1
#define TSDB_DEFAULT_DB_STRICT 0 #define TSDB_DEFAULT_DB_STRICT 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0 #define TSDB_MIN_DB_CACHE_LAST 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 3 #define TSDB_MAX_DB_CACHE_LAST 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0 #define TSDB_DEFAULT_CACHE_LAST 0
#define TSDB_MIN_DB_LAST_ROW_MEM 1 // MB #define TSDB_MIN_DB_CACHE_LAST_SIZE 1 // MB
#define TSDB_MAX_DB_LAST_ROW_MEM 65536 #define TSDB_MAX_DB_CACHE_LAST_SIZE 65536
#define TSDB_DEFAULT_LAST_ROW_MEM 1 #define TSDB_DEFAULT_CACHE_LAST_SIZE 1
#define TSDB_DB_STREAM_MODE_OFF 0 #define TSDB_DB_STREAM_MODE_OFF 0
#define TSDB_DB_STREAM_MODE_ON 1 #define TSDB_DB_STREAM_MODE_ON 1
#define TSDB_DEFAULT_DB_STREAM_MODE 0 #define TSDB_DEFAULT_DB_STREAM_MODE 0

View File

@ -58,9 +58,8 @@ int32_t tsNumOfVnodeQueryThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeMergeThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 2; int32_t tsNumOfQnodeQueryThreads = 2;
int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfQnodeFetchThreads = 4;
int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeSharedThreads = 2;
int32_t tsNumOfSnodeUniqueThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2;
@ -106,11 +105,6 @@ int32_t tsCompressMsgSize = -1;
*/ */
int32_t tsCompressColData = -1; int32_t tsCompressColData = -1;
/*
* denote if 3.0 query pattern compatible for 2.0
*/
int32_t tsCompatibleModel = 1;
// count/hyperloglog function always return values in case of all NULL data or Empty data set. // count/hyperloglog function always return values in case of all NULL data or Empty data set.
int32_t tsCountAlwaysReturnValue = 1; int32_t tsCountAlwaysReturnValue = 1;
@ -414,7 +408,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4); tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeQueryThreads = tsNumOfCores / 2; tsNumOfVnodeQueryThreads = tsNumOfCores / 4;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
@ -425,19 +419,16 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeSyncThreads = tsNumOfCores / 2; tsNumOfVnodeSyncThreads = tsNumOfCores;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeMergeThreads = tsNumOfCores / 8;
tsNumOfVnodeMergeThreads = TRANGE(tsNumOfVnodeMergeThreads, 1, 1);
if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1;
tsNumOfQnodeQueryThreads = tsNumOfCores / 2; tsNumOfQnodeQueryThreads = tsNumOfCores / 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1); tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 1, 1); tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
@ -598,7 +589,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
@ -840,8 +830,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
} else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) { } else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
} else if (strcasecmp("numOfVnodeMergeThreads", name) == 0) {
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {

View File

@ -2001,7 +2001,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1; if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1; if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -2014,7 +2014,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1; if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->schemaless) < 0) return -1; if (tEncodeI8(&encoder, pReq->schemaless) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1; if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
@ -2043,7 +2043,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1; if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -2056,7 +2056,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->schemaless) < 0) return -1; if (tDecodeI8(&decoder, &pReq->schemaless) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1; if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
@ -2098,7 +2098,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1; if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1; if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -2106,7 +2106,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1; if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1; if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
@ -2124,7 +2124,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1; if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -2132,7 +2132,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1; if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
@ -2744,7 +2744,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1; if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1; if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->strict) < 0) return -1; if (tEncodeI8(&encoder, pRsp->strict) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pRsp->cacheLast) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1; if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) { for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i); SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
@ -2783,7 +2783,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->cacheLast) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
if (pRsp->numOfRetensions > 0) { if (pRsp->numOfRetensions > 0) {
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention)); pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));
@ -3694,7 +3694,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1; if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1; if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -3709,7 +3709,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1; if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
@ -3752,7 +3752,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1; if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -3767,7 +3767,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1; if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
@ -3877,7 +3877,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1; if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1; if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1; if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1; if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
@ -3885,7 +3885,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1; if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
@ -3908,7 +3908,7 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1; if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1; if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1; if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1; if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
@ -3916,7 +3916,7 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1; if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {

View File

@ -210,7 +210,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, createReq.standby); dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d", createReq.vgId,
createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize);
vmGenerateVnodeCfg(&createReq, &vnodeCfg); vmGenerateVnodeCfg(&createReq, &vnodeCfg);
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) { if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {

View File

@ -45,7 +45,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
createReq.compression = 2; createReq.compression = 2;
createReq.replica = 1; createReq.replica = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.selfIndex = 0; createReq.selfIndex = 0;
for (int r = 0; r < createReq.replica; ++r) { for (int r = 0; r < createReq.replica; ++r) {
SReplica* pReplica = &createReq.replicas[r]; SReplica* pReplica = &createReq.replicas[r];
@ -80,7 +80,7 @@ TEST_F(DndTestVnode, 02_Alter_Vnode) {
alterReq.walLevel = 1; alterReq.walLevel = 1;
alterReq.replica = 1; alterReq.replica = 1;
alterReq.strict = 1; alterReq.strict = 1;
alterReq.cacheLastRow = 0; alterReq.cacheLast = 0;
alterReq.selfIndex = 0; alterReq.selfIndex = 0;
for (int r = 0; r < alterReq.replica; ++r) { for (int r = 0; r < alterReq.replica; ++r) {
SReplica* pReplica = &alterReq.replicas[r]; SReplica* pReplica = &alterReq.replicas[r];

View File

@ -246,7 +246,7 @@ typedef struct {
int32_t buffer; int32_t buffer;
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
int32_t lastRowMem; int32_t cacheLastSize;
int32_t daysPerFile; int32_t daysPerFile;
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
@ -260,7 +260,7 @@ typedef struct {
int8_t replications; int8_t replications;
int8_t strict; int8_t strict;
int8_t hashMethod; // default is 1 int8_t hashMethod; // default is 1
int8_t cacheLastRow; int8_t cacheLast;
int32_t numOfRetensions; int32_t numOfRetensions;
SArray* pRetensions; SArray* pRetensions;
int8_t schemaless; int8_t schemaless;

View File

@ -95,7 +95,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.buffer, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.buffer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pageSize, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pageSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pages, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pages, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.lastRowMem, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheLastSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1, _OVER)
@ -108,7 +108,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression, _OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications, _OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.strict, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.strict, _OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLast, _OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.hashMethod, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.hashMethod, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, _OVER)
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
@ -168,7 +168,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.buffer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.buffer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pageSize, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pageSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pages, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pages, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.lastRowMem, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.cacheLastSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysPerFile, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysPerFile, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep0, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep0, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep1, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep1, _OVER)
@ -181,7 +181,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.compression, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.compression, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.replications, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.replications, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.strict, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.strict, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLast, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.hashMethod, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.hashMethod, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfRetensions, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfRetensions, _OVER)
if (pDb->cfg.numOfRetensions > 0) { if (pDb->cfg.numOfRetensions > 0) {
@ -236,7 +236,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->cfg.buffer = pNew->cfg.buffer; pOld->cfg.buffer = pNew->cfg.buffer;
pOld->cfg.pageSize = pNew->cfg.pageSize; pOld->cfg.pageSize = pNew->cfg.pageSize;
pOld->cfg.pages = pNew->cfg.pages; pOld->cfg.pages = pNew->cfg.pages;
pOld->cfg.lastRowMem = pNew->cfg.lastRowMem; pOld->cfg.cacheLastSize = pNew->cfg.cacheLastSize;
pOld->cfg.daysPerFile = pNew->cfg.daysPerFile; pOld->cfg.daysPerFile = pNew->cfg.daysPerFile;
pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0; pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0;
pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1; pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1;
@ -244,7 +244,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->cfg.fsyncPeriod = pNew->cfg.fsyncPeriod; pOld->cfg.fsyncPeriod = pNew->cfg.fsyncPeriod;
pOld->cfg.walLevel = pNew->cfg.walLevel; pOld->cfg.walLevel = pNew->cfg.walLevel;
pOld->cfg.strict = pNew->cfg.strict; pOld->cfg.strict = pNew->cfg.strict;
pOld->cfg.cacheLastRow = pNew->cfg.cacheLastRow; pOld->cfg.cacheLast = pNew->cfg.cacheLast;
pOld->cfg.replications = pNew->cfg.replications; pOld->cfg.replications = pNew->cfg.replications;
taosWUnLockLatch(&pOld->lock); taosWUnLockLatch(&pOld->lock);
return 0; return 0;
@ -293,7 +293,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->buffer < TSDB_MIN_BUFFER_PER_VNODE || pCfg->buffer > TSDB_MAX_BUFFER_PER_VNODE) return -1; if (pCfg->buffer < TSDB_MIN_BUFFER_PER_VNODE || pCfg->buffer > TSDB_MAX_BUFFER_PER_VNODE) return -1;
if (pCfg->pageSize < TSDB_MIN_PAGESIZE_PER_VNODE || pCfg->pageSize > TSDB_MAX_PAGESIZE_PER_VNODE) return -1; if (pCfg->pageSize < TSDB_MIN_PAGESIZE_PER_VNODE || pCfg->pageSize > TSDB_MAX_PAGESIZE_PER_VNODE) return -1;
if (pCfg->pages < TSDB_MIN_PAGES_PER_VNODE || pCfg->pages > TSDB_MAX_PAGES_PER_VNODE) return -1; if (pCfg->pages < TSDB_MIN_PAGES_PER_VNODE || pCfg->pages > TSDB_MAX_PAGES_PER_VNODE) return -1;
if (pCfg->lastRowMem < TSDB_MIN_DB_LAST_ROW_MEM || pCfg->lastRowMem > TSDB_MAX_DB_LAST_ROW_MEM) return -1; if (pCfg->cacheLastSize < TSDB_MIN_DB_CACHE_LAST_SIZE || pCfg->cacheLastSize > TSDB_MAX_DB_CACHE_LAST_SIZE) return -1;
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1; if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP) return -1; if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP) return -1;
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP) return -1; if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP) return -1;
@ -312,7 +312,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->replications != 1 && pCfg->replications != 3) return -1; if (pCfg->replications != 1 && pCfg->replications != 3) return -1;
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1; if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
if (pCfg->schemaless < TSDB_DB_SCHEMALESS_OFF || pCfg->schemaless > TSDB_DB_SCHEMALESS_ON) return -1; if (pCfg->schemaless < TSDB_DB_SCHEMALESS_OFF || pCfg->schemaless > TSDB_DB_SCHEMALESS_ON) return -1;
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1; if (pCfg->cacheLast < TSDB_MIN_DB_CACHE_LAST || pCfg->cacheLast > TSDB_MAX_DB_CACHE_LAST) return -1;
if (pCfg->hashMethod != 1) return -1; if (pCfg->hashMethod != 1) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) { if (pCfg->replications > mndGetDnodeSize(pMnode)) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
@ -341,8 +341,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL; if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL;
if (pCfg->replications < 0) pCfg->replications = TSDB_DEFAULT_DB_REPLICA; if (pCfg->replications < 0) pCfg->replications = TSDB_DEFAULT_DB_REPLICA;
if (pCfg->strict < 0) pCfg->strict = TSDB_DEFAULT_DB_STRICT; if (pCfg->strict < 0) pCfg->strict = TSDB_DEFAULT_DB_STRICT;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->cacheLast < 0) pCfg->cacheLast = TSDB_DEFAULT_CACHE_LAST;
if (pCfg->lastRowMem <= 0) pCfg->lastRowMem = TSDB_DEFAULT_LAST_ROW_MEM; if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_LAST_SIZE;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF; if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
} }
@ -441,7 +441,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.buffer = pCreate->buffer, .buffer = pCreate->buffer,
.pageSize = pCreate->pageSize, .pageSize = pCreate->pageSize,
.pages = pCreate->pages, .pages = pCreate->pages,
.lastRowMem = pCreate->lastRowMem, .cacheLastSize = pCreate->cacheLastSize,
.daysPerFile = pCreate->daysPerFile, .daysPerFile = pCreate->daysPerFile,
.daysToKeep0 = pCreate->daysToKeep0, .daysToKeep0 = pCreate->daysToKeep0,
.daysToKeep1 = pCreate->daysToKeep1, .daysToKeep1 = pCreate->daysToKeep1,
@ -454,7 +454,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.compression = pCreate->compression, .compression = pCreate->compression,
.replications = pCreate->replications, .replications = pCreate->replications,
.strict = pCreate->strict, .strict = pCreate->strict,
.cacheLastRow = pCreate->cacheLastRow, .cacheLast = pCreate->cacheLast,
.hashMethod = 1, .hashMethod = 1,
.schemaless = pCreate->schemaless, .schemaless = pCreate->schemaless,
}; };
@ -625,13 +625,13 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
#endif #endif
} }
if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) { if (pAlter->cacheLast >= 0 && pAlter->cacheLast != pDb->cfg.cacheLast) {
pDb->cfg.cacheLastRow = pAlter->cacheLastRow; pDb->cfg.cacheLast = pAlter->cacheLast;
terrno = 0; terrno = 0;
} }
if (pAlter->lastRowMem > 0 && pAlter->lastRowMem != pDb->cfg.lastRowMem) { if (pAlter->cacheLastSize > 0 && pAlter->cacheLastSize != pDb->cfg.cacheLastSize) {
pDb->cfg.lastRowMem = pAlter->lastRowMem; pDb->cfg.cacheLastSize = pAlter->cacheLastSize;
terrno = 0; terrno = 0;
} }
@ -803,7 +803,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp.compression = pDb->cfg.compression; cfgRsp.compression = pDb->cfg.compression;
cfgRsp.replications = pDb->cfg.replications; cfgRsp.replications = pDb->cfg.replications;
cfgRsp.strict = pDb->cfg.strict; cfgRsp.strict = pDb->cfg.strict;
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow; cfgRsp.cacheLast = pDb->cfg.cacheLast;
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions; cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions; cfgRsp.pRetensions = pDb->cfg.pRetensions;
cfgRsp.schemaless = pDb->cfg.schemaless; cfgRsp.schemaless = pDb->cfg.schemaless;
@ -1540,7 +1540,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLast, false);
const char *precStr = NULL; const char *precStr = NULL;
switch (pDb->cfg.precision) { switch (pDb->cfg.precision) {

View File

@ -207,7 +207,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.buffer = pDb->cfg.buffer; createReq.buffer = pDb->cfg.buffer;
createReq.pageSize = pDb->cfg.pageSize; createReq.pageSize = pDb->cfg.pageSize;
createReq.pages = pDb->cfg.pages; createReq.pages = pDb->cfg.pages;
createReq.lastRowMem = pDb->cfg.lastRowMem; createReq.cacheLastSize = pDb->cfg.cacheLastSize;
createReq.daysPerFile = pDb->cfg.daysPerFile; createReq.daysPerFile = pDb->cfg.daysPerFile;
createReq.daysToKeep0 = pDb->cfg.daysToKeep0; createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
createReq.daysToKeep1 = pDb->cfg.daysToKeep1; createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
@ -219,7 +219,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.precision = pDb->cfg.precision; createReq.precision = pDb->cfg.precision;
createReq.compression = pDb->cfg.compression; createReq.compression = pDb->cfg.compression;
createReq.strict = pDb->cfg.strict; createReq.strict = pDb->cfg.strict;
createReq.cacheLastRow = pDb->cfg.cacheLastRow; createReq.cacheLast = pDb->cfg.cacheLast;
createReq.replica = pVgroup->replica; createReq.replica = pVgroup->replica;
createReq.selfIndex = -1; createReq.selfIndex = -1;
createReq.hashBegin = pVgroup->hashBegin; createReq.hashBegin = pVgroup->hashBegin;
@ -277,7 +277,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
alterReq.buffer = pDb->cfg.buffer; alterReq.buffer = pDb->cfg.buffer;
alterReq.pageSize = pDb->cfg.pageSize; alterReq.pageSize = pDb->cfg.pageSize;
alterReq.pages = pDb->cfg.pages; alterReq.pages = pDb->cfg.pages;
alterReq.lastRowMem = pDb->cfg.lastRowMem; alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
alterReq.daysPerFile = pDb->cfg.daysPerFile; alterReq.daysPerFile = pDb->cfg.daysPerFile;
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0; alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1; alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
@ -285,7 +285,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod; alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
alterReq.walLevel = pDb->cfg.walLevel; alterReq.walLevel = pDb->cfg.walLevel;
alterReq.strict = pDb->cfg.strict; alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow; alterReq.cacheLast = pDb->cfg.cacheLast;
alterReq.replica = pVgroup->replica; alterReq.replica = pVgroup->replica;
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
@ -742,8 +742,8 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
int64_t vgroupMemroy = 0; int64_t vgroupMemroy = 0;
if (pDb != NULL) { if (pDb != NULL) {
vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024; vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
if (pDb->cfg.cacheLastRow > 0) { if (pDb->cfg.cacheLast > 0) {
vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024; vgroupMemroy += (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
} }
} }

View File

@ -50,7 +50,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
createReq.numOfStables = 0; createReq.numOfStables = 0;
createReq.numOfRetensions = 0; createReq.numOfRetensions = 0;
@ -84,7 +84,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
alterdbReq.fsyncPeriod = 4000; alterdbReq.fsyncPeriod = 4000;
alterdbReq.walLevel = 2; alterdbReq.walLevel = 2;
alterdbReq.strict = 1; alterdbReq.strict = 1;
alterdbReq.cacheLastRow = 1; alterdbReq.cacheLast = 1;
alterdbReq.replications = 1; alterdbReq.replications = 1;
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq); int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
@ -146,7 +146,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
createReq.numOfStables = 0; createReq.numOfStables = 0;
createReq.numOfRetensions = 0; createReq.numOfRetensions = 0;

View File

@ -288,7 +288,7 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
createReq.numOfStables = 0; createReq.numOfStables = 0;
createReq.numOfRetensions = 0; createReq.numOfRetensions = 0;
@ -319,7 +319,7 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
alterdbReq.fsyncPeriod = 4000; alterdbReq.fsyncPeriod = 4000;
alterdbReq.walLevel = 2; alterdbReq.walLevel = 2;
alterdbReq.strict = 1; alterdbReq.strict = 1;
alterdbReq.cacheLastRow = 1; alterdbReq.cacheLast = 1;
alterdbReq.replications = 3; alterdbReq.replications = 3;
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq); int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
@ -345,7 +345,7 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
alterdbReq.fsyncPeriod = 4000; alterdbReq.fsyncPeriod = 4000;
alterdbReq.walLevel = 2; alterdbReq.walLevel = 2;
alterdbReq.strict = 1; alterdbReq.strict = 1;
alterdbReq.cacheLastRow = 1; alterdbReq.cacheLast = 1;
alterdbReq.replications = 1; alterdbReq.replications = 1;
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq); int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);

View File

@ -55,7 +55,7 @@ void* MndTestSma::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);

View File

@ -56,7 +56,7 @@ void* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);

View File

@ -48,7 +48,7 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);

View File

@ -315,7 +315,7 @@ TEST_F(MndTestUser, 03_Alter_User) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);

View File

@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} else { } else {
ASSERT(pHandle->fetchMeta); ASSERT(pHandle->fetchMeta);
ASSERT(IS_META_MSG(pHead->msgType)); ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
/*metaRsp.reqOffset = pReq->reqOffset.version;*/ /*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/ /*metaRsp.rspOffset = fetchVer;*/

View File

@ -24,7 +24,8 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
@ -219,9 +220,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
break; break;
case TDMT_VND_ALTER_HASHRANGE: case TDMT_VND_ALTER_HASHRANGE:
vnodeProcessAlterHasnRangeReq(pVnode, version, pReq, len, pRsp); vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
break; break;
case TDMT_VND_ALTER_CONFIG: case TDMT_VND_ALTER_CONFIG:
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
break; break;
case TDMT_VND_COMMIT: case TDMT_VND_COMMIT:
goto _do_commit; goto _do_commit;
@ -915,7 +917,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
return 0; return 0;
} }
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode)); vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
// todo // todo
@ -925,6 +927,18 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
return 0; return 0;
} }
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SAlterVnodeReq alterReq = {0};
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
alterReq.cacheLastSize);
return 0;
}
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0; int32_t code = 0;
SDecoder *pCoder = &(SDecoder){0}; SDecoder *pCoder = &(SDecoder){0};

View File

@ -109,7 +109,7 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
createReq.compression = 2; createReq.compression = 2;
createReq.replications = 1; createReq.replications = 1;
createReq.strict = 1; createReq.strict = 1;
createReq.cacheLastRow = 0; createReq.cacheLast = 0;
createReq.ignoreExist = 1; createReq.ignoreExist = 1;
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);

View File

@ -223,7 +223,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
"CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm " "CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm "
"FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d", "STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, pCfg->cacheLastRow, pCfg->compression, pCfg->daysPerFile, pCfg->fsyncPeriod, dbFName, pCfg->buffer, pCfg->cacheLast, pCfg->compression, pCfg->daysPerFile, pCfg->fsyncPeriod,
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups, pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables); 1 == pCfg->numOfStables);

View File

@ -759,8 +759,8 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS); SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS);
CHECK_OUT_OF_MEM(pOptions); CHECK_OUT_OF_MEM(pOptions);
pOptions->buffer = TSDB_DEFAULT_BUFFER_PER_VNODE; pOptions->buffer = TSDB_DEFAULT_BUFFER_PER_VNODE;
pOptions->cacheLast = TSDB_DEFAULT_CACHE_LAST_ROW; pOptions->cacheLast = TSDB_DEFAULT_CACHE_LAST;
pOptions->cacheLastSize = TSDB_DEFAULT_LAST_ROW_MEM; pOptions->cacheLastSize = TSDB_DEFAULT_CACHE_LAST_SIZE;
pOptions->compressionLevel = TSDB_DEFAULT_COMP_LEVEL; pOptions->compressionLevel = TSDB_DEFAULT_COMP_LEVEL;
pOptions->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; pOptions->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
pOptions->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; pOptions->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;

View File

@ -2836,8 +2836,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->compression = pStmt->pOptions->compressionLevel; pReq->compression = pStmt->pOptions->compressionLevel;
pReq->replications = pStmt->pOptions->replica; pReq->replications = pStmt->pOptions->replica;
pReq->strict = pStmt->pOptions->strict; pReq->strict = pStmt->pOptions->strict;
pReq->cacheLastRow = pStmt->pOptions->cacheLast; pReq->cacheLast = pStmt->pOptions->cacheLast;
pReq->lastRowMem = pStmt->pOptions->cacheLastSize; pReq->cacheLastSize = pStmt->pOptions->cacheLastSize;
pReq->schemaless = pStmt->pOptions->schemaless; pReq->schemaless = pStmt->pOptions->schemaless;
pReq->ignoreExist = pStmt->ignoreExists; pReq->ignoreExist = pStmt->ignoreExists;
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq); return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
@ -2998,12 +2998,12 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
int32_t code = int32_t code =
checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE); checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST_ROW, code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST,
TSDB_MAX_DB_CACHE_LAST_ROW); TSDB_MAX_DB_CACHE_LAST);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_LAST_ROW_MEM, code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_CACHE_LAST_SIZE,
TSDB_MAX_DB_LAST_ROW_MEM); TSDB_MAX_DB_CACHE_LAST_SIZE);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "compression", pOptions->compressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); code = checkRangeOption(pCxt, "compression", pOptions->compressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
@ -3116,7 +3116,7 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
pReq->buffer = pStmt->pOptions->buffer; pReq->buffer = pStmt->pOptions->buffer;
pReq->pageSize = -1; pReq->pageSize = -1;
pReq->pages = pStmt->pOptions->pages; pReq->pages = pStmt->pOptions->pages;
pReq->lastRowMem = -1; pReq->cacheLastSize = -1;
pReq->daysPerFile = -1; pReq->daysPerFile = -1;
pReq->daysToKeep0 = pStmt->pOptions->keep[0]; pReq->daysToKeep0 = pStmt->pOptions->keep[0];
pReq->daysToKeep1 = pStmt->pOptions->keep[1]; pReq->daysToKeep1 = pStmt->pOptions->keep[1];
@ -3124,8 +3124,8 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
pReq->fsyncPeriod = pStmt->pOptions->fsyncPeriod; pReq->fsyncPeriod = pStmt->pOptions->fsyncPeriod;
pReq->walLevel = pStmt->pOptions->walLevel; pReq->walLevel = pStmt->pOptions->walLevel;
pReq->strict = pStmt->pOptions->strict; pReq->strict = pStmt->pOptions->strict;
pReq->cacheLastRow = pStmt->pOptions->cacheLast; pReq->cacheLast = pStmt->pOptions->cacheLast;
pReq->lastRowMem = pStmt->pOptions->cacheLastSize; pReq->cacheLastSize = pStmt->pOptions->cacheLastSize;
pReq->replications = pStmt->pOptions->replica; pReq->replications = pStmt->pOptions->replica;
return; return;
} }

View File

@ -76,8 +76,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
expect.db[len] = '\0'; expect.db[len] = '\0';
expect.ignoreExist = igExists; expect.ignoreExist = igExists;
expect.buffer = TSDB_DEFAULT_BUFFER_PER_VNODE; expect.buffer = TSDB_DEFAULT_BUFFER_PER_VNODE;
expect.cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; expect.cacheLast = TSDB_DEFAULT_CACHE_LAST;
expect.lastRowMem = TSDB_DEFAULT_LAST_ROW_MEM; expect.cacheLastSize = TSDB_DEFAULT_CACHE_LAST_SIZE;
expect.compression = TSDB_DEFAULT_COMP_LEVEL; expect.compression = TSDB_DEFAULT_COMP_LEVEL;
expect.daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; expect.daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
expect.fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; expect.fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
@ -98,8 +98,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
}; };
auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; }; auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; };
auto setDbCachelastFunc = [&](int8_t cachelast) { expect.cacheLastRow = cachelast; }; auto setDbCachelastFunc = [&](int8_t cachelast) { expect.cacheLast = cachelast; };
auto setDbCachelastSize = [&](int8_t cachelastSize) { expect.lastRowMem = cachelastSize; }; auto setDbCachelastSize = [&](int8_t cachelastSize) { expect.cacheLastSize = cachelastSize; };
auto setDbCompressionFunc = [&](int8_t compressionLevel) { expect.compression = compressionLevel; }; auto setDbCompressionFunc = [&](int8_t compressionLevel) { expect.compression = compressionLevel; };
auto setDbDaysFunc = [&](int32_t daysPerFile) { expect.daysPerFile = daysPerFile; }; auto setDbDaysFunc = [&](int32_t daysPerFile) { expect.daysPerFile = daysPerFile; };
auto setDbFsyncFunc = [&](int32_t fsyncPeriod) { expect.fsyncPeriod = fsyncPeriod; }; auto setDbFsyncFunc = [&](int32_t fsyncPeriod) { expect.fsyncPeriod = fsyncPeriod; };
@ -155,8 +155,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
ASSERT_EQ(req.compression, expect.compression); ASSERT_EQ(req.compression, expect.compression);
ASSERT_EQ(req.replications, expect.replications); ASSERT_EQ(req.replications, expect.replications);
ASSERT_EQ(req.strict, expect.strict); ASSERT_EQ(req.strict, expect.strict);
ASSERT_EQ(req.cacheLastRow, expect.cacheLastRow); ASSERT_EQ(req.cacheLast, expect.cacheLast);
ASSERT_EQ(req.lastRowMem, expect.lastRowMem); ASSERT_EQ(req.cacheLastSize, expect.cacheLastSize);
// ASSERT_EQ(req.schemaless, expect.schemaless); // ASSERT_EQ(req.schemaless, expect.schemaless);
ASSERT_EQ(req.ignoreExist, expect.ignoreExist); ASSERT_EQ(req.ignoreExist, expect.ignoreExist);
ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions); ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions);

View File

@ -213,7 +213,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
} }
int code = 0; int code = 0;
SSyncLogMeta syncMeta; SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
@ -369,7 +369,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
ASSERT(pEntry->index == lastIndex + 1); ASSERT(pEntry->index == lastIndex + 1);
int code = 0; int code = 0;
SSyncLogMeta syncMeta; SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak; syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;

View File

@ -146,12 +146,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// seek section // seek section
int walChangeWrite(SWal* pWal, int64_t ver); int walChangeWrite(SWal* pWal, int64_t ver);
int walSetWrite(SWal* pWal); int walInitWriteFile(SWal* pWal);
// seek section end // seek section end
int64_t walGetSeq(); int64_t walGetSeq();
int walSeekWriteVer(SWal* pWal, int64_t ver); int walSeekWriteVer(SWal* pWal, int64_t ver);
int walRoll(SWal* pWal); int32_t walRollImpl(SWal* pWal);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -51,10 +51,10 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &file_size, NULL); taosStatFile(fnameStr, &fileSize, NULL);
int readSize = TMIN(WAL_MAX_SIZE + 2, file_size); int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize);
pLastFileInfo->fileSize = file_size; pLastFileInfo->fileSize = fileSize;
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
@ -145,6 +145,26 @@ int walCheckAndRepairMeta(SWal* pWal) {
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet); int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(pLogInfoArray); int actualFileNum = taosArrayGetSize(pLogInfoArray);
#if 0
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, fileNo);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
if (fileSize == 0) {
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
taosArrayPop(pLogInfoArray);
} else {
break;
}
}
actualFileNum = taosArrayGetSize(pLogInfoArray);
#endif
if (metaFileNum > actualFileNum) { if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) { } else if (metaFileNum < actualFileNum) {
@ -164,6 +184,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t fileSize = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL); taosStatFile(fnameStr, &fileSize, NULL);
/*ASSERT(fileSize != 0);*/
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) { if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = fileSize; pLastFileInfo->fileSize = fileSize;
@ -380,9 +401,9 @@ int walLoadMeta(SWal* pWal) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr); walBuildMetaName(pWal, metaVer, fnameStr);
// read metafile // read metafile
int64_t file_size = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &file_size, NULL); taosStatFile(fnameStr, &fileSize, NULL);
int size = (int)file_size; int size = (int)fileSize;
char* buf = taosMemoryMalloc(size + 5); char* buf = taosMemoryMalloc(size + 5);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;

View File

@ -48,7 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
return 0; return 0;
} }
int walSetWrite(SWal* pWal) { int walInitWriteFile(SWal* pWal) {
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
@ -70,6 +70,7 @@ int walSetWrite(SWal* pWal) {
// switch file // switch file
pWal->pWriteIdxTFile = pIdxTFile; pWal->pWriteIdxTFile = pIdxTFile;
pWal->pWriteLogTFile = pLogTFile; pWal->pWriteLogTFile = pLogTFile;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
return 0; return 0;
} }

View File

@ -207,12 +207,35 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return 0; return 0;
} }
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
/*pWal->vers.firstVer = index;*/
if (walRollImpl(pWal) < 0) {
return -1;
}
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
if (walRollImpl(pWal) < 0) {
return -1;
}
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
if (walRollImpl(pWal) < 0) {
return -1;
}
}
}
return 0;
}
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
walRoll(pWal); if (walGetLastFileSize(pWal) != 0) {
walRollImpl(pWal);
}
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
} }
@ -282,7 +305,7 @@ END:
return code; return code;
} }
int walRoll(SWal *pWal) { int32_t walRollImpl(SWal *pWal) {
int32_t code = 0; int32_t code = 0;
if (pWal->pWriteIdxTFile != NULL) { if (pWal->pWriteIdxTFile != NULL) {
code = taosCloseFile(&pWal->pWriteIdxTFile); code = taosCloseFile(&pWal->pWriteIdxTFile);
@ -330,11 +353,13 @@ int walRoll(SWal *pWal) {
pWal->lastRollSeq = walGetSeq(); pWal->lastRollSeq = walGetSeq();
walSaveMeta(pWal);
END: END:
return code; return code;
} }
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END); int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset, wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
@ -348,61 +373,14 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0; return 0;
} }
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body, // TODO gurantee atomicity by truncate failed writing
int32_t bodyLen) { static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
int32_t code = 0; const void *body, int32_t bodyLen) {
int64_t code = 0;
// no wal
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
if (index == pWal->vers.lastVer + 1) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->vers.firstVer = index;
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
} else {
int64_t passed = walGetSeq() - pWal->lastRollSeq;
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
if (walRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
}
} else {
// reject skip log or rewrite log
// must truncate explicitly first
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
ASSERT(pWal->writeCur >= 0);
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteLogTFile == NULL) {
walSetWrite(pWal);
taosLSeekFile(pWal->pWriteLogTFile, 0, SEEK_END);
taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
}
pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal); int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.version = index;
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
@ -417,7 +395,8 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
return -1; code = -1;
goto END;
} }
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
@ -425,13 +404,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
return -1; code = -1;
goto END;
} }
code = walWriteIndex(pWal, index, offset); code = walWriteIndex(pWal, index, offset);
if (code != 0) { if (code < 0) {
// TODO // TODO ftruncate
return -1; goto END;
} }
// set status // set status
@ -444,13 +424,88 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen;
taosThreadMutexUnlock(&pWal->mutex);
return 0; return 0;
END:
return -1;
}
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
int64_t index = pWal->vers.lastVer + 1;
if (walCheckAndRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
taosThreadMutexUnlock(&pWal->mutex);
return index;
}
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
int32_t bodyLen) {
int32_t code = 0;
if (bodyLen > TSDB_MAX_WAL_SIZE) {
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return -1;
}
taosThreadMutexLock(&pWal->mutex);
// concurrency control:
// if logs are write with assigned index,
// smaller index must be write before larger one
if (index != pWal->vers.lastVer + 1) {
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (walCheckAndRoll(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
if (walInitWriteFile(pWal) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
}
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
taosThreadMutexUnlock(&pWal->mutex);
return code;
} }
int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
SSyncLogMeta syncMeta = { SWalSyncInfo syncMeta = {
.isWeek = -1, .isWeek = -1,
.seqNum = UINT64_MAX, .seqNum = UINT64_MAX,
.term = UINT64_MAX, .term = UINT64_MAX,

View File

@ -106,8 +106,8 @@ int32_t taosMkDir(const char *dirname) {
int32_t taosMulMkDir(const char *dirname) { int32_t taosMulMkDir(const char *dirname) {
if (dirname == NULL) return -1; if (dirname == NULL) return -1;
char temp[1024]; char temp[1024];
char * pos = temp; char *pos = temp;
int32_t code = 0; int32_t code = 0;
#ifdef WINDOWS #ifdef WINDOWS
taosRealPath(dirname, temp, sizeof(temp)); taosRealPath(dirname, temp, sizeof(temp));
@ -127,11 +127,11 @@ int32_t taosMulMkDir(const char *dirname) {
for (; *pos != '\0'; pos++) { for (; *pos != '\0'; pos++) {
if (*pos == TD_DIRSEP[0]) { if (*pos == TD_DIRSEP[0]) {
*pos = '\0'; *pos = '\0';
#ifdef WINDOWS #ifdef WINDOWS
code = _mkdir(temp, 0755); code = _mkdir(temp, 0755);
#else #else
code = mkdir(temp, 0755); code = mkdir(temp, 0755);
#endif #endif
if (code < 0 && errno != EEXIST) { if (code < 0 && errno != EEXIST) {
return code; return code;
} }
@ -140,11 +140,11 @@ int32_t taosMulMkDir(const char *dirname) {
} }
if (*(pos - 1) != TD_DIRSEP[0]) { if (*(pos - 1) != TD_DIRSEP[0]) {
#ifdef WINDOWS #ifdef WINDOWS
code = _mkdir(temp, 0755); code = _mkdir(temp, 0755);
#else #else
code = mkdir(temp, 0755); code = mkdir(temp, 0755);
#endif #endif
if (code < 0 && errno != EEXIST) { if (code < 0 && errno != EEXIST) {
return code; return code;
} }
@ -267,7 +267,7 @@ char *taosDirName(char *name) {
} else { } else {
name[0] = 0; name[0] = 0;
} }
return name; return name;
#else #else
return dirname(name); return dirname(name);
#endif #endif
@ -334,9 +334,9 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) {
} }
char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) { char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) {
if (pDirEntry == NULL) { /*if (pDirEntry == NULL) {*/
return NULL; /*return NULL;*/
} /*}*/
#ifdef WINDOWS #ifdef WINDOWS
return pDirEntry->findFileData.cFileName; return pDirEntry->findFileData.cFileName;
#else #else

View File

@ -47,7 +47,7 @@ sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
print =============== step6: select data print =============== step6: select data
#sql select * from ct1 sql select * from ct1
#sql select * from stb #sql select * from stb
_OVER: _OVER:

View File

@ -37,10 +37,10 @@ endi
print =============== step3: create show table print =============== step3: create show table
sql create table ct1 using stb tags(1000) sql create table ct1 using stb tags(1000)
#sql show tables sql show tables
#if $rows != 1 then if $rows != 1 then
# return -1 return -1
#endi endi
print =============== step5: insert data print =============== step5: insert data
sql insert into ct1 values(now+0s, 10, 2.0, 3.0) sql insert into ct1 values(now+0s, 10, 2.0, 3.0)

View File

@ -96,7 +96,7 @@ if $rows != 1 then
endi endi
sql select * from information_schema.user_tables sql select * from information_schema.user_tables
if $rows != 31 then if $rows != 30 then
return -1 return -1
endi endi

View File

@ -72,6 +72,7 @@ void dumpDb(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "buffer", pObj->cfg.buffer); tjsonAddIntegerToObject(item, "buffer", pObj->cfg.buffer);
tjsonAddIntegerToObject(item, "pageSize", pObj->cfg.pageSize); tjsonAddIntegerToObject(item, "pageSize", pObj->cfg.pageSize);
tjsonAddIntegerToObject(item, "pages", pObj->cfg.pages); tjsonAddIntegerToObject(item, "pages", pObj->cfg.pages);
tjsonAddIntegerToObject(item, "cacheLastSize", pObj->cfg.cacheLastSize);
tjsonAddIntegerToObject(item, "daysPerFile", pObj->cfg.daysPerFile); tjsonAddIntegerToObject(item, "daysPerFile", pObj->cfg.daysPerFile);
tjsonAddIntegerToObject(item, "daysToKeep0", pObj->cfg.daysToKeep0); tjsonAddIntegerToObject(item, "daysToKeep0", pObj->cfg.daysToKeep0);
tjsonAddIntegerToObject(item, "daysToKeep1", pObj->cfg.daysToKeep1); tjsonAddIntegerToObject(item, "daysToKeep1", pObj->cfg.daysToKeep1);
@ -84,7 +85,7 @@ void dumpDb(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "compression", pObj->cfg.compression); tjsonAddIntegerToObject(item, "compression", pObj->cfg.compression);
tjsonAddIntegerToObject(item, "replications", pObj->cfg.replications); tjsonAddIntegerToObject(item, "replications", pObj->cfg.replications);
tjsonAddIntegerToObject(item, "strict", pObj->cfg.strict); tjsonAddIntegerToObject(item, "strict", pObj->cfg.strict);
tjsonAddIntegerToObject(item, "cacheLastRow", pObj->cfg.cacheLastRow); tjsonAddIntegerToObject(item, "cacheLast", pObj->cfg.cacheLast);
tjsonAddIntegerToObject(item, "hashMethod", pObj->cfg.hashMethod); tjsonAddIntegerToObject(item, "hashMethod", pObj->cfg.hashMethod);
tjsonAddIntegerToObject(item, "numOfRetensions", pObj->cfg.numOfRetensions); tjsonAddIntegerToObject(item, "numOfRetensions", pObj->cfg.numOfRetensions);
tjsonAddIntegerToObject(item, "schemaless", pObj->cfg.schemaless); tjsonAddIntegerToObject(item, "schemaless", pObj->cfg.schemaless);