Merge branch 'develop' into hotfix/test
This commit is contained in:
commit
f7e89d56b6
|
@ -198,6 +198,10 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
|
|||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
// user-defined callback function is stored in fetchFp
|
||||
pSql->fetchFp = fp;
|
||||
pSql->fp = tscAsyncFetchRowsProxy;
|
||||
|
||||
if (pRes->qhandle == 0) {
|
||||
tscError("qhandle is NULL");
|
||||
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
|
||||
|
@ -205,10 +209,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
|
|||
return;
|
||||
}
|
||||
|
||||
// user-defined callback function is stored in fetchFp
|
||||
pSql->fetchFp = fp;
|
||||
pSql->fp = tscAsyncFetchRowsProxy;
|
||||
|
||||
pSql->param = param;
|
||||
tscResetForNextRetrieve(pRes);
|
||||
|
||||
|
|
|
@ -4962,6 +4962,7 @@ static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
|
|||
pMsg->commitTime = htonl(pCreateDb->commitTime);
|
||||
pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock);
|
||||
pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock);
|
||||
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
|
||||
pMsg->compression = pCreateDb->compressionLevel;
|
||||
pMsg->walLevel = (char)pCreateDb->walLevel;
|
||||
pMsg->replications = pCreateDb->replica;
|
||||
|
@ -5529,6 +5530,13 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
|
||||
val = htonl(pCreate->fsyncPeriod);
|
||||
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
|
||||
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val,
|
||||
TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
|
||||
if (pCreate->compression != -1 &&
|
||||
(pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
|
||||
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
|
||||
|
|
|
@ -80,6 +80,7 @@ extern int16_t tsCommitTime; // seconds
|
|||
extern int32_t tsTimePrecision;
|
||||
extern int16_t tsCompression;
|
||||
extern int16_t tsWAL;
|
||||
extern int32_t tsFsyncPeriod;
|
||||
extern int32_t tsReplications;
|
||||
|
||||
// balance
|
||||
|
|
|
@ -110,6 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
|
|||
int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
|
||||
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
|
||||
int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
|
||||
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
|
||||
int32_t tsMaxVgroupsPerDb = 0;
|
||||
int32_t tsMinTablePerVnode = 100;
|
||||
|
@ -715,6 +716,16 @@ static void doInitGlobalConfig() {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "fsync";
|
||||
cfg.ptr = &tsFsyncPeriod;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = TSDB_MIN_FSYNC_PERIOD;
|
||||
cfg.maxValue = TSDB_MAX_FSYNC_PERIOD;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "replica";
|
||||
cfg.ptr = &tsReplications;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
|
|
|
@ -401,6 +401,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
|||
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
|
||||
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
|
||||
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
|
||||
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
|
||||
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
||||
|
||||
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
||||
|
|
|
@ -332,6 +332,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_MAX_WAL_LEVEL 2
|
||||
#define TSDB_DEFAULT_WAL_LEVEL 1
|
||||
|
||||
#define TSDB_MIN_FSYNC_PERIOD 0
|
||||
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
|
||||
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
|
||||
|
||||
#define TSDB_MIN_DB_REPLICA_OPTION 1
|
||||
#define TSDB_MAX_DB_REPLICA_OPTION 3
|
||||
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
|
||||
|
|
|
@ -515,6 +515,7 @@ typedef struct {
|
|||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
uint8_t precision; // time resolution
|
||||
int8_t compression;
|
||||
int8_t walLevel;
|
||||
|
@ -608,6 +609,7 @@ typedef struct {
|
|||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t walLevel;
|
||||
|
|
|
@ -110,117 +110,117 @@
|
|||
#define TK_BLOCKS 92
|
||||
#define TK_CTIME 93
|
||||
#define TK_WAL 94
|
||||
#define TK_COMP 95
|
||||
#define TK_PRECISION 96
|
||||
#define TK_LP 97
|
||||
#define TK_RP 98
|
||||
#define TK_TAGS 99
|
||||
#define TK_USING 100
|
||||
#define TK_AS 101
|
||||
#define TK_COMMA 102
|
||||
#define TK_NULL 103
|
||||
#define TK_SELECT 104
|
||||
#define TK_UNION 105
|
||||
#define TK_ALL 106
|
||||
#define TK_FROM 107
|
||||
#define TK_VARIABLE 108
|
||||
#define TK_INTERVAL 109
|
||||
#define TK_FILL 110
|
||||
#define TK_SLIDING 111
|
||||
#define TK_ORDER 112
|
||||
#define TK_BY 113
|
||||
#define TK_ASC 114
|
||||
#define TK_DESC 115
|
||||
#define TK_GROUP 116
|
||||
#define TK_HAVING 117
|
||||
#define TK_LIMIT 118
|
||||
#define TK_OFFSET 119
|
||||
#define TK_SLIMIT 120
|
||||
#define TK_SOFFSET 121
|
||||
#define TK_WHERE 122
|
||||
#define TK_NOW 123
|
||||
#define TK_RESET 124
|
||||
#define TK_QUERY 125
|
||||
#define TK_ADD 126
|
||||
#define TK_COLUMN 127
|
||||
#define TK_TAG 128
|
||||
#define TK_CHANGE 129
|
||||
#define TK_SET 130
|
||||
#define TK_KILL 131
|
||||
#define TK_CONNECTION 132
|
||||
#define TK_STREAM 133
|
||||
#define TK_COLON 134
|
||||
#define TK_ABORT 135
|
||||
#define TK_AFTER 136
|
||||
#define TK_ATTACH 137
|
||||
#define TK_BEFORE 138
|
||||
#define TK_BEGIN 139
|
||||
#define TK_CASCADE 140
|
||||
#define TK_CLUSTER 141
|
||||
#define TK_CONFLICT 142
|
||||
#define TK_COPY 143
|
||||
#define TK_DEFERRED 144
|
||||
#define TK_DELIMITERS 145
|
||||
#define TK_DETACH 146
|
||||
#define TK_EACH 147
|
||||
#define TK_END 148
|
||||
#define TK_EXPLAIN 149
|
||||
#define TK_FAIL 150
|
||||
#define TK_FOR 151
|
||||
#define TK_IGNORE 152
|
||||
#define TK_IMMEDIATE 153
|
||||
#define TK_INITIALLY 154
|
||||
#define TK_INSTEAD 155
|
||||
#define TK_MATCH 156
|
||||
#define TK_KEY 157
|
||||
#define TK_OF 158
|
||||
#define TK_RAISE 159
|
||||
#define TK_REPLACE 160
|
||||
#define TK_RESTRICT 161
|
||||
#define TK_ROW 162
|
||||
#define TK_STATEMENT 163
|
||||
#define TK_TRIGGER 164
|
||||
#define TK_VIEW 165
|
||||
#define TK_COUNT 166
|
||||
#define TK_SUM 167
|
||||
#define TK_AVG 168
|
||||
#define TK_MIN 169
|
||||
#define TK_MAX 170
|
||||
#define TK_FIRST 171
|
||||
#define TK_LAST 172
|
||||
#define TK_TOP 173
|
||||
#define TK_BOTTOM 174
|
||||
#define TK_STDDEV 175
|
||||
#define TK_PERCENTILE 176
|
||||
#define TK_APERCENTILE 177
|
||||
#define TK_LEASTSQUARES 178
|
||||
#define TK_HISTOGRAM 179
|
||||
#define TK_DIFF 180
|
||||
#define TK_SPREAD 181
|
||||
#define TK_TWA 182
|
||||
#define TK_INTERP 183
|
||||
#define TK_LAST_ROW 184
|
||||
#define TK_RATE 185
|
||||
#define TK_IRATE 186
|
||||
#define TK_SUM_RATE 187
|
||||
#define TK_SUM_IRATE 188
|
||||
#define TK_AVG_RATE 189
|
||||
#define TK_AVG_IRATE 190
|
||||
#define TK_TBID 191
|
||||
#define TK_SEMI 192
|
||||
#define TK_NONE 193
|
||||
#define TK_PREV 194
|
||||
#define TK_LINEAR 195
|
||||
#define TK_IMPORT 196
|
||||
#define TK_METRIC 197
|
||||
#define TK_TBNAME 198
|
||||
#define TK_JOIN 199
|
||||
#define TK_METRICS 200
|
||||
#define TK_STABLE 201
|
||||
#define TK_INSERT 202
|
||||
#define TK_INTO 203
|
||||
#define TK_VALUES 204
|
||||
|
||||
#define TK_FSYNC 95
|
||||
#define TK_COMP 96
|
||||
#define TK_PRECISION 97
|
||||
#define TK_LP 98
|
||||
#define TK_RP 99
|
||||
#define TK_TAGS 100
|
||||
#define TK_USING 101
|
||||
#define TK_AS 102
|
||||
#define TK_COMMA 103
|
||||
#define TK_NULL 104
|
||||
#define TK_SELECT 105
|
||||
#define TK_UNION 106
|
||||
#define TK_ALL 107
|
||||
#define TK_FROM 108
|
||||
#define TK_VARIABLE 109
|
||||
#define TK_INTERVAL 110
|
||||
#define TK_FILL 111
|
||||
#define TK_SLIDING 112
|
||||
#define TK_ORDER 113
|
||||
#define TK_BY 114
|
||||
#define TK_ASC 115
|
||||
#define TK_DESC 116
|
||||
#define TK_GROUP 117
|
||||
#define TK_HAVING 118
|
||||
#define TK_LIMIT 119
|
||||
#define TK_OFFSET 120
|
||||
#define TK_SLIMIT 121
|
||||
#define TK_SOFFSET 122
|
||||
#define TK_WHERE 123
|
||||
#define TK_NOW 124
|
||||
#define TK_RESET 125
|
||||
#define TK_QUERY 126
|
||||
#define TK_ADD 127
|
||||
#define TK_COLUMN 128
|
||||
#define TK_TAG 129
|
||||
#define TK_CHANGE 130
|
||||
#define TK_SET 131
|
||||
#define TK_KILL 132
|
||||
#define TK_CONNECTION 133
|
||||
#define TK_STREAM 134
|
||||
#define TK_COLON 135
|
||||
#define TK_ABORT 136
|
||||
#define TK_AFTER 137
|
||||
#define TK_ATTACH 138
|
||||
#define TK_BEFORE 139
|
||||
#define TK_BEGIN 140
|
||||
#define TK_CASCADE 141
|
||||
#define TK_CLUSTER 142
|
||||
#define TK_CONFLICT 143
|
||||
#define TK_COPY 144
|
||||
#define TK_DEFERRED 145
|
||||
#define TK_DELIMITERS 146
|
||||
#define TK_DETACH 147
|
||||
#define TK_EACH 148
|
||||
#define TK_END 149
|
||||
#define TK_EXPLAIN 150
|
||||
#define TK_FAIL 151
|
||||
#define TK_FOR 152
|
||||
#define TK_IGNORE 153
|
||||
#define TK_IMMEDIATE 154
|
||||
#define TK_INITIALLY 155
|
||||
#define TK_INSTEAD 156
|
||||
#define TK_MATCH 157
|
||||
#define TK_KEY 158
|
||||
#define TK_OF 159
|
||||
#define TK_RAISE 160
|
||||
#define TK_REPLACE 161
|
||||
#define TK_RESTRICT 162
|
||||
#define TK_ROW 163
|
||||
#define TK_STATEMENT 164
|
||||
#define TK_TRIGGER 165
|
||||
#define TK_VIEW 166
|
||||
#define TK_COUNT 167
|
||||
#define TK_SUM 168
|
||||
#define TK_AVG 169
|
||||
#define TK_MIN 170
|
||||
#define TK_MAX 171
|
||||
#define TK_FIRST 172
|
||||
#define TK_LAST 173
|
||||
#define TK_TOP 174
|
||||
#define TK_BOTTOM 175
|
||||
#define TK_STDDEV 176
|
||||
#define TK_PERCENTILE 177
|
||||
#define TK_APERCENTILE 178
|
||||
#define TK_LEASTSQUARES 179
|
||||
#define TK_HISTOGRAM 180
|
||||
#define TK_DIFF 181
|
||||
#define TK_SPREAD 182
|
||||
#define TK_TWA 183
|
||||
#define TK_INTERP 184
|
||||
#define TK_LAST_ROW 185
|
||||
#define TK_RATE 186
|
||||
#define TK_IRATE 187
|
||||
#define TK_SUM_RATE 188
|
||||
#define TK_SUM_IRATE 189
|
||||
#define TK_AVG_RATE 190
|
||||
#define TK_AVG_IRATE 191
|
||||
#define TK_TBID 192
|
||||
#define TK_SEMI 193
|
||||
#define TK_NONE 194
|
||||
#define TK_PREV 195
|
||||
#define TK_LINEAR 196
|
||||
#define TK_IMPORT 197
|
||||
#define TK_METRIC 198
|
||||
#define TK_TBNAME 199
|
||||
#define TK_JOIN 200
|
||||
#define TK_METRICS 201
|
||||
#define TK_STABLE 202
|
||||
#define TK_INSERT 203
|
||||
#define TK_INTO 204
|
||||
#define TK_VALUES 205
|
||||
|
||||
#define TK_SPACE 300
|
||||
#define TK_COMMENT 301
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int8_t walLevel; // wal level
|
||||
int32_t fsyncPeriod; // millisecond
|
||||
int8_t wals; // number of WAL files;
|
||||
int8_t keep; // keep the wal file when closed
|
||||
} SWalCfg;
|
||||
|
|
|
@ -48,6 +48,7 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
|
||||
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnode->walCfg.fsyncPeriod);
|
||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica);
|
||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals);
|
||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum);
|
||||
|
@ -212,6 +213,13 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
}
|
||||
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
|
||||
|
||||
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
|
||||
if (!fsyncPeriod || fsyncPeriod->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, fsyncPeriod not found\n", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
|
||||
|
||||
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
||||
if (!wals || wals->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId);
|
||||
|
|
|
@ -160,6 +160,7 @@ typedef struct {
|
|||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t walLevel;
|
||||
|
|
|
@ -287,14 +287,14 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
|
|||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
|
||||
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION,
|
||||
TSDB_MAX_DB_REPLICA_OPTION);
|
||||
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) {
|
||||
mError("invalid db option fsyncPeriod:%d, valid range: [%d, %d]", pCfg->fsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
|
||||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL) {
|
||||
mError("invalid db option walLevel:%d must be greater than 0", pCfg->walLevel);
|
||||
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
|
||||
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION,
|
||||
TSDB_MAX_DB_REPLICA_OPTION);
|
||||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
|
@ -318,6 +318,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep;
|
||||
if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsMinRowsInFileBlock;
|
||||
if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = tsMaxRowsInFileBlock;
|
||||
if (pCfg->fsyncPeriod <0) pCfg->fsyncPeriod = tsFsyncPeriod;
|
||||
if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime;
|
||||
if (pCfg->precision < 0) pCfg->precision = tsTimePrecision;
|
||||
if (pCfg->compression < 0) pCfg->compression = tsCompression;
|
||||
|
@ -367,6 +368,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
|
|||
.daysToKeep2 = pCreate->daysToKeep2,
|
||||
.minRowsPerFileBlock = pCreate->minRowsPerFileBlock,
|
||||
.maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock,
|
||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||
.commitTime = pCreate->commitTime,
|
||||
.precision = pCreate->precision,
|
||||
.compression = pCreate->compression,
|
||||
|
@ -559,6 +561,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
|
|||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "fsync");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema[cols].name, "comp");
|
||||
|
@ -682,6 +690,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
|
|||
*(int8_t *)pWrite = pDb->cfg.walLevel;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int8_t *)pWrite = pDb->cfg.compression;
|
||||
cols++;
|
||||
|
@ -758,6 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
|
|||
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
|
||||
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
||||
pCreate->commitTime = htonl(pCreate->commitTime);
|
||||
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
||||
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
|
||||
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
|
||||
|
||||
|
@ -785,6 +798,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
int32_t minRows = htonl(pAlter->minRowsPerFileBlock);
|
||||
int32_t maxRows = htonl(pAlter->maxRowsPerFileBlock);
|
||||
int32_t commitTime = htonl(pAlter->commitTime);
|
||||
int32_t fsyncPeriod = htonl(pAlter->fsyncPeriod);
|
||||
int8_t compression = pAlter->compression;
|
||||
int8_t walLevel = pAlter->walLevel;
|
||||
int8_t replications = pAlter->replications;
|
||||
|
@ -861,6 +875,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (fsyncPeriod >= 0 && fsyncPeriod != pDb->cfg.fsyncPeriod) {
|
||||
mError("db:%s, can't alter fsyncPeriod option", pDb->name);
|
||||
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (replications > 0 && replications != pDb->cfg.replications) {
|
||||
mDebug("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications);
|
||||
newCfg.replications = replications;
|
||||
|
|
|
@ -115,7 +115,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
|
|||
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
|
||||
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime);
|
||||
if (pConn == NULL) {
|
||||
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
|
||||
mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
|
|||
}
|
||||
|
||||
static int32_t sdbInitWal() {
|
||||
SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1};
|
||||
SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
|
||||
char temp[TSDB_FILENAME_LEN];
|
||||
sprintf(temp, "%s/wal", tsMnodeDir);
|
||||
tsSdbObj.wal = walOpen(temp, &walCfg);
|
||||
|
|
|
@ -757,6 +757,7 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
|||
pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
|
||||
pCfg->minRowsPerFileBlock = htonl(pDb->cfg.minRowsPerFileBlock);
|
||||
pCfg->maxRowsPerFileBlock = htonl(pDb->cfg.maxRowsPerFileBlock);
|
||||
pCfg->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod);
|
||||
pCfg->commitTime = htonl(pDb->cfg.commitTime);
|
||||
pCfg->precision = pDb->cfg.precision;
|
||||
pCfg->compression = pDb->cfg.compression;
|
||||
|
|
|
@ -137,7 +137,7 @@ void httpReleaseContext(HttpContext *pContext) {
|
|||
assert(refCount >= 0);
|
||||
|
||||
HttpContext **ppContext = pContext->ppContext;
|
||||
httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount);
|
||||
httpDebug("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
|
||||
|
||||
if (tsHttpServer.contextCache != NULL) {
|
||||
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
|
||||
|
|
|
@ -47,6 +47,10 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
|
|||
}
|
||||
}
|
||||
|
||||
// if (tscResultsetFetchCompleted(result)) {
|
||||
// isContinue = false;
|
||||
// }
|
||||
|
||||
if (isContinue) {
|
||||
// retrieve next batch of rows
|
||||
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s",
|
||||
|
@ -75,7 +79,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
|
|||
HttpContext *pContext = (HttpContext *)param;
|
||||
if (pContext == NULL) return;
|
||||
|
||||
HttpSqlCmds * multiCmds = pContext->multiCmds;
|
||||
code = taos_errno(result);
|
||||
HttpSqlCmds *multiCmds = pContext->multiCmds;
|
||||
HttpEncodeMethod *encode = pContext->encodeMethod;
|
||||
|
||||
HttpSqlCmd *singleCmd = multiCmds->cmds + multiCmds->pos;
|
||||
|
@ -109,8 +114,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
|
|||
return;
|
||||
}
|
||||
|
||||
int num_fields = taos_field_count(result);
|
||||
if (num_fields == 0) {
|
||||
bool isUpdate = tscIsUpdateQuery(result);
|
||||
if (isUpdate) {
|
||||
// not select or show commands
|
||||
int affectRows = taos_affected_rows(result);
|
||||
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s",
|
||||
|
@ -221,9 +226,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
|
|||
if (numOfRows < 0) {
|
||||
httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr,
|
||||
pContext->user, tstrerror(numOfRows));
|
||||
}
|
||||
|
||||
taos_free_result(result);
|
||||
}
|
||||
|
||||
taos_free_result(result);
|
||||
|
||||
if (encode->stopJsonFp) {
|
||||
(encode->stopJsonFp)(pContext, &pContext->singleCmd);
|
||||
|
@ -238,6 +243,7 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode)
|
|||
if (pContext == NULL) return;
|
||||
|
||||
int32_t code = taos_errno(result);
|
||||
|
||||
HttpEncodeMethod *encode = pContext->encodeMethod;
|
||||
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
|
|
|
@ -116,6 +116,7 @@ typedef struct SCreateDBInfo {
|
|||
int32_t daysPerFile;
|
||||
int32_t minRowsPerBlock;
|
||||
int32_t maxRowsPerBlock;
|
||||
int32_t fsyncPeriod;
|
||||
int64_t commitTime;
|
||||
int32_t walLevel;
|
||||
int32_t compressionLevel;
|
||||
|
|
|
@ -221,6 +221,7 @@ maxrows(Y) ::= MAXROWS INTEGER(X). { Y = X; }
|
|||
blocks(Y) ::= BLOCKS INTEGER(X). { Y = X; }
|
||||
ctime(Y) ::= CTIME INTEGER(X). { Y = X; }
|
||||
wal(Y) ::= WAL INTEGER(X). { Y = X; }
|
||||
fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
|
||||
comp(Y) ::= COMP INTEGER(X). { Y = X; }
|
||||
prec(Y) ::= PRECISION STRING(X). { Y = X; }
|
||||
|
||||
|
@ -236,6 +237,7 @@ db_optr(Y) ::= db_optr(Z) maxrows(X). { Y = Z; Y.maxRowsPerBlock = strtod
|
|||
db_optr(Y) ::= db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) ctime(X). { Y = Z; Y.commitTime = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
|
||||
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
|
||||
|
@ -249,6 +251,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
|
|||
alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
|
||||
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
|
||||
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
|
||||
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtod(X.z, NULL, 10); }
|
||||
|
||||
%type typename {TAOS_FIELD}
|
||||
typename(A) ::= ids(X). {
|
||||
|
|
|
@ -6333,6 +6333,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
|
|||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
sem_post(&pQInfo->dataReady);
|
||||
setQueryKilled(pQInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -6545,13 +6546,14 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
|||
|
||||
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||
if (pQueryMgmt->qinfoPool == NULL) {
|
||||
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&pQueryMgmt->lock);
|
||||
if (pQueryMgmt->closed) {
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
|
||||
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
|
||||
return NULL;
|
||||
} else {
|
||||
uint64_t handleVal = (uint64_t) qInfo;
|
||||
|
|
|
@ -896,6 +896,7 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
|
|||
pDBInfo->compressionLevel = -1;
|
||||
|
||||
pDBInfo->walLevel = -1;
|
||||
pDBInfo->fsyncPeriod = -1;
|
||||
pDBInfo->commitTime = -1;
|
||||
pDBInfo->maxTablesPerVnode = -1;
|
||||
|
||||
|
|
|
@ -124,6 +124,7 @@ static SKeyword keywordTable[] = {
|
|||
{"CACHE", TK_CACHE},
|
||||
{"CTIME", TK_CTIME},
|
||||
{"WAL", TK_WAL},
|
||||
{"FSYNC", TK_FSYNC},
|
||||
{"COMP", TK_COMP},
|
||||
{"PRECISION", TK_PRECISION},
|
||||
{"LP", TK_LP},
|
||||
|
|
2253
src/query/src/sql.c
2253
src/query/src/sql.c
File diff suppressed because it is too large
Load Diff
|
@ -96,6 +96,11 @@ typedef struct {
|
|||
} STsdbBufPool;
|
||||
|
||||
// ------------------ tsdbMemTable.c
|
||||
typedef struct {
|
||||
STable * pTable;
|
||||
SSkipListIterator *pIter;
|
||||
} SCommitIter;
|
||||
|
||||
typedef struct {
|
||||
uint64_t uid;
|
||||
TSKEY keyFirst;
|
||||
|
@ -206,10 +211,10 @@ typedef struct {
|
|||
int64_t offset : 63;
|
||||
int32_t algorithm : 8;
|
||||
int32_t numOfRows : 24;
|
||||
int32_t sversion;
|
||||
int32_t len;
|
||||
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols
|
||||
int16_t numOfSubBlocks;
|
||||
int16_t numOfCols;
|
||||
int16_t numOfCols; // not including timestamp column
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
} SCompBlock;
|
||||
|
@ -377,6 +382,24 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
|||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||
TSKEY* filterKeys, int nFilterKeys);
|
||||
|
||||
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||
if (pIter == NULL) return NULL;
|
||||
|
||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) return NULL;
|
||||
|
||||
return SL_GET_NODE_DATA(node);
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||
SDataRow row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL) return -1;
|
||||
|
||||
return dataRowKey(row);
|
||||
}
|
||||
|
||||
// ------------------ tsdbFile.c
|
||||
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||
|
@ -421,25 +444,36 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
|||
#define helperType(h) (h)->type
|
||||
#define helperRepo(h) (h)->pRepo
|
||||
#define helperState(h) (h)->state
|
||||
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
|
||||
|
||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
void tsdbDestroyHelper(SRWHelper* pHelper);
|
||||
void tsdbResetHelper(SRWHelper* pHelper);
|
||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||
int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols);
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
|
||||
int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
void tsdbDestroyHelper(SRWHelper* pHelper);
|
||||
void tsdbResetHelper(SRWHelper* pHelper);
|
||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
|
||||
int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||
|
||||
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
|
||||
if (*(TSKEY*)key1 > *(TSKEY*)key2) {
|
||||
return 1;
|
||||
} else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
#define REPO_ID(r) (r)->config.tsdbId
|
||||
|
|
|
@ -18,11 +18,6 @@
|
|||
|
||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||
|
||||
typedef struct {
|
||||
STable * pTable;
|
||||
SSkipListIterator *pIter;
|
||||
} SCommitIter;
|
||||
|
||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
||||
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||
|
@ -34,14 +29,11 @@ static char * tsdbGetTsTupleKey(const void *data);
|
|||
static void * tsdbCommitData(void *arg);
|
||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey,
|
||||
int maxRowsToRead, SDataCols *pCols);
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
|
@ -252,6 +244,66 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||
TSKEY *filterKeys, int nFilterKeys) {
|
||||
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
int numOfRows = 0;
|
||||
TSKEY keyNext = 0;
|
||||
int filterIter = 0;
|
||||
|
||||
if (nFilterKeys != 0) { // for filter purpose
|
||||
ASSERT(filterKeys != NULL);
|
||||
keyNext = tsdbNextIterKey(pIter);
|
||||
if (keyNext < 0 || keyNext > maxKey) return numOfRows;
|
||||
void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE);
|
||||
filterIter = (ptr == NULL) ? nFilterKeys : (POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY));
|
||||
}
|
||||
|
||||
do {
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
|
||||
SDataRow row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL) break;
|
||||
|
||||
keyNext = dataRowKey(row);
|
||||
if (keyNext < 0 || keyNext > maxKey) break;
|
||||
|
||||
bool keyFiltered = false;
|
||||
if (nFilterKeys != 0) {
|
||||
while (true) {
|
||||
if (filterIter >= nFilterKeys) break;
|
||||
if (keyNext == filterKeys[filterIter]) {
|
||||
keyFiltered = true;
|
||||
filterIter++;
|
||||
break;
|
||||
} else if (keyNext < filterKeys[filterIter]) {
|
||||
break;
|
||||
} else {
|
||||
filterIter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!keyFiltered) {
|
||||
if (pCols) {
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
}
|
||||
numOfRows++;
|
||||
}
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo != NULL);
|
||||
|
@ -378,7 +430,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
|
||||
// Create the iterator to read from cache
|
||||
if (pMem->numOfRows > 0) {
|
||||
iters = tsdbCreateTableIters(pRepo);
|
||||
iters = tsdbCreateCommitIters(pRepo);
|
||||
if (iters == NULL) {
|
||||
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
|
@ -418,7 +470,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
|
||||
_exit:
|
||||
tdFreeDataCols(pDataCols);
|
||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyHelper(&whelper);
|
||||
tsdbEndCommit(pRepo);
|
||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||
|
@ -479,19 +531,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
|
|||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
|
||||
}
|
||||
|
||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter) {
|
||||
if (pIter == NULL) return -1;
|
||||
|
||||
SSkipListNode *node = tSkipListIterGet(pIter->pIter);
|
||||
if (node == NULL) return -1;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
return dataRowKey(row);
|
||||
}
|
||||
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||
for (int i = 0; i < nIters; i++) {
|
||||
TSKEY nextKey = tsdbNextIterKey(iters + i);
|
||||
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
|
||||
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
|
||||
}
|
||||
return 0;
|
||||
|
@ -504,7 +546,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
|
|||
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||
char * dataDir = NULL;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = NULL;
|
||||
|
@ -549,33 +590,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
if (pIter->pIter != NULL) {
|
||||
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
|
||||
|
||||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||
int nLoop = 0;
|
||||
while (true) {
|
||||
int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols);
|
||||
ASSERT(rowsRead >= 0);
|
||||
if (pDataCols->numOfRows == 0) break;
|
||||
nLoop++;
|
||||
|
||||
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
|
||||
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
|
||||
|
||||
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
||||
ASSERT(rowsWritten != 0);
|
||||
if (rowsWritten < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
ASSERT(rowsWritten <= pDataCols->numOfRows);
|
||||
|
||||
tdPopDataColsPoints(pDataCols, rowsWritten);
|
||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
|
||||
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(pDataCols->numOfRows == 0);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
|
@ -615,7 +636,7 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
@ -645,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (!tSkipListIterNext(iters[i].pIter)) {
|
||||
terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM;
|
||||
goto _err;
|
||||
}
|
||||
tSkipListIterNext(iters[i].pIter);
|
||||
}
|
||||
}
|
||||
|
||||
return iters;
|
||||
|
||||
_err:
|
||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
|
||||
if (iters == NULL) return;
|
||||
|
||||
for (int i = 1; i < maxTables; i++) {
|
||||
|
@ -670,35 +688,4 @@ static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
|||
}
|
||||
|
||||
free(iters);
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||
ASSERT(maxRowsToRead > 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
|
||||
int numOfRows = 0;
|
||||
|
||||
do {
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
|
||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) break;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
// TODO: deal with the error here
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
numOfRows++;
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
return numOfRows;
|
||||
}
|
|
@ -727,7 +727,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
|
|||
|
||||
T_REF_INC(pTable);
|
||||
|
||||
tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
TABLE_UID(pTable));
|
||||
|
||||
return pTable;
|
||||
|
@ -740,7 +740,7 @@ _err:
|
|||
static void tsdbFreeTable(STable *pTable) {
|
||||
if (pTable) {
|
||||
if (pTable->name != NULL)
|
||||
tsdbDebug("table %s tid %d uid %" PRIu64 " is destroyed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
tsdbTrace("table %s tid %d uid %" PRIu64 " is freed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
TABLE_UID(pTable));
|
||||
tfree(TABLE_NAME(pTable));
|
||||
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -294,7 +294,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
|
|||
}
|
||||
} else { // old data exists, update the node
|
||||
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
|
||||
uDebug("cache:%s, key:%p, %p exist in cache, updated", pCacheObj->name, key, pNode->data);
|
||||
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode->data, pOld);
|
||||
}
|
||||
|
||||
__cache_unlock(pCacheObj);
|
||||
|
@ -307,26 +307,30 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *pData = NULL;
|
||||
|
||||
__cache_rd_lock(pCacheObj);
|
||||
|
||||
|
||||
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
|
||||
|
||||
int32_t ref = 0;
|
||||
if (ptNode != NULL) {
|
||||
ref = T_REF_INC(*ptNode);
|
||||
pData = (*ptNode)->data;
|
||||
}
|
||||
|
||||
__cache_unlock(pCacheObj);
|
||||
|
||||
if (ptNode != NULL) {
|
||||
|
||||
if (pData != NULL) {
|
||||
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
||||
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, (*ptNode)->data, ref);
|
||||
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref);
|
||||
} else {
|
||||
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
||||
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
|
||||
}
|
||||
|
||||
|
||||
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
|
||||
return (ptNode != NULL) ? (*ptNode)->data : NULL;
|
||||
return pData;
|
||||
}
|
||||
|
||||
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
|
||||
|
@ -453,21 +457,20 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
} else {
|
||||
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1);
|
||||
|
||||
__cache_wr_lock(pCacheObj);
|
||||
|
||||
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
|
||||
int32_t ref = T_REF_DEC(pNode);
|
||||
|
||||
if (inTrashCan) {
|
||||
if (inTrashCan && (ref == 0)) {
|
||||
// Remove it if the ref count is 0.
|
||||
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
|
||||
// the node is in trashcan.
|
||||
if (ref == 0) {
|
||||
__cache_wr_lock(pCacheObj);
|
||||
assert(pNode->pTNodeHeader->pData == pNode);
|
||||
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
|
||||
__cache_unlock(pCacheObj);
|
||||
}
|
||||
|
||||
assert(pNode->pTNodeHeader->pData == pNode);
|
||||
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
|
||||
}
|
||||
|
||||
__cache_unlock(pCacheObj);
|
||||
}
|
||||
|
||||
// else {
|
||||
|
|
|
@ -69,6 +69,7 @@ int32_t vnodeInitResources() {
|
|||
}
|
||||
|
||||
void vnodeCleanupResources() {
|
||||
|
||||
if (tsDnodeVnodesHash != NULL) {
|
||||
taosHashCleanup(tsDnodeVnodesHash);
|
||||
tsDnodeVnodesHash = NULL;
|
||||
|
@ -137,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
return TSDB_CODE_VND_INIT_FAILED;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel);
|
||||
vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod);
|
||||
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
|
||||
|
||||
return code;
|
||||
|
@ -618,6 +619,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
|
||||
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnodeCfg->cfg.fsyncPeriod);
|
||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
|
||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
|
||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
|
||||
|
@ -782,6 +784,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
|
||||
|
||||
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
|
||||
if (!walLevel || walLevel->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, fsyncPeriod not found", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
|
||||
|
||||
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
||||
if (!wals || wals->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId);
|
||||
|
|
|
@ -108,9 +108,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
if (code == TSDB_CODE_SUCCESS) {
|
||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
||||
if (handle == NULL) { // failed to register qhandle
|
||||
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||
tstrerror(pRsp->code));
|
||||
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void*) pQInfo, tstrerror(pRsp->code));
|
||||
} else {
|
||||
assert(*handle == pQInfo);
|
||||
pRsp->qhandle = htobe64((uint64_t) pQInfo);
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "tlog.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tutil.h"
|
||||
#include "ttimer.h"
|
||||
#include "taoserror.h"
|
||||
#include "twal.h"
|
||||
#include "tqueue.h"
|
||||
|
@ -44,6 +45,9 @@ typedef struct {
|
|||
int fd;
|
||||
int keep;
|
||||
int level;
|
||||
int32_t fsyncPeriod;
|
||||
void *timer;
|
||||
void *signature;
|
||||
int max; // maximum number of wal files
|
||||
uint32_t id; // increase continuously
|
||||
int num; // number of wal files
|
||||
|
@ -52,10 +56,23 @@ typedef struct {
|
|||
pthread_mutex_t mutex;
|
||||
} SWal;
|
||||
|
||||
static void *walTmrCtrl = NULL;
|
||||
static int tsWalNum = 0;
|
||||
static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT;
|
||||
static uint32_t walSignature = 0xFAFBFDFE;
|
||||
static int walHandleExistingFiles(const char *path);
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
|
||||
static int walRemoveWalFiles(const char *path);
|
||||
static int walHandleExistingFiles(const char *path);
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
|
||||
static int walRemoveWalFiles(const char *path);
|
||||
static void walProcessFsyncTimer(void *param, void *tmrId);
|
||||
static void walRelease(SWal *pWal);
|
||||
|
||||
static void walModuleInitFunc() {
|
||||
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
|
||||
if (walTmrCtrl == NULL)
|
||||
walModuleInit = PTHREAD_ONCE_INIT;
|
||||
else
|
||||
wDebug("WAL module is initialized");
|
||||
}
|
||||
|
||||
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||
|
@ -64,20 +81,38 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pthread_once(&walModuleInit, walModuleInitFunc);
|
||||
if (walTmrCtrl == NULL) {
|
||||
free(pWal);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&tsWalNum, 1);
|
||||
pWal->fd = -1;
|
||||
pWal->max = pCfg->wals;
|
||||
pWal->id = 0;
|
||||
pWal->num = 0;
|
||||
pWal->level = pCfg->walLevel;
|
||||
pWal->keep = pCfg->keep;
|
||||
pWal->fsyncPeriod = pCfg->fsyncPeriod;
|
||||
pWal->signature = pWal;
|
||||
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||
pthread_mutex_init(&pWal->mutex, NULL);
|
||||
|
||||
if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) {
|
||||
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
|
||||
if (pWal->timer == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
walRelease(pWal);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (tmkdir(path, 0755) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
free(pWal);
|
||||
walRelease(pWal);
|
||||
pWal = NULL;
|
||||
}
|
||||
|
||||
|
@ -89,12 +124,11 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
|||
if (pWal && pWal->fd <0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("wal:%s, failed to open(%s)", path, strerror(errno));
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
free(pWal);
|
||||
walRelease(pWal);
|
||||
pWal = NULL;
|
||||
}
|
||||
|
||||
if (pWal) wDebug("wal:%s, it is open, level:%d", path, pWal->level);
|
||||
if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
|
||||
return pWal;
|
||||
}
|
||||
|
||||
|
@ -102,7 +136,8 @@ void walClose(void *handle) {
|
|||
if (handle == NULL) return;
|
||||
|
||||
SWal *pWal = handle;
|
||||
close(pWal->fd);
|
||||
tclose(pWal->fd);
|
||||
if (pWal->timer) taosTmrStopA(&pWal->timer);
|
||||
|
||||
if (pWal->keep == 0) {
|
||||
// remove all files in the directory
|
||||
|
@ -118,9 +153,7 @@ void walClose(void *handle) {
|
|||
wDebug("wal:%s, it is closed and kept", pWal->name);
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
|
||||
free(pWal);
|
||||
walRelease(pWal);
|
||||
}
|
||||
|
||||
int walRenew(void *handle) {
|
||||
|
@ -194,9 +227,9 @@ int walWrite(void *handle, SWalHead *pHead) {
|
|||
void walFsync(void *handle) {
|
||||
|
||||
SWal *pWal = handle;
|
||||
if (pWal == NULL) return;
|
||||
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
|
||||
|
||||
if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) {
|
||||
if (pWal->fsyncPeriod == 0) {
|
||||
if (fsync(pWal->fd) < 0) {
|
||||
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
|
||||
}
|
||||
|
@ -303,6 +336,20 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void walRelease(SWal *pWal) {
|
||||
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
pWal->signature = NULL;
|
||||
free(pWal);
|
||||
|
||||
if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) {
|
||||
if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl);
|
||||
walTmrCtrl = NULL;
|
||||
walModuleInit = PTHREAD_ONCE_INIT;
|
||||
wDebug("WAL module is cleaned up");
|
||||
}
|
||||
}
|
||||
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
||||
char *name = pWal->name;
|
||||
|
||||
|
@ -433,3 +480,15 @@ static int walRemoveWalFiles(const char *path) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
static void walProcessFsyncTimer(void *param, void *tmrId) {
|
||||
SWal *pWal = param;
|
||||
|
||||
if (pWal->signature != pWal) return;
|
||||
if (pWal->fd < 0) return;
|
||||
|
||||
if (fsync(pWal->fd) < 0) {
|
||||
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
|
||||
}
|
||||
|
||||
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ print =============== step2 - no db
|
|||
#11
|
||||
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:6020/rest/sql
|
||||
print 11-> $system_content
|
||||
if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","comp","precision","status"],"data":[],"rows":0}@ then
|
||||
if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","precision","status"],"data":[],"rows":0}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -111,29 +111,29 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
|
|||
echo "dataDir $DATA_DIR" >> $TAOS_CFG
|
||||
echo "logDir $LOG_DIR" >> $TAOS_CFG
|
||||
echo "debugFlag 131" >> $TAOS_CFG
|
||||
echo "mDebugFlag 135" >> $TAOS_CFG
|
||||
echo "sdbDebugFlag 135" >> $TAOS_CFG
|
||||
echo "dDebugFlag 135" >> $TAOS_CFG
|
||||
echo "vDebugFlag 135" >> $TAOS_CFG
|
||||
echo "tsdbDebugFlag 135" >> $TAOS_CFG
|
||||
echo "mDebugFlag 131" >> $TAOS_CFG
|
||||
echo "sdbDebugFlag 131" >> $TAOS_CFG
|
||||
echo "dDebugFlag 131" >> $TAOS_CFG
|
||||
echo "vDebugFlag 131" >> $TAOS_CFG
|
||||
echo "tsdbDebugFlag 131" >> $TAOS_CFG
|
||||
echo "cDebugFlag 135" >> $TAOS_CFG
|
||||
echo "jnidebugFlag 135" >> $TAOS_CFG
|
||||
echo "odbcdebugFlag 135" >> $TAOS_CFG
|
||||
echo "httpDebugFlag 135" >> $TAOS_CFG
|
||||
echo "jnidebugFlag 131" >> $TAOS_CFG
|
||||
echo "odbcdebugFlag 131" >> $TAOS_CFG
|
||||
echo "httpDebugFlag 143" >> $TAOS_CFG
|
||||
echo "monitorDebugFlag 131" >> $TAOS_CFG
|
||||
echo "mqttDebugFlag 131" >> $TAOS_CFG
|
||||
echo "qdebugFlag 135" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 135" >> $TAOS_CFG
|
||||
echo "qdebugFlag 131" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 131" >> $TAOS_CFG
|
||||
echo "tmrDebugFlag 131" >> $TAOS_CFG
|
||||
echo "udebugFlag 135" >> $TAOS_CFG
|
||||
echo "sdebugFlag 135" >> $TAOS_CFG
|
||||
echo "wdebugFlag 135" >> $TAOS_CFG
|
||||
echo "udebugFlag 131" >> $TAOS_CFG
|
||||
echo "sdebugFlag 131" >> $TAOS_CFG
|
||||
echo "wdebugFlag 131" >> $TAOS_CFG
|
||||
echo "monitor 0" >> $TAOS_CFG
|
||||
echo "monitorInterval 1" >> $TAOS_CFG
|
||||
echo "http 0" >> $TAOS_CFG
|
||||
echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
|
||||
echo "defaultPass taosdata" >> $TAOS_CFG
|
||||
echo "numOfLogLines 10000000" >> $TAOS_CFG
|
||||
echo "numOfLogLines 20000000" >> $TAOS_CFG
|
||||
echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG
|
||||
echo "clog 2" >> $TAOS_CFG
|
||||
echo "statusInterval 1" >> $TAOS_CFG
|
||||
|
@ -145,6 +145,6 @@ echo "tableIncStepPerVnode 10000" >> $TAOS_CFG
|
|||
echo "asyncLog 0" >> $TAOS_CFG
|
||||
echo "numOfMnodes 1" >> $TAOS_CFG
|
||||
echo "locale en_US.UTF-8" >> $TAOS_CFG
|
||||
echo "anyIp 0" >> $TAOS_CFG
|
||||
echo "fsync 0" >> $TAOS_CFG
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue