Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
20f3c7684d
|
@ -32,7 +32,6 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
|
|||
- cacheLast: [Description](/reference/config/#cachelast)
|
||||
- replica: [Description](/reference/config/#replica)
|
||||
- quorum: [Description](/reference/config/#quorum)
|
||||
- maxVgroupsPerDb: [Description](/reference/config/#maxvgroupsperdb)
|
||||
- comp: [Description](/reference/config/#comp)
|
||||
- precision: [Description](/reference/config/#precision)
|
||||
6. Please note that all of the parameters mentioned in this section are configured in configuration file `taos.cfg` on the TDengine server. If not specified in the `create database` statement, the values from taos.cfg are used by default. To override default parameters, they must be specified in the `create database` statement.
|
||||
|
|
|
@ -32,7 +32,6 @@ CREATE DATABASE [IF NOT EXISTS] db_name [KEEP keep] [DAYS days] [UPDATE 1];
|
|||
- cacheLast: [详细说明](/reference/config/#cachelast)
|
||||
- replica: [详细说明](/reference/config/#replica)
|
||||
- quorum: [详细说明](/reference/config/#quorum)
|
||||
- maxVgroupsPerDb: [详细说明](/reference/config/#maxvgroupsperdb)
|
||||
- comp: [详细说明](/reference/config/#comp)
|
||||
- precision: [详细说明](/reference/config/#precision)
|
||||
6. 请注意上面列出的所有参数都可以配置在配置文件 `taosd.cfg` 中作为创建数据库时使用的默认配置, `create database` 的参数中明确指定的会覆盖配置文件中的设置。
|
||||
|
|
|
@ -67,7 +67,6 @@ extern int32_t tsNumOfVnodeQueryThreads;
|
|||
extern int32_t tsNumOfVnodeFetchThreads;
|
||||
extern int32_t tsNumOfVnodeWriteThreads;
|
||||
extern int32_t tsNumOfVnodeSyncThreads;
|
||||
extern int32_t tsNumOfVnodeMergeThreads;
|
||||
extern int32_t tsNumOfQnodeQueryThreads;
|
||||
extern int32_t tsNumOfQnodeFetchThreads;
|
||||
extern int32_t tsNumOfSnodeSharedThreads;
|
||||
|
|
|
@ -723,7 +723,7 @@ typedef struct {
|
|||
int32_t buffer; // MB
|
||||
int32_t pageSize;
|
||||
int32_t pages;
|
||||
int32_t lastRowMem;
|
||||
int32_t cacheLastSize;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
|
@ -736,7 +736,7 @@ typedef struct {
|
|||
int8_t compression;
|
||||
int8_t replications;
|
||||
int8_t strict;
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLast;
|
||||
int8_t schemaless;
|
||||
int8_t ignoreExist;
|
||||
int32_t numOfRetensions;
|
||||
|
@ -752,7 +752,7 @@ typedef struct {
|
|||
int32_t buffer;
|
||||
int32_t pageSize;
|
||||
int32_t pages;
|
||||
int32_t lastRowMem;
|
||||
int32_t cacheLastSize;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
|
@ -760,7 +760,7 @@ typedef struct {
|
|||
int32_t fsyncPeriod;
|
||||
int8_t walLevel;
|
||||
int8_t strict;
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLast;
|
||||
int8_t replications;
|
||||
} SAlterDbReq;
|
||||
|
||||
|
@ -840,7 +840,7 @@ typedef struct {
|
|||
int8_t compression;
|
||||
int8_t replications;
|
||||
int8_t strict;
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLast;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions;
|
||||
int8_t schemaless;
|
||||
|
@ -1105,7 +1105,7 @@ typedef struct {
|
|||
int32_t buffer;
|
||||
int32_t pageSize;
|
||||
int32_t pages;
|
||||
int32_t lastRowMem;
|
||||
int32_t cacheLastSize;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
|
@ -1120,7 +1120,7 @@ typedef struct {
|
|||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t strict;
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLast;
|
||||
int8_t isTsma;
|
||||
int8_t standby;
|
||||
int8_t replica;
|
||||
|
@ -1158,7 +1158,7 @@ typedef struct {
|
|||
int32_t buffer;
|
||||
int32_t pageSize;
|
||||
int32_t pages;
|
||||
int32_t lastRowMem;
|
||||
int32_t cacheLastSize;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
|
@ -1166,7 +1166,7 @@ typedef struct {
|
|||
int32_t fsyncPeriod;
|
||||
int8_t walLevel;
|
||||
int8_t strict;
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLast;
|
||||
int8_t selfIndex;
|
||||
int8_t replica;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
|
|
|
@ -45,7 +45,6 @@ extern "C" {
|
|||
#define WAL_MAGIC 0xFAFBFCFDULL
|
||||
|
||||
typedef enum {
|
||||
TAOS_WAL_NOLOG = 0,
|
||||
TAOS_WAL_WRITE = 1,
|
||||
TAOS_WAL_FSYNC = 2,
|
||||
} EWalType;
|
||||
|
@ -74,7 +73,7 @@ typedef struct {
|
|||
int8_t isWeek;
|
||||
uint64_t seqNum;
|
||||
uint64_t term;
|
||||
} SSyncLogMeta;
|
||||
} SWalSyncInfo;
|
||||
|
||||
typedef struct {
|
||||
int8_t protoVer;
|
||||
|
@ -84,7 +83,7 @@ typedef struct {
|
|||
int64_t ingestTs; // not implemented
|
||||
|
||||
// sync meta
|
||||
SSyncLogMeta syncMeta;
|
||||
SWalSyncInfo syncMeta;
|
||||
|
||||
char body[];
|
||||
} SWalCont;
|
||||
|
@ -149,11 +148,22 @@ SWal *walOpen(const char *path, SWalCfg *pCfg);
|
|||
int32_t walAlter(SWal *, SWalCfg *pCfg);
|
||||
void walClose(SWal *);
|
||||
|
||||
// write
|
||||
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
|
||||
int32_t bodyLen);
|
||||
// write interfaces
|
||||
|
||||
// By assigning index by the caller, wal gurantees linearizability
|
||||
int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
|
||||
void walFsync(SWal *, bool force);
|
||||
int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
|
||||
int32_t bodyLen);
|
||||
|
||||
// This interface assign version automatically and return to caller.
|
||||
// When using this interface with concurrent writes,
|
||||
// wal will write all logs atomically,
|
||||
// but not sure which one will be actually write first,
|
||||
// and then the unique index of successful writen is returned.
|
||||
// -1 will be returned for failed writes
|
||||
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
||||
|
||||
void walFsync(SWal *, bool force);
|
||||
|
||||
// apis for lifecycle management
|
||||
int32_t walCommit(SWal *, int64_t ver);
|
||||
|
|
|
@ -331,12 +331,12 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_DB_STRICT_OFF 0
|
||||
#define TSDB_DB_STRICT_ON 1
|
||||
#define TSDB_DEFAULT_DB_STRICT 0
|
||||
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
|
||||
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
|
||||
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
|
||||
#define TSDB_MIN_DB_LAST_ROW_MEM 1 // MB
|
||||
#define TSDB_MAX_DB_LAST_ROW_MEM 65536
|
||||
#define TSDB_DEFAULT_LAST_ROW_MEM 1
|
||||
#define TSDB_MIN_DB_CACHE_LAST 0
|
||||
#define TSDB_MAX_DB_CACHE_LAST 3
|
||||
#define TSDB_DEFAULT_CACHE_LAST 0
|
||||
#define TSDB_MIN_DB_CACHE_LAST_SIZE 1 // MB
|
||||
#define TSDB_MAX_DB_CACHE_LAST_SIZE 65536
|
||||
#define TSDB_DEFAULT_CACHE_LAST_SIZE 1
|
||||
#define TSDB_DB_STREAM_MODE_OFF 0
|
||||
#define TSDB_DB_STREAM_MODE_ON 1
|
||||
#define TSDB_DEFAULT_DB_STREAM_MODE 0
|
||||
|
|
|
@ -810,11 +810,16 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
|||
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||
SRequestObj* pRequest = (SRequestObj*)param;
|
||||
pRequest->code = code;
|
||||
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
||||
|
||||
if (pResult) {
|
||||
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
||||
}
|
||||
|
||||
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
|
||||
TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
|
||||
if (pResult) {
|
||||
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
|
||||
}
|
||||
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
}
|
||||
|
@ -1476,12 +1481,16 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
|
|||
tsem_wait(&pParam->sem);
|
||||
}
|
||||
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) {
|
||||
doSetOneRowPtr(pResultInfo);
|
||||
pResultInfo->current += 1;
|
||||
}
|
||||
if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
} else {
|
||||
if (setupOneRowPtr) {
|
||||
doSetOneRowPtr(pResultInfo);
|
||||
pResultInfo->current += 1;
|
||||
}
|
||||
|
||||
return pResultInfo->row;
|
||||
return pResultInfo->row;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
||||
|
|
|
@ -1747,7 +1747,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
for (int32_t k = 0; k < colNum; k++) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
if (colDataIsNull(pColInfoData, rows, j, NULL) || !var) {
|
||||
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
|
||||
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
|
||||
if (len >= size -1) return dumpBuf;
|
||||
continue;
|
||||
|
|
|
@ -58,9 +58,8 @@ int32_t tsNumOfVnodeQueryThreads = 2;
|
|||
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||
int32_t tsNumOfVnodeMergeThreads = 2;
|
||||
int32_t tsNumOfQnodeQueryThreads = 2;
|
||||
int32_t tsNumOfQnodeFetchThreads = 1;
|
||||
int32_t tsNumOfQnodeFetchThreads = 4;
|
||||
int32_t tsNumOfSnodeSharedThreads = 2;
|
||||
int32_t tsNumOfSnodeUniqueThreads = 2;
|
||||
|
||||
|
@ -106,11 +105,6 @@ int32_t tsCompressMsgSize = -1;
|
|||
*/
|
||||
int32_t tsCompressColData = -1;
|
||||
|
||||
/*
|
||||
* denote if 3.0 query pattern compatible for 2.0
|
||||
*/
|
||||
int32_t tsCompatibleModel = 1;
|
||||
|
||||
// count/hyperloglog function always return values in case of all NULL data or Empty data set.
|
||||
int32_t tsCountAlwaysReturnValue = 1;
|
||||
|
||||
|
@ -414,7 +408,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeQueryThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeQueryThreads = tsNumOfCores / 4;
|
||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
|
@ -425,19 +419,16 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeSyncThreads = tsNumOfCores / 2;
|
||||
tsNumOfVnodeSyncThreads = tsNumOfCores;
|
||||
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeMergeThreads = tsNumOfCores / 8;
|
||||
tsNumOfVnodeMergeThreads = TRANGE(tsNumOfVnodeMergeThreads, 1, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfQnodeQueryThreads = tsNumOfCores / 2;
|
||||
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1);
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfQnodeFetchThreads = TRANGE(tsNumOfQnodeFetchThreads, 1, 1);
|
||||
tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
|
||||
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
|
||||
|
@ -598,7 +589,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
|
||||
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
|
||||
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
|
@ -840,8 +830,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
|
||||
} else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
|
||||
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
|
||||
} else if (strcasecmp("numOfVnodeMergeThreads", name) == 0) {
|
||||
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
||||
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
|
||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
|
||||
|
|
|
@ -2001,7 +2001,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
|||
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -2014,7 +2014,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
|||
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->schemaless) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||
|
@ -2043,7 +2043,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
|||
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -2056,7 +2056,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
|||
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->schemaless) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
||||
|
@ -2098,7 +2098,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -2106,7 +2106,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -2124,7 +2124,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -2132,7 +2132,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -2694,7 +2694,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
|||
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->strict) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->cacheLast) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1;
|
||||
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
|
||||
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
|
||||
|
@ -2733,7 +2733,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
|
|||
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->cacheLast) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
|
||||
if (pRsp->numOfRetensions > 0) {
|
||||
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));
|
||||
|
@ -3644,7 +3644,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -3659,7 +3659,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
||||
|
@ -3702,7 +3702,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -3717,7 +3717,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
||||
|
@ -3827,7 +3827,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
|
|||
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->lastRowMem) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -3835,7 +3835,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
|
|||
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
|
@ -3858,7 +3858,7 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
|
|||
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->lastRowMem) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
|
||||
|
@ -3866,7 +3866,7 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
|
|||
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
|
|
|
@ -210,7 +210,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, createReq.standby);
|
||||
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d", createReq.vgId,
|
||||
createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize);
|
||||
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
||||
|
||||
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
||||
|
|
|
@ -45,7 +45,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
|
|||
createReq.compression = 2;
|
||||
createReq.replica = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.selfIndex = 0;
|
||||
for (int r = 0; r < createReq.replica; ++r) {
|
||||
SReplica* pReplica = &createReq.replicas[r];
|
||||
|
@ -80,7 +80,7 @@ TEST_F(DndTestVnode, 02_Alter_Vnode) {
|
|||
alterReq.walLevel = 1;
|
||||
alterReq.replica = 1;
|
||||
alterReq.strict = 1;
|
||||
alterReq.cacheLastRow = 0;
|
||||
alterReq.cacheLast = 0;
|
||||
alterReq.selfIndex = 0;
|
||||
for (int r = 0; r < alterReq.replica; ++r) {
|
||||
SReplica* pReplica = &alterReq.replicas[r];
|
||||
|
|
|
@ -246,7 +246,7 @@ typedef struct {
|
|||
int32_t buffer;
|
||||
int32_t pageSize;
|
||||
int32_t pages;
|
||||
int32_t lastRowMem;
|
||||
int32_t cacheLastSize;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
|
@ -260,7 +260,7 @@ typedef struct {
|
|||
int8_t replications;
|
||||
int8_t strict;
|
||||
int8_t hashMethod; // default is 1
|
||||
int8_t cacheLastRow;
|
||||
int8_t cacheLast;
|
||||
int32_t numOfRetensions;
|
||||
SArray* pRetensions;
|
||||
int8_t schemaless;
|
||||
|
|
|
@ -93,7 +93,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.buffer, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pageSize, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.pages, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.lastRowMem, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheLastSize, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1, _OVER)
|
||||
|
@ -106,7 +106,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
|||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.strict, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLast, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.hashMethod, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, _OVER)
|
||||
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
|
||||
|
@ -166,7 +166,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.buffer, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pageSize, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.pages, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.lastRowMem, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.cacheLastSize, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysPerFile, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep0, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep1, _OVER)
|
||||
|
@ -179,7 +179,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.compression, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.replications, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.strict, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLast, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.hashMethod, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfRetensions, _OVER)
|
||||
if (pDb->cfg.numOfRetensions > 0) {
|
||||
|
@ -234,7 +234,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
|||
pOld->cfg.buffer = pNew->cfg.buffer;
|
||||
pOld->cfg.pageSize = pNew->cfg.pageSize;
|
||||
pOld->cfg.pages = pNew->cfg.pages;
|
||||
pOld->cfg.lastRowMem = pNew->cfg.lastRowMem;
|
||||
pOld->cfg.cacheLastSize = pNew->cfg.cacheLastSize;
|
||||
pOld->cfg.daysPerFile = pNew->cfg.daysPerFile;
|
||||
pOld->cfg.daysToKeep0 = pNew->cfg.daysToKeep0;
|
||||
pOld->cfg.daysToKeep1 = pNew->cfg.daysToKeep1;
|
||||
|
@ -242,7 +242,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
|||
pOld->cfg.fsyncPeriod = pNew->cfg.fsyncPeriod;
|
||||
pOld->cfg.walLevel = pNew->cfg.walLevel;
|
||||
pOld->cfg.strict = pNew->cfg.strict;
|
||||
pOld->cfg.cacheLastRow = pNew->cfg.cacheLastRow;
|
||||
pOld->cfg.cacheLast = pNew->cfg.cacheLast;
|
||||
pOld->cfg.replications = pNew->cfg.replications;
|
||||
taosWUnLockLatch(&pOld->lock);
|
||||
return 0;
|
||||
|
@ -291,7 +291,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
|||
if (pCfg->buffer < TSDB_MIN_BUFFER_PER_VNODE || pCfg->buffer > TSDB_MAX_BUFFER_PER_VNODE) return -1;
|
||||
if (pCfg->pageSize < TSDB_MIN_PAGESIZE_PER_VNODE || pCfg->pageSize > TSDB_MAX_PAGESIZE_PER_VNODE) return -1;
|
||||
if (pCfg->pages < TSDB_MIN_PAGES_PER_VNODE || pCfg->pages > TSDB_MAX_PAGES_PER_VNODE) return -1;
|
||||
if (pCfg->lastRowMem < TSDB_MIN_DB_LAST_ROW_MEM || pCfg->lastRowMem > TSDB_MAX_DB_LAST_ROW_MEM) return -1;
|
||||
if (pCfg->cacheLastSize < TSDB_MIN_DB_CACHE_LAST_SIZE || pCfg->cacheLastSize > TSDB_MAX_DB_CACHE_LAST_SIZE) return -1;
|
||||
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
|
||||
if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP) return -1;
|
||||
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP) return -1;
|
||||
|
@ -310,7 +310,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
|||
if (pCfg->replications != 1 && pCfg->replications != 3) return -1;
|
||||
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
|
||||
if (pCfg->schemaless < TSDB_DB_SCHEMALESS_OFF || pCfg->schemaless > TSDB_DB_SCHEMALESS_ON) return -1;
|
||||
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
|
||||
if (pCfg->cacheLast < TSDB_MIN_DB_CACHE_LAST || pCfg->cacheLast > TSDB_MAX_DB_CACHE_LAST) return -1;
|
||||
if (pCfg->hashMethod != 1) return -1;
|
||||
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
|
||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||
|
@ -339,8 +339,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL;
|
||||
if (pCfg->replications < 0) pCfg->replications = TSDB_DEFAULT_DB_REPLICA;
|
||||
if (pCfg->strict < 0) pCfg->strict = TSDB_DEFAULT_DB_STRICT;
|
||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
|
||||
if (pCfg->lastRowMem <= 0) pCfg->lastRowMem = TSDB_DEFAULT_LAST_ROW_MEM;
|
||||
if (pCfg->cacheLast < 0) pCfg->cacheLast = TSDB_DEFAULT_CACHE_LAST;
|
||||
if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_LAST_SIZE;
|
||||
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
||||
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
|
||||
}
|
||||
|
@ -439,7 +439,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
.buffer = pCreate->buffer,
|
||||
.pageSize = pCreate->pageSize,
|
||||
.pages = pCreate->pages,
|
||||
.lastRowMem = pCreate->lastRowMem,
|
||||
.cacheLastSize = pCreate->cacheLastSize,
|
||||
.daysPerFile = pCreate->daysPerFile,
|
||||
.daysToKeep0 = pCreate->daysToKeep0,
|
||||
.daysToKeep1 = pCreate->daysToKeep1,
|
||||
|
@ -452,7 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
.compression = pCreate->compression,
|
||||
.replications = pCreate->replications,
|
||||
.strict = pCreate->strict,
|
||||
.cacheLastRow = pCreate->cacheLastRow,
|
||||
.cacheLast = pCreate->cacheLast,
|
||||
.hashMethod = 1,
|
||||
.schemaless = pCreate->schemaless,
|
||||
};
|
||||
|
@ -623,13 +623,13 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
|||
#endif
|
||||
}
|
||||
|
||||
if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) {
|
||||
pDb->cfg.cacheLastRow = pAlter->cacheLastRow;
|
||||
if (pAlter->cacheLast >= 0 && pAlter->cacheLast != pDb->cfg.cacheLast) {
|
||||
pDb->cfg.cacheLast = pAlter->cacheLast;
|
||||
terrno = 0;
|
||||
}
|
||||
|
||||
if (pAlter->lastRowMem > 0 && pAlter->lastRowMem != pDb->cfg.lastRowMem) {
|
||||
pDb->cfg.lastRowMem = pAlter->lastRowMem;
|
||||
if (pAlter->cacheLastSize > 0 && pAlter->cacheLastSize != pDb->cfg.cacheLastSize) {
|
||||
pDb->cfg.cacheLastSize = pAlter->cacheLastSize;
|
||||
terrno = 0;
|
||||
}
|
||||
|
||||
|
@ -801,7 +801,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
|
|||
cfgRsp.compression = pDb->cfg.compression;
|
||||
cfgRsp.replications = pDb->cfg.replications;
|
||||
cfgRsp.strict = pDb->cfg.strict;
|
||||
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
cfgRsp.cacheLast = pDb->cfg.cacheLast;
|
||||
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
|
||||
cfgRsp.pRetensions = pDb->cfg.pRetensions;
|
||||
cfgRsp.schemaless = pDb->cfg.schemaless;
|
||||
|
@ -1467,7 +1467,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
|
|||
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false);
|
||||
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLast, false);
|
||||
|
||||
const char *precStr = NULL;
|
||||
switch (pDb->cfg.precision) {
|
||||
|
|
|
@ -207,7 +207,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
createReq.buffer = pDb->cfg.buffer;
|
||||
createReq.pageSize = pDb->cfg.pageSize;
|
||||
createReq.pages = pDb->cfg.pages;
|
||||
createReq.lastRowMem = pDb->cfg.lastRowMem;
|
||||
createReq.cacheLastSize = pDb->cfg.cacheLastSize;
|
||||
createReq.daysPerFile = pDb->cfg.daysPerFile;
|
||||
createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||
createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||
|
@ -219,7 +219,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
createReq.precision = pDb->cfg.precision;
|
||||
createReq.compression = pDb->cfg.compression;
|
||||
createReq.strict = pDb->cfg.strict;
|
||||
createReq.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
createReq.cacheLast = pDb->cfg.cacheLast;
|
||||
createReq.replica = pVgroup->replica;
|
||||
createReq.selfIndex = -1;
|
||||
createReq.hashBegin = pVgroup->hashBegin;
|
||||
|
@ -277,7 +277,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
|
|||
alterReq.buffer = pDb->cfg.buffer;
|
||||
alterReq.pageSize = pDb->cfg.pageSize;
|
||||
alterReq.pages = pDb->cfg.pages;
|
||||
alterReq.lastRowMem = pDb->cfg.lastRowMem;
|
||||
alterReq.cacheLastSize = pDb->cfg.cacheLastSize;
|
||||
alterReq.daysPerFile = pDb->cfg.daysPerFile;
|
||||
alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
|
||||
alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
|
||||
|
@ -285,7 +285,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
|
|||
alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
|
||||
alterReq.walLevel = pDb->cfg.walLevel;
|
||||
alterReq.strict = pDb->cfg.strict;
|
||||
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
alterReq.cacheLast = pDb->cfg.cacheLast;
|
||||
alterReq.replica = pVgroup->replica;
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
|
@ -742,8 +742,8 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
|
|||
int64_t vgroupMemroy = 0;
|
||||
if (pDb != NULL) {
|
||||
vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
|
||||
if (pDb->cfg.cacheLastRow > 0) {
|
||||
vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024;
|
||||
if (pDb->cfg.cacheLast > 0) {
|
||||
vgroupMemroy += (int64_t)pDb->cfg.cacheLastSize * 1024 * 1024;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
createReq.numOfStables = 0;
|
||||
createReq.numOfRetensions = 0;
|
||||
|
@ -84,7 +84,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
alterdbReq.fsyncPeriod = 4000;
|
||||
alterdbReq.walLevel = 2;
|
||||
alterdbReq.strict = 1;
|
||||
alterdbReq.cacheLastRow = 1;
|
||||
alterdbReq.cacheLast = 1;
|
||||
alterdbReq.replications = 1;
|
||||
|
||||
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
|
||||
|
@ -146,7 +146,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
createReq.numOfStables = 0;
|
||||
createReq.numOfRetensions = 0;
|
||||
|
|
|
@ -288,7 +288,7 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
createReq.numOfStables = 0;
|
||||
createReq.numOfRetensions = 0;
|
||||
|
@ -319,7 +319,7 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
|
|||
alterdbReq.fsyncPeriod = 4000;
|
||||
alterdbReq.walLevel = 2;
|
||||
alterdbReq.strict = 1;
|
||||
alterdbReq.cacheLastRow = 1;
|
||||
alterdbReq.cacheLast = 1;
|
||||
alterdbReq.replications = 3;
|
||||
|
||||
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
|
||||
|
@ -345,7 +345,7 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
|
|||
alterdbReq.fsyncPeriod = 4000;
|
||||
alterdbReq.walLevel = 2;
|
||||
alterdbReq.strict = 1;
|
||||
alterdbReq.cacheLastRow = 1;
|
||||
alterdbReq.cacheLast = 1;
|
||||
alterdbReq.replications = 1;
|
||||
|
||||
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
|
||||
|
|
|
@ -55,7 +55,7 @@ void* MndTestSma::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
|
|
|
@ -56,7 +56,7 @@ void* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
|
|
|
@ -48,7 +48,7 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
|
|
|
@ -315,7 +315,7 @@ TEST_F(MndTestUser, 03_Alter_User) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
|
|
|
@ -394,7 +394,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
} else {
|
||||
ASSERT(pHandle->fetchMeta);
|
||||
ASSERT(IS_META_MSG(pHead->msgType));
|
||||
tqInfo("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
|
||||
/*metaRsp.rspOffset = fetchVer;*/
|
||||
|
|
|
@ -24,7 +24,8 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
|
@ -215,9 +216,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_ALTER_HASHRANGE:
|
||||
vnodeProcessAlterHasnRangeReq(pVnode, version, pReq, len, pRsp);
|
||||
vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_ALTER_CONFIG:
|
||||
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_COMMIT:
|
||||
goto _do_commit;
|
||||
|
@ -886,7 +888,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
|
||||
|
||||
// todo
|
||||
|
@ -896,6 +898,18 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SAlterVnodeReq alterReq = {0};
|
||||
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
|
||||
alterReq.cacheLastSize);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
SDecoder *pCoder = &(SDecoder){0};
|
||||
|
|
|
@ -109,7 +109,7 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
|
|||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.cacheLast = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
|
|
|
@ -223,7 +223,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
|
|||
"CREATE DATABASE `%s` BUFFER %d CACHELAST %d COMP %d DURATION %dm "
|
||||
"FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
|
||||
"STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
|
||||
dbFName, pCfg->buffer, pCfg->cacheLastRow, pCfg->compression, pCfg->daysPerFile, pCfg->fsyncPeriod,
|
||||
dbFName, pCfg->buffer, pCfg->cacheLast, pCfg->compression, pCfg->daysPerFile, pCfg->fsyncPeriod,
|
||||
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
|
||||
pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
|
||||
1 == pCfg->numOfStables);
|
||||
|
|
|
@ -779,6 +779,8 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
|
|||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||
SArray* pColList);
|
||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
||||
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
||||
|
||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||
|
||||
|
|
|
@ -869,7 +869,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
|
|||
return w;
|
||||
}
|
||||
|
||||
static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
|
||||
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
|
||||
int32_t factor = (order == TSDB_ORDER_ASC)? -1:1;
|
||||
|
||||
STimeWindow win = *pWindow;
|
||||
|
|
|
@ -545,9 +545,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
|
|||
if (pCtx[k].fpSet.process == NULL) {
|
||||
continue;
|
||||
}
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_process");
|
||||
#endif
|
||||
|
||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||
|
@ -578,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
int32_t numOfRows = 0;
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||
SInputColumnInfoData* pInputData = &pfCtx->input;
|
||||
|
||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
|
||||
pInputData->numOfRows);
|
||||
} else {
|
||||
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
||||
}
|
||||
|
@ -643,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||
// _group_key function for "partition by tbname" + csum(col_name) query
|
||||
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||
|
||||
// todo handle the json tag
|
||||
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||
for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||
bool isNull = colDataIsNull_s(pInput, f);
|
||||
if (isNull) {
|
||||
colDataAppendNULL(pOutput, pResult->info.rows + f);
|
||||
|
@ -2443,7 +2442,6 @@ _error:
|
|||
doDestroyExchangeOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
@ -3393,6 +3391,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
assert(pBlock != NULL);
|
||||
}
|
||||
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
|
||||
|
||||
if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
*newgroup = false;
|
||||
|
@ -3821,7 +3821,8 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) {
|
||||
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t order = 0;
|
||||
int32_t scanFlag = 0;
|
||||
|
||||
|
@ -3876,9 +3877,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
// here we need to handle the existsed group results
|
||||
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
||||
SqlFunctionCtx* pCtx = &pSup->pCtx[k];
|
||||
|
||||
|
@ -3976,15 +3977,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
|
||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||
|
@ -4010,6 +4011,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
|
||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
|
||||
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
|
||||
|
@ -4048,8 +4050,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColMatchColInfo =
|
||||
extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
|
||||
|
@ -4057,18 +4059,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -4118,6 +4120,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
|
|||
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
|
||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
tDecoderClear(&mr.coder);
|
||||
|
||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||
metaGetTableEntryByUid(&mr, suid);
|
||||
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||
|
|
|
@ -4502,7 +4502,6 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
size_t rows = pRes->info.rows;
|
||||
blockDataUpdateTsWindow(pRes, iaInfo->primaryTsIndex);
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
return (rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
|
|
@ -759,8 +759,8 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
|
|||
SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS);
|
||||
CHECK_OUT_OF_MEM(pOptions);
|
||||
pOptions->buffer = TSDB_DEFAULT_BUFFER_PER_VNODE;
|
||||
pOptions->cacheLast = TSDB_DEFAULT_CACHE_LAST_ROW;
|
||||
pOptions->cacheLastSize = TSDB_DEFAULT_LAST_ROW_MEM;
|
||||
pOptions->cacheLast = TSDB_DEFAULT_CACHE_LAST;
|
||||
pOptions->cacheLastSize = TSDB_DEFAULT_CACHE_LAST_SIZE;
|
||||
pOptions->compressionLevel = TSDB_DEFAULT_COMP_LEVEL;
|
||||
pOptions->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
||||
pOptions->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||
|
|
|
@ -2836,8 +2836,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
|
|||
pReq->compression = pStmt->pOptions->compressionLevel;
|
||||
pReq->replications = pStmt->pOptions->replica;
|
||||
pReq->strict = pStmt->pOptions->strict;
|
||||
pReq->cacheLastRow = pStmt->pOptions->cacheLast;
|
||||
pReq->lastRowMem = pStmt->pOptions->cacheLastSize;
|
||||
pReq->cacheLast = pStmt->pOptions->cacheLast;
|
||||
pReq->cacheLastSize = pStmt->pOptions->cacheLastSize;
|
||||
pReq->schemaless = pStmt->pOptions->schemaless;
|
||||
pReq->ignoreExist = pStmt->ignoreExists;
|
||||
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
||||
|
@ -2998,12 +2998,12 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
|||
int32_t code =
|
||||
checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST_ROW,
|
||||
TSDB_MAX_DB_CACHE_LAST_ROW);
|
||||
code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST,
|
||||
TSDB_MAX_DB_CACHE_LAST);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_LAST_ROW_MEM,
|
||||
TSDB_MAX_DB_LAST_ROW_MEM);
|
||||
code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_CACHE_LAST_SIZE,
|
||||
TSDB_MAX_DB_CACHE_LAST_SIZE);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkRangeOption(pCxt, "compression", pOptions->compressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
|
||||
|
@ -3116,7 +3116,7 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
|
|||
pReq->buffer = pStmt->pOptions->buffer;
|
||||
pReq->pageSize = -1;
|
||||
pReq->pages = pStmt->pOptions->pages;
|
||||
pReq->lastRowMem = -1;
|
||||
pReq->cacheLastSize = -1;
|
||||
pReq->daysPerFile = -1;
|
||||
pReq->daysToKeep0 = pStmt->pOptions->keep[0];
|
||||
pReq->daysToKeep1 = pStmt->pOptions->keep[1];
|
||||
|
@ -3124,8 +3124,8 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
|
|||
pReq->fsyncPeriod = pStmt->pOptions->fsyncPeriod;
|
||||
pReq->walLevel = pStmt->pOptions->walLevel;
|
||||
pReq->strict = pStmt->pOptions->strict;
|
||||
pReq->cacheLastRow = pStmt->pOptions->cacheLast;
|
||||
pReq->lastRowMem = pStmt->pOptions->cacheLastSize;
|
||||
pReq->cacheLast = pStmt->pOptions->cacheLast;
|
||||
pReq->cacheLastSize = pStmt->pOptions->cacheLastSize;
|
||||
pReq->replications = pStmt->pOptions->replica;
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -76,8 +76,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
|
|||
expect.db[len] = '\0';
|
||||
expect.ignoreExist = igExists;
|
||||
expect.buffer = TSDB_DEFAULT_BUFFER_PER_VNODE;
|
||||
expect.cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
|
||||
expect.lastRowMem = TSDB_DEFAULT_LAST_ROW_MEM;
|
||||
expect.cacheLast = TSDB_DEFAULT_CACHE_LAST;
|
||||
expect.cacheLastSize = TSDB_DEFAULT_CACHE_LAST_SIZE;
|
||||
expect.compression = TSDB_DEFAULT_COMP_LEVEL;
|
||||
expect.daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
||||
expect.fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||
|
@ -98,8 +98,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
|
|||
};
|
||||
|
||||
auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; };
|
||||
auto setDbCachelastFunc = [&](int8_t cachelast) { expect.cacheLastRow = cachelast; };
|
||||
auto setDbCachelastSize = [&](int8_t cachelastSize) { expect.lastRowMem = cachelastSize; };
|
||||
auto setDbCachelastFunc = [&](int8_t cachelast) { expect.cacheLast = cachelast; };
|
||||
auto setDbCachelastSize = [&](int8_t cachelastSize) { expect.cacheLastSize = cachelastSize; };
|
||||
auto setDbCompressionFunc = [&](int8_t compressionLevel) { expect.compression = compressionLevel; };
|
||||
auto setDbDaysFunc = [&](int32_t daysPerFile) { expect.daysPerFile = daysPerFile; };
|
||||
auto setDbFsyncFunc = [&](int32_t fsyncPeriod) { expect.fsyncPeriod = fsyncPeriod; };
|
||||
|
@ -155,8 +155,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
|
|||
ASSERT_EQ(req.compression, expect.compression);
|
||||
ASSERT_EQ(req.replications, expect.replications);
|
||||
ASSERT_EQ(req.strict, expect.strict);
|
||||
ASSERT_EQ(req.cacheLastRow, expect.cacheLastRow);
|
||||
ASSERT_EQ(req.lastRowMem, expect.lastRowMem);
|
||||
ASSERT_EQ(req.cacheLast, expect.cacheLast);
|
||||
ASSERT_EQ(req.cacheLastSize, expect.cacheLastSize);
|
||||
// ASSERT_EQ(req.schemaless, expect.schemaless);
|
||||
ASSERT_EQ(req.ignoreExist, expect.ignoreExist);
|
||||
ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions);
|
||||
|
|
|
@ -472,6 +472,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
|
|||
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
|
||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
||||
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
|
||||
|
||||
extern SSchDebug gSCHDebug;
|
||||
|
||||
|
|
|
@ -763,6 +763,17 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
|
||||
if (NULL == pReq || pReq->syncReq) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pReq->execFp) {
|
||||
(*pReq->execFp)(NULL, pReq->cbParam, errCode);
|
||||
} else if (pReq->fetchFp) {
|
||||
(*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
|
||||
}
|
||||
}
|
||||
|
||||
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
|
||||
int32_t op = 0;
|
||||
|
@ -801,17 +812,13 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
|
|||
|
||||
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
|
||||
int32_t code = 0;
|
||||
int8_t status = 0;
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
int8_t status = SCH_GET_JOB_STATUS(pJob);
|
||||
|
||||
switch (type) {
|
||||
case SCH_OP_EXEC:
|
||||
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
|
||||
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
|
||||
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
|
||||
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -822,11 +829,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
|
|||
case SCH_OP_FETCH:
|
||||
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
|
||||
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
|
||||
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
|
||||
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
|
||||
|
||||
|
||||
pJob->userRes.fetchRes = pReq->pFetchRes;
|
||||
pJob->userRes.fetchFp = pReq->fetchFp;
|
||||
pJob->userRes.cbParam = pReq->cbParam;
|
||||
|
||||
pJob->opStatus.syncReq = pReq->syncReq;
|
||||
|
||||
if (!SCH_JOB_NEED_FETCH(pJob)) {
|
||||
|
@ -839,10 +851,6 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
|
|||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
pJob->userRes.fetchRes = pReq->pFetchRes;
|
||||
pJob->userRes.fetchFp = pReq->fetchFp;
|
||||
pJob->userRes.cbParam = pReq->cbParam;
|
||||
|
||||
break;
|
||||
case SCH_OP_GET_STATUS:
|
||||
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
|
||||
|
@ -855,6 +863,11 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
|
|||
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
if (schJobNeedToStop(pJob, &status)) {
|
||||
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
|
|||
int32_t code = errCode;
|
||||
|
||||
if (NULL == pJob) {
|
||||
schDirectPostJobRes(pReq, errCode);
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -213,7 +213,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
|||
}
|
||||
|
||||
int code = 0;
|
||||
SSyncLogMeta syncMeta;
|
||||
SWalSyncInfo syncMeta;
|
||||
syncMeta.isWeek = pEntry->isWeak;
|
||||
syncMeta.seqNum = pEntry->seqNum;
|
||||
syncMeta.term = pEntry->term;
|
||||
|
@ -369,7 +369,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
|||
ASSERT(pEntry->index == lastIndex + 1);
|
||||
|
||||
int code = 0;
|
||||
SSyncLogMeta syncMeta;
|
||||
SWalSyncInfo syncMeta;
|
||||
syncMeta.isWeek = pEntry->isWeak;
|
||||
syncMeta.seqNum = pEntry->seqNum;
|
||||
syncMeta.term = pEntry->term;
|
||||
|
|
|
@ -573,8 +573,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|||
return;
|
||||
}
|
||||
if (nread < 0) {
|
||||
tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
|
||||
T_REF_VAL_GET(conn));
|
||||
tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
|
||||
conn->broken = true;
|
||||
cliHandleExcept(conn);
|
||||
}
|
||||
|
@ -650,7 +649,11 @@ static bool cliHandleNoResp(SCliConn* conn) {
|
|||
return res;
|
||||
}
|
||||
static void cliSendCb(uv_write_t* req, int status) {
|
||||
SCliConn* pConn = req->data;
|
||||
SCliConn* pConn = req && req->handle ? req->handle->data : NULL;
|
||||
taosMemoryFree(req);
|
||||
if (pConn == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (status == 0) {
|
||||
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
|
@ -708,8 +711,8 @@ void cliSend(SCliConn* pConn) {
|
|||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
}
|
||||
|
||||
pConn->writeReq.data = pConn;
|
||||
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
||||
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
return;
|
||||
_RETURN:
|
||||
return;
|
||||
|
|
|
@ -265,8 +265,8 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
transMsg.info.refId = pConn->refId;
|
||||
transMsg.info.traceId = pHead->traceId;
|
||||
|
||||
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle,
|
||||
pConn, pConn->refId);
|
||||
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
|
||||
pConn->refId);
|
||||
assert(transMsg.info.handle != NULL);
|
||||
|
||||
if (pHead->noResp == 1) {
|
||||
|
@ -331,7 +331,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
|
|||
}
|
||||
|
||||
void uvOnSendCb(uv_write_t* req, int status) {
|
||||
SSvrConn* conn = req->data;
|
||||
SSvrConn* conn = req && req->handle ? req->handle->data : NULL;
|
||||
taosMemoryFree(req);
|
||||
if (conn == NULL) return;
|
||||
|
||||
if (status == 0) {
|
||||
tTrace("conn %p data already was written on stream", conn);
|
||||
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||
|
@ -390,7 +393,6 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->hasEpSet = pMsg->info.hasEpSet;
|
||||
|
||||
|
||||
if (pConn->status == ConnNormal) {
|
||||
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
|
||||
} else {
|
||||
|
@ -433,7 +435,9 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
|
|||
uvPrepareSendData(smsg, &wb);
|
||||
|
||||
transRefSrvHandle(pConn);
|
||||
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
|
||||
|
||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
||||
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
|
||||
}
|
||||
static void uvStartSendResp(SSvrMsg* smsg) {
|
||||
// impl
|
||||
|
|
|
@ -146,12 +146,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
|
|||
|
||||
// seek section
|
||||
int walChangeWrite(SWal* pWal, int64_t ver);
|
||||
int walSetWrite(SWal* pWal);
|
||||
int walInitWriteFile(SWal* pWal);
|
||||
// seek section end
|
||||
|
||||
int64_t walGetSeq();
|
||||
int walSeekWriteVer(SWal* pWal, int64_t ver);
|
||||
int walRoll(SWal* pWal);
|
||||
int32_t walRollImpl(SWal* pWal);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -51,10 +51,10 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
|||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
||||
|
||||
int64_t file_size = 0;
|
||||
taosStatFile(fnameStr, &file_size, NULL);
|
||||
int readSize = TMIN(WAL_MAX_SIZE + 2, file_size);
|
||||
pLastFileInfo->fileSize = file_size;
|
||||
int64_t fileSize = 0;
|
||||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize);
|
||||
pLastFileInfo->fileSize = fileSize;
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
|
@ -145,6 +145,26 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
|
||||
int actualFileNum = taosArrayGetSize(pLogInfoArray);
|
||||
|
||||
#if 0
|
||||
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, fileNo);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
int64_t fileSize = 0;
|
||||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
if (fileSize == 0) {
|
||||
taosRemoveFile(fnameStr);
|
||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
taosArrayPop(pLogInfoArray);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
actualFileNum = taosArrayGetSize(pLogInfoArray);
|
||||
#endif
|
||||
|
||||
if (metaFileNum > actualFileNum) {
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
|
||||
} else if (metaFileNum < actualFileNum) {
|
||||
|
@ -164,6 +184,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
|
||||
int64_t fileSize = 0;
|
||||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
/*ASSERT(fileSize != 0);*/
|
||||
|
||||
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
|
||||
pLastFileInfo->fileSize = fileSize;
|
||||
|
@ -380,9 +401,9 @@ int walLoadMeta(SWal* pWal) {
|
|||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
// read metafile
|
||||
int64_t file_size = 0;
|
||||
taosStatFile(fnameStr, &file_size, NULL);
|
||||
int size = (int)file_size;
|
||||
int64_t fileSize = 0;
|
||||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
int size = (int)fileSize;
|
||||
char* buf = taosMemoryMalloc(size + 5);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||
|
|
|
@ -48,7 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int walSetWrite(SWal* pWal) {
|
||||
int walInitWriteFile(SWal* pWal) {
|
||||
TdFilePtr pIdxTFile, pLogTFile;
|
||||
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
|
||||
ASSERT(pRet != NULL);
|
||||
|
@ -70,6 +70,7 @@ int walSetWrite(SWal* pWal) {
|
|||
// switch file
|
||||
pWal->pWriteIdxTFile = pIdxTFile;
|
||||
pWal->pWriteLogTFile = pLogTFile;
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -207,12 +207,35 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||
/*pWal->vers.firstVer = index;*/
|
||||
if (walRollImpl(pWal) < 0) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
||||
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
|
||||
if (walRollImpl(pWal) < 0) {
|
||||
return -1;
|
||||
}
|
||||
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
|
||||
if (walRollImpl(pWal) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
||||
pWal->vers.verInSnapshotting = ver;
|
||||
// check file rolling
|
||||
if (pWal->cfg.retentionPeriod == 0) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
walRoll(pWal);
|
||||
if (walGetLastFileSize(pWal) != 0) {
|
||||
walRollImpl(pWal);
|
||||
}
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
}
|
||||
|
||||
|
@ -282,7 +305,7 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
int walRoll(SWal *pWal) {
|
||||
int32_t walRollImpl(SWal *pWal) {
|
||||
int32_t code = 0;
|
||||
if (pWal->pWriteIdxTFile != NULL) {
|
||||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
||||
|
@ -330,11 +353,13 @@ int walRoll(SWal *pWal) {
|
|||
|
||||
pWal->lastRollSeq = walGetSeq();
|
||||
|
||||
walSaveMeta(pWal);
|
||||
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
|
||||
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
||||
|
@ -348,61 +373,14 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLogMeta syncMeta, const void *body,
|
||||
int32_t bodyLen) {
|
||||
int32_t code = 0;
|
||||
|
||||
// no wal
|
||||
if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
|
||||
|
||||
if (bodyLen > TSDB_MAX_WAL_SIZE) {
|
||||
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
return -1;
|
||||
}
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
|
||||
if (index == pWal->vers.lastVer + 1) {
|
||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||
pWal->vers.firstVer = index;
|
||||
if (walRoll(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
||||
if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
|
||||
if (walRoll(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
} else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
|
||||
if (walRoll(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// reject skip log or rewrite log
|
||||
// must truncate explicitly first
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/
|
||||
|
||||
ASSERT(pWal->writeCur >= 0);
|
||||
|
||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteLogTFile == NULL) {
|
||||
walSetWrite(pWal);
|
||||
taosLSeekFile(pWal->pWriteLogTFile, 0, SEEK_END);
|
||||
taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
|
||||
}
|
||||
|
||||
pWal->writeHead.head.version = index;
|
||||
// TODO gurantee atomicity by truncate failed writing
|
||||
static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta,
|
||||
const void *body, int32_t bodyLen) {
|
||||
int64_t code = 0;
|
||||
|
||||
int64_t offset = walGetCurFileOffset(pWal);
|
||||
|
||||
pWal->writeHead.head.version = index;
|
||||
pWal->writeHead.head.bodyLen = bodyLen;
|
||||
pWal->writeHead.head.msgType = msgType;
|
||||
|
||||
|
@ -417,7 +395,8 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
|
|||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
|
||||
|
@ -425,13 +404,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
|
|||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
code = walWriteIndex(pWal, index, offset);
|
||||
if (code != 0) {
|
||||
// TODO
|
||||
return -1;
|
||||
if (code < 0) {
|
||||
// TODO ftruncate
|
||||
goto END;
|
||||
}
|
||||
|
||||
// set status
|
||||
|
@ -444,13 +424,88 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
|
|||
walGetCurFileInfo(pWal)->lastVer = index;
|
||||
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalCkHead) + bodyLen;
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
||||
return 0;
|
||||
END:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
|
||||
if (bodyLen > TSDB_MAX_WAL_SIZE) {
|
||||
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
|
||||
int64_t index = pWal->vers.lastVer + 1;
|
||||
|
||||
if (walCheckAndRoll(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
|
||||
if (walInitWriteFile(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
|
||||
|
||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return index;
|
||||
}
|
||||
|
||||
int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
|
||||
int32_t bodyLen) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (bodyLen > TSDB_MAX_WAL_SIZE) {
|
||||
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
return -1;
|
||||
}
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
|
||||
// concurrency control:
|
||||
// if logs are write with assigned index,
|
||||
// smaller index must be write before larger one
|
||||
if (index != pWal->vers.lastVer + 1) {
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walCheckAndRoll(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
|
||||
if (walInitWriteFile(pWal) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
|
||||
|
||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
|
||||
SSyncLogMeta syncMeta = {
|
||||
SWalSyncInfo syncMeta = {
|
||||
.isWeek = -1,
|
||||
.seqNum = UINT64_MAX,
|
||||
.term = UINT64_MAX,
|
||||
|
|
|
@ -106,8 +106,8 @@ int32_t taosMkDir(const char *dirname) {
|
|||
|
||||
int32_t taosMulMkDir(const char *dirname) {
|
||||
if (dirname == NULL) return -1;
|
||||
char temp[1024];
|
||||
char * pos = temp;
|
||||
char temp[1024];
|
||||
char *pos = temp;
|
||||
int32_t code = 0;
|
||||
#ifdef WINDOWS
|
||||
taosRealPath(dirname, temp, sizeof(temp));
|
||||
|
@ -127,11 +127,11 @@ int32_t taosMulMkDir(const char *dirname) {
|
|||
for (; *pos != '\0'; pos++) {
|
||||
if (*pos == TD_DIRSEP[0]) {
|
||||
*pos = '\0';
|
||||
#ifdef WINDOWS
|
||||
#ifdef WINDOWS
|
||||
code = _mkdir(temp, 0755);
|
||||
#else
|
||||
#else
|
||||
code = mkdir(temp, 0755);
|
||||
#endif
|
||||
#endif
|
||||
if (code < 0 && errno != EEXIST) {
|
||||
return code;
|
||||
}
|
||||
|
@ -140,11 +140,11 @@ int32_t taosMulMkDir(const char *dirname) {
|
|||
}
|
||||
|
||||
if (*(pos - 1) != TD_DIRSEP[0]) {
|
||||
#ifdef WINDOWS
|
||||
#ifdef WINDOWS
|
||||
code = _mkdir(temp, 0755);
|
||||
#else
|
||||
#else
|
||||
code = mkdir(temp, 0755);
|
||||
#endif
|
||||
#endif
|
||||
if (code < 0 && errno != EEXIST) {
|
||||
return code;
|
||||
}
|
||||
|
@ -267,7 +267,7 @@ char *taosDirName(char *name) {
|
|||
} else {
|
||||
name[0] = 0;
|
||||
}
|
||||
return name;
|
||||
return name;
|
||||
#else
|
||||
return dirname(name);
|
||||
#endif
|
||||
|
@ -334,9 +334,9 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) {
|
|||
}
|
||||
|
||||
char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) {
|
||||
if (pDirEntry == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
/*if (pDirEntry == NULL) {*/
|
||||
/*return NULL;*/
|
||||
/*}*/
|
||||
#ifdef WINDOWS
|
||||
return pDirEntry->findFileData.cFileName;
|
||||
#else
|
||||
|
|
|
@ -47,7 +47,7 @@ sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
|||
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
|
||||
print =============== step6: select data
|
||||
#sql select * from ct1
|
||||
sql select * from ct1
|
||||
#sql select * from stb
|
||||
|
||||
_OVER:
|
||||
|
|
|
@ -37,10 +37,10 @@ endi
|
|||
|
||||
print =============== step3: create show table
|
||||
sql create table ct1 using stb tags(1000)
|
||||
#sql show tables
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
sql show tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step5: insert data
|
||||
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
||||
|
|
|
@ -96,7 +96,7 @@ if $rows != 1 then
|
|||
endi
|
||||
|
||||
sql select * from information_schema.user_tables
|
||||
if $rows != 31 then
|
||||
if $rows != 30 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -72,6 +72,7 @@ void dumpDb(SSdb *pSdb, SJson *json) {
|
|||
tjsonAddIntegerToObject(item, "buffer", pObj->cfg.buffer);
|
||||
tjsonAddIntegerToObject(item, "pageSize", pObj->cfg.pageSize);
|
||||
tjsonAddIntegerToObject(item, "pages", pObj->cfg.pages);
|
||||
tjsonAddIntegerToObject(item, "cacheLastSize", pObj->cfg.cacheLastSize);
|
||||
tjsonAddIntegerToObject(item, "daysPerFile", pObj->cfg.daysPerFile);
|
||||
tjsonAddIntegerToObject(item, "daysToKeep0", pObj->cfg.daysToKeep0);
|
||||
tjsonAddIntegerToObject(item, "daysToKeep1", pObj->cfg.daysToKeep1);
|
||||
|
@ -84,7 +85,7 @@ void dumpDb(SSdb *pSdb, SJson *json) {
|
|||
tjsonAddIntegerToObject(item, "compression", pObj->cfg.compression);
|
||||
tjsonAddIntegerToObject(item, "replications", pObj->cfg.replications);
|
||||
tjsonAddIntegerToObject(item, "strict", pObj->cfg.strict);
|
||||
tjsonAddIntegerToObject(item, "cacheLastRow", pObj->cfg.cacheLastRow);
|
||||
tjsonAddIntegerToObject(item, "cacheLast", pObj->cfg.cacheLast);
|
||||
tjsonAddIntegerToObject(item, "hashMethod", pObj->cfg.hashMethod);
|
||||
tjsonAddIntegerToObject(item, "numOfRetensions", pObj->cfg.numOfRetensions);
|
||||
tjsonAddIntegerToObject(item, "schemaless", pObj->cfg.schemaless);
|
||||
|
|
Loading…
Reference in New Issue