merge 3.0
This commit is contained in:
commit
95b14ee39b
|
@ -316,7 +316,7 @@ def pre_test_build_win() {
|
|||
python -m pip uninstall taospy -y
|
||||
python -m pip install taospy==2.7.12
|
||||
python -m pip uninstall taos-ws-py -y
|
||||
python -m pip install taos-ws-py==0.2.9
|
||||
python -m pip install taos-ws-py==0.3.1
|
||||
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
|
||||
'''
|
||||
return 1
|
||||
|
|
|
@ -11,7 +11,7 @@ ExternalProject_Add(curl2
|
|||
BUILD_IN_SOURCE TRUE
|
||||
BUILD_ALWAYS 1
|
||||
UPDATE_COMMAND ""
|
||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 #--enable-debug
|
||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 #--enable-debug
|
||||
BUILD_COMMAND make -j
|
||||
INSTALL_COMMAND make install
|
||||
TEST_COMMAND ""
|
||||
|
|
|
@ -8,7 +8,7 @@ ExternalProject_Add(openssl
|
|||
BUILD_IN_SOURCE TRUE
|
||||
#BUILD_ALWAYS 1
|
||||
#UPDATE_COMMAND ""
|
||||
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 -static #--no-shared
|
||||
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
|
||||
BUILD_COMMAND make -j
|
||||
INSTALL_COMMAND make install_sw -j
|
||||
TEST_COMMAND ""
|
||||
|
|
|
@ -44,10 +44,6 @@ extern int32_t tsNumOfSupportVnodes;
|
|||
extern int32_t tsMaxShellConns;
|
||||
extern int32_t tsShellActivityTimer;
|
||||
extern int32_t tsCompressMsgSize;
|
||||
extern int32_t tsCompressColData;
|
||||
extern int32_t tsMaxNumOfDistinctResults;
|
||||
extern int32_t tsCompatibleModel;
|
||||
extern bool tsPrintAuth;
|
||||
extern int64_t tsTickPerMin[3];
|
||||
extern int64_t tsTickPerHour[3];
|
||||
extern int32_t tsCountAlwaysReturnValue;
|
||||
|
@ -109,8 +105,6 @@ extern bool tsMonitorComp;
|
|||
|
||||
// audit
|
||||
extern bool tsEnableAudit;
|
||||
extern char tsAuditFqdn[];
|
||||
extern uint16_t tsAuditPort;
|
||||
extern bool tsEnableAuditCreateTable;
|
||||
|
||||
// telem
|
||||
|
|
|
@ -57,11 +57,28 @@ typedef enum {
|
|||
CFG_SCOPE_BOTH
|
||||
} ECfgScopeType;
|
||||
|
||||
typedef enum {
|
||||
CFG_DYN_NONE = 0,
|
||||
CFG_DYN_SERVER = 1,
|
||||
CFG_DYN_CLIENT = 2,
|
||||
CFG_DYN_BOTH = 3,
|
||||
#ifdef TD_ENTERPRISE
|
||||
CFG_DYN_ENT_SERVER = CFG_DYN_SERVER,
|
||||
CFG_DYN_ENT_CLIENT = CFG_DYN_CLIENT,
|
||||
CFG_DYN_ENT_BOTH = CFG_DYN_BOTH,
|
||||
#else
|
||||
CFG_DYN_ENT_SERVER = CFG_DYN_NONE,
|
||||
CFG_DYN_ENT_CLIENT = CFG_DYN_NONE,
|
||||
CFG_DYN_ENT_BOTH = CFG_DYN_NONE,
|
||||
#endif
|
||||
} ECfgDynType;
|
||||
|
||||
typedef struct SConfigItem {
|
||||
ECfgSrcType stype;
|
||||
ECfgDataType dtype;
|
||||
int8_t scope;
|
||||
char *name;
|
||||
int8_t dynScope;
|
||||
char *name;
|
||||
union {
|
||||
bool bval;
|
||||
float fval;
|
||||
|
@ -99,15 +116,20 @@ int32_t cfgGetSize(SConfig *pCfg);
|
|||
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name);
|
||||
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype);
|
||||
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope);
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope);
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope);
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope);
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
|
||||
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope);
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope);
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope,
|
||||
int8_t dynScope);
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
|
||||
const char *cfgStypeStr(ECfgSrcType type);
|
||||
const char *cfgDtypeStr(ECfgDataType type);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -79,7 +79,7 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
|
|||
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue);
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
|
||||
|
||||
#ifdef _GRANT
|
||||
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
|
||||
|
@ -1182,15 +1182,72 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
const char *options[] = {
|
||||
"debugFlag", "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
|
||||
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag",
|
||||
"uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag", "stDebugFlag",
|
||||
};
|
||||
int32_t optionSize = tListLen(options);
|
||||
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
|
||||
terrno = 0;
|
||||
char *p = pMCfgReq->config;
|
||||
while (*p) {
|
||||
if (*p == ' ') {
|
||||
break;
|
||||
}
|
||||
p++;
|
||||
}
|
||||
|
||||
size_t optLen = p - pMCfgReq->config;
|
||||
strncpy(pDCfgReq->config, pMCfgReq->config, optLen);
|
||||
pDCfgReq->config[optLen] = 0;
|
||||
|
||||
if (' ' == pMCfgReq->config[optLen]) {
|
||||
// 'key value'
|
||||
if (strlen(pMCfgReq->value) != 0) goto _err;
|
||||
strcpy(pDCfgReq->value, p + 1);
|
||||
} else {
|
||||
// 'key' 'value'
|
||||
if (strlen(pMCfgReq->value) == 0) goto _err;
|
||||
strcpy(pDCfgReq->value, pMCfgReq->value);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
|
||||
SEpSet epSet = mndGetDnodeEpset(pDnode);
|
||||
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
|
||||
if (pBuf != NULL) {
|
||||
tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq);
|
||||
mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
|
||||
tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
if (code == -1) {
|
||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SMCfgDnodeReq cfgReq = {0};
|
||||
if (tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
|
@ -1206,139 +1263,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
SDCfgDnodeReq dcfgReq = {0};
|
||||
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
|
||||
strcpy(dcfgReq.config, "resetlog");
|
||||
} else if (strncasecmp(cfgReq.config, "monitor", 7) == 0) {
|
||||
if (' ' != cfgReq.config[7] && 0 != cfgReq.config[7]) {
|
||||
mError("dnode:%d, failed to config monitor since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char *value = cfgReq.value;
|
||||
int32_t flag = atoi(value);
|
||||
if (flag <= 0) {
|
||||
flag = atoi(cfgReq.config + 8);
|
||||
}
|
||||
if (flag < 0 || flag > 2) {
|
||||
mError("dnode:%d, failed to config monitor since value:%d", cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "monitor");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||
int32_t optLen = strlen("s3blocksize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag > 1024 * 1024) {
|
||||
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
|
||||
flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3blocksize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) {
|
||||
int32_t optLen = strlen("s3blockcachesize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 4 || flag > 1024 * 1024) {
|
||||
mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]",
|
||||
cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3blockcachesize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3pagecachesize", 16) == 0) {
|
||||
int32_t optLen = strlen("s3pagecachesize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 4 || flag > 1024 * 1024 * 1024) {
|
||||
mError("dnode:%d, failed to config s3PageCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
|
||||
flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3pagecachesize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) {
|
||||
int32_t optLen = strlen("s3uploaddelaysec");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 600 || flag > 60 * 60 * 24 * 30) {
|
||||
mError("dnode:%d, failed to config s3UploadDelaySec since value:%d. Valid range: [600, 60 * 60 * 24 * 30]",
|
||||
cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3uploaddelaysec");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
|
||||
int32_t optLen = strlen("ttlpushinterval");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 0 || flag > 100000) {
|
||||
mError("dnode:%d, failed to config ttlPushInterval since value:%d. Valid range: [0, 100000]", cfgReq.dnodeId,
|
||||
flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "ttlpushinterval");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "ttlbatchdropnum", 15) == 0) {
|
||||
int32_t optLen = strlen("ttlbatchdropnum");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 0) {
|
||||
mError("dnode:%d, failed to config ttlBatchDropNum since value:%d. Valid range: [0, %d]", cfgReq.dnodeId, flag,
|
||||
INT32_MAX);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "ttlbatchdropnum");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "asynclog", 8) == 0) {
|
||||
int32_t optLen = strlen("asynclog");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 0 || flag > 1) {
|
||||
mError("dnode:%d, failed to config asynclog since value:%d. Valid range: [0, 1]", cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "asynclog");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
#ifdef TD_ENTERPRISE
|
||||
} else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) {
|
||||
int32_t optLen = strlen("supportvnodes");
|
||||
|
@ -1348,9 +1272,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
|
||||
if (flag < 0 || flag > 4096) {
|
||||
mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _err_out;
|
||||
}
|
||||
if (flag == 0) {
|
||||
flag = tsNumOfCores * 2;
|
||||
|
@ -1365,8 +1288,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
|
||||
mError("dnode:%d, failed to config activeCode since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
goto _err_out;
|
||||
}
|
||||
int32_t vlen = strlen(cfgReq.value);
|
||||
if (vlen > 0 && ((opt == DND_ACTIVE_CODE && vlen != (TSDB_ACTIVE_KEY_LEN - 1)) ||
|
||||
|
@ -1374,9 +1296,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
(vlen > (TSDB_CONN_ACTIVE_KEY_LEN - 1) || vlen < (TSDB_ACTIVE_KEY_LEN - 1))))) {
|
||||
mError("dnode:%d, failed to config activeCode since invalid vlen:%d. conf:%s, val:%s", cfgReq.dnodeId, vlen,
|
||||
cfgReq.config, cfgReq.value);
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
goto _err_out;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? "activeCode" : "cActiveCode");
|
||||
|
@ -1384,88 +1305,37 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
|
||||
if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
|
||||
mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
goto _err_out;
|
||||
}
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return 0;
|
||||
#endif
|
||||
} else {
|
||||
bool findOpt = false;
|
||||
for (int32_t d = 0; d < optionSize; ++d) {
|
||||
const char *optName = options[d];
|
||||
int32_t optLen = strlen(optName);
|
||||
if (strncasecmp(cfgReq.config, optName, optLen) != 0) continue;
|
||||
|
||||
if (' ' != cfgReq.config[optLen] && 0 != cfgReq.config[optLen]) {
|
||||
mError("dnode:%d, failed to config since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char *value = cfgReq.value;
|
||||
int32_t flag = atoi(value);
|
||||
if (flag <= 0) {
|
||||
flag = atoi(cfgReq.config + optLen + 1);
|
||||
}
|
||||
if (flag < 0 || flag > 255) {
|
||||
mError("dnode:%d, failed to config %s since value:%d", cfgReq.dnodeId, optName, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tstrncpy(dcfgReq.config, optName, optLen + 1);
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
findOpt = true;
|
||||
}
|
||||
|
||||
if (!findOpt) {
|
||||
mndMCfg2DCfg(&cfgReq, &dcfgReq);
|
||||
if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
|
||||
mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
mError("dnode:%d, failed to config since %s", cfgReq.dnodeId, terrstr());
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
goto _err_out;
|
||||
}
|
||||
|
||||
if (cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true) != 0) goto _err_out;
|
||||
}
|
||||
|
||||
char obj[50] = {0};
|
||||
sprintf(obj, "%d", cfgReq.dnodeId);
|
||||
{ // audit
|
||||
char obj[50] = {0};
|
||||
sprintf(obj, "%d", cfgReq.dnodeId);
|
||||
|
||||
auditRecord(pReq, pMnode->clusterId, "alterDnode", "", obj, cfgReq.sql, cfgReq.sqlLen);
|
||||
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
|
||||
}
|
||||
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
return mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
|
||||
|
||||
if (pDnode->id == cfgReq.dnodeId || cfgReq.dnodeId == -1 || cfgReq.dnodeId == 0) {
|
||||
SEpSet epSet = mndGetDnodeEpset(pDnode);
|
||||
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
|
||||
if (pBuf != NULL) {
|
||||
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
|
||||
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
|
||||
dcfgReq.config, dcfgReq.value);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
|
||||
tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
if (code == -1) {
|
||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
return code;
|
||||
_err_out:
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
|
||||
|
@ -1606,16 +1476,16 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
|
|||
}
|
||||
|
||||
// get int32_t value from 'SMCfgDnodeReq'
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_t *pOutValue) {
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
|
||||
terrno = 0;
|
||||
if (' ' != pMCfgReq->config[opLen] && 0 != pMCfgReq->config[opLen]) {
|
||||
if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (' ' == pMCfgReq->config[opLen]) {
|
||||
if (' ' == pMCfgReq->config[optLen]) {
|
||||
// 'key value'
|
||||
if (strlen(pMCfgReq->value) != 0) goto _err;
|
||||
*pOutValue = atoi(pMCfgReq->config + opLen + 1);
|
||||
*pOutValue = atoi(pMCfgReq->config + optLen + 1);
|
||||
} else {
|
||||
// 'key' 'value'
|
||||
if (strlen(pMCfgReq->value) == 0) goto _err;
|
||||
|
|
|
@ -174,8 +174,8 @@ if(${BUILD_WITH_S3})
|
|||
find_library(S3_LIBRARY s3)
|
||||
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
|
||||
find_library(XML2_LIBRARY xml2)
|
||||
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
|
||||
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
|
||||
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
|
||||
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
|
||||
target_link_libraries(
|
||||
vnode
|
||||
|
||||
|
|
|
@ -144,7 +144,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
p->type = type;
|
||||
p->pVnode = pVnode;
|
||||
p->pTsdb = p->pVnode->pTsdb;
|
||||
p->info.verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
|
||||
p->info.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
|
||||
p->info.suid = suid;
|
||||
p->numOfCols = numOfCols;
|
||||
p->pCidList = pCidList;
|
||||
|
|
|
@ -112,7 +112,6 @@ extern "C" {
|
|||
#define COMMAND_CATALOG_DEBUG "catalogDebug"
|
||||
#define COMMAND_ENABLE_MEM_DEBUG "enableMemDebug"
|
||||
#define COMMAND_DISABLE_MEM_DEBUG "disableMemDebug"
|
||||
#define COMMAND_ASYNCLOG "asynclog"
|
||||
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
|
|
|
@ -808,16 +808,6 @@ static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
|
|||
return code;
|
||||
}
|
||||
qInfo("memory dbg disabled");
|
||||
} else if (0 == strcasecmp(cmd, COMMAND_ASYNCLOG)) {
|
||||
int newAsyncLogValue = (strlen(value) == 0) ? 1 : atoi(value);
|
||||
if (newAsyncLogValue != 0 && newAsyncLogValue != 1) {
|
||||
code = TSDB_CODE_INVALID_CFG_VALUE;
|
||||
qError("failed to alter asynclog, error:%s", tstrerror(code));
|
||||
goto _return;
|
||||
}
|
||||
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
tsAsyncLog = newAsyncLogValue;
|
||||
} else {
|
||||
goto _return;
|
||||
}
|
||||
|
@ -844,10 +834,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
bool forbidden = false;
|
||||
taosLocalCfgForbiddenToChange(pStmt->config, &forbidden);
|
||||
if (forbidden) {
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
if (cfgCheckRangeForDynUpdate(tsCfg, pStmt->config, pStmt->value, false)) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -580,6 +580,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
bool clearState;
|
||||
bool recvGetAll;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
|
@ -603,6 +604,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
SArray* historyWins;
|
||||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
bool recvGetAll;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamPartitionOperatorInfo {
|
||||
|
|
|
@ -373,8 +373,11 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
|||
if (pGroupResInfo->freeItem) {
|
||||
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
|
||||
for (int32_t i = pGroupResInfo->index; i < size; i++) {
|
||||
void* pVal = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
taosMemoryFree(pVal);
|
||||
SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
if (!pPos->needFree && !pPos->pRowBuff) {
|
||||
taosMemoryFreeClear(pPos->pKey);
|
||||
taosMemoryFree(pPos);
|
||||
}
|
||||
}
|
||||
pGroupResInfo->freeItem = false;
|
||||
}
|
||||
|
@ -2536,6 +2539,15 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
taosMemoryFree(buf);
|
||||
}
|
||||
|
||||
static void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
|
||||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
|
||||
SResultWindowInfo* pResInfo = pIte;
|
||||
pResInfo->pStatePos->beUsed = true;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -2550,6 +2562,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
if (opRes) {
|
||||
return opRes;
|
||||
}
|
||||
|
||||
if (pInfo->recvGetAll) {
|
||||
pInfo->recvGetAll = false;
|
||||
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2587,6 +2605,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
pInfo->recvGetAll = true;
|
||||
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -2843,6 +2862,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->clearState = false;
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
|
@ -3460,6 +3481,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvGetAll) {
|
||||
pInfo->recvGetAll = false;
|
||||
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -3488,6 +3514,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
pInfo->recvGetAll = true;
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -3718,6 +3745,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
}
|
||||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
|
|
|
@ -1989,7 +1989,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
}
|
||||
if (para2Bytes <= 0 || para2Bytes > 4096) { // cast dst var type length limits to 4096 bytes
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"CAST function converted length should be in range [0, 4096] bytes");
|
||||
"CAST function converted length should be in range (0, 4096] bytes");
|
||||
}
|
||||
|
||||
// add database precision as param
|
||||
|
@ -3312,26 +3312,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = castFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_timestamp",
|
||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||
.translateFunc = translateToTimestamp,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toTimestampFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_char",
|
||||
.type = FUNCTION_TYPE_TO_CHAR,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToChar,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_iso8601",
|
||||
.type = FUNCTION_TYPE_TO_ISO8601,
|
||||
|
@ -3709,6 +3689,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = qVgIdFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_timestamp",
|
||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||
.translateFunc = translateToTimestamp,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toTimestampFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_char",
|
||||
.type = FUNCTION_TYPE_TO_CHAR,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToChar,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -285,7 +285,7 @@ static SNode* createConstantValue() {
|
|||
static int32_t calcConstProjections(SCalcConstContext* pCxt, SSelectStmt* pSelect, bool subquery) {
|
||||
SNode* pProj = NULL;
|
||||
WHERE_EACH(pProj, pSelect->pProjectionList) {
|
||||
if (subquery && !pSelect->isDistinct && isUselessCol((SExprNode*)pProj)) {
|
||||
if (subquery && !pSelect->isDistinct && !pSelect->tagScan && isUselessCol((SExprNode*)pProj)) {
|
||||
ERASE_NODE(pSelect->pProjectionList);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -166,6 +166,7 @@ static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) {
|
|||
}
|
||||
|
||||
static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pDuplicate = false;
|
||||
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
|
@ -173,13 +174,13 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
|
|||
STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName));
|
||||
if (NULL != pMeta) {
|
||||
*pDuplicate = true;
|
||||
int32_t code = ignoreUsingClause(pCxt, &pStmt->pSql);
|
||||
code = ignoreUsingClause(pCxt, &pStmt->pSql);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
return cloneTableMeta(*pMeta, &pStmt->pTableMeta);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
|
@ -1594,11 +1595,9 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
|
|||
|
||||
if (pToken->n > 0) {
|
||||
if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
|
||||
memcpy(ctbName, pToken->z, pToken->n);
|
||||
ctbName[pToken->n] = '\0';
|
||||
memcpy(pStbRowsCxt->ctbName.tname, pToken->z, pToken->n);
|
||||
pStbRowsCxt->ctbName.tname[pToken->n] = '\0';
|
||||
*pFoundCtbName = true;
|
||||
tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname));
|
||||
tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, pToken->n);
|
||||
} else {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "tbname is too long");
|
||||
}
|
||||
|
@ -1644,10 +1643,13 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
SArray* pTagNames = pStbRowsCxt->aTagNames;
|
||||
SArray* pTagVals = pStbRowsCxt->aTagVals;
|
||||
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
|
||||
int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta);
|
||||
int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
|
||||
for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
|
||||
const char* pTmpSql = *ppSql;
|
||||
bool ignoreComma = false;
|
||||
NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
|
||||
|
||||
if (ignoreComma) {
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
|
||||
break;
|
||||
|
@ -1657,12 +1659,11 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pCols->pColIndex[i] < getNumOfColumns(pStbRowsCxt->pStbMeta)) {
|
||||
if (pCols->pColIndex[i] < numOfCols) {
|
||||
const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
|
||||
SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
|
||||
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal);
|
||||
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||
} else if (pCols->pColIndex[i] < tbnameIdx) {
|
||||
const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
|
||||
if (canParseTagsAfter) {
|
||||
tagTokens[(*pNumOfTagTokens)] = *pToken;
|
||||
|
@ -1675,7 +1676,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
}
|
||||
}
|
||||
}
|
||||
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||
else if (pCols->pColIndex[i] == tbnameIdx) {
|
||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, bFoundTbName);
|
||||
}
|
||||
|
@ -1756,7 +1757,7 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
|||
.requestId = pCxt->pComCxt->requestId,
|
||||
.requestObjRefId = pCxt->pComCxt->requestRid,
|
||||
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
|
||||
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
|
||||
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStbRowsCxt->ctbName, &vg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
|
||||
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
|
||||
|
@ -1771,12 +1772,6 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
|||
return code;
|
||||
}
|
||||
|
||||
static void resetStbRowsDataContextPreStbRow(SStbRowsDataContext* pStbRowsCxt) {
|
||||
pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
|
||||
pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
|
||||
|
||||
insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
|
||||
}
|
||||
|
||||
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
||||
if (pStbRowsCxt == NULL) return;
|
||||
|
@ -1791,19 +1786,15 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
|||
taosArrayClear(pStbRowsCxt->aTagVals);
|
||||
|
||||
clearColValArray(pStbRowsCxt->aColVals);
|
||||
taosArrayClear(pStbRowsCxt->aColVals);
|
||||
|
||||
tTagFree(pStbRowsCxt->pTag);
|
||||
pStbRowsCxt->pTag = NULL;
|
||||
pStbRowsCxt->pCtbMeta->uid = 0;
|
||||
pStbRowsCxt->pCtbMeta->vgId = 0;
|
||||
tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
|
||||
taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
|
||||
}
|
||||
|
||||
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
|
||||
resetStbRowsDataContextPreStbRow(pStbRowsCxt);
|
||||
bool bFirstTable = false;
|
||||
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
|
||||
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
|
||||
|
@ -1825,6 +1816,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
|||
insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
|
||||
}
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
*pGotRow = true;
|
||||
}
|
||||
|
@ -2100,6 +2092,10 @@ static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDa
|
|||
collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
|
||||
collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
|
||||
|
||||
pStbRowsCxt->ctbName.type = TSDB_TABLE_NAME_T;
|
||||
pStbRowsCxt->ctbName.acctId = pStbRowsCxt->stbName.acctId;
|
||||
memcpy(pStbRowsCxt->ctbName.dbname, pStbRowsCxt->stbName.dbname, sizeof(pStbRowsCxt->stbName.dbname));
|
||||
|
||||
pStbRowsCxt->pTagCond = pStmt->pTagCond;
|
||||
pStbRowsCxt->pStbMeta = pStmt->pTableMeta;
|
||||
|
||||
|
|
|
@ -434,27 +434,31 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
|||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
SOutputData sOutput = {0};
|
||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput);
|
||||
}
|
||||
|
||||
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (rsp) {
|
||||
if (NULL != rsp) {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
dataLen);
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
dataLen);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,8 @@
|
|||
#include "syncReplication.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
|
||||
|
||||
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
|
||||
|
@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
}
|
||||
|
||||
int dataLen = 0;
|
||||
if (snapInfo.data) {
|
||||
SSyncTLV *datHead = snapInfo.data;
|
||||
void *pData = snapInfo.data;
|
||||
int32_t type = 0;
|
||||
if (pData) {
|
||||
type = snapInfo.type;
|
||||
SSyncTLV *datHead = pData;
|
||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
|
||||
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
|
@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
dataLen = sizeof(SSyncTLV) + datHead->len;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
||||
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
|
||||
goto _out;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->term;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = pSender->seq;
|
||||
|
||||
if (dataLen > 0) {
|
||||
pMsg->payloadType = snapInfo.type;
|
||||
memcpy(pMsg->data, snapInfo.data, dataLen);
|
||||
}
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
||||
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
|
||||
|
||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
|
||||
code = 0;
|
||||
_out:
|
||||
if (snapInfo.data) {
|
||||
|
@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
|||
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
||||
}
|
||||
|
||||
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
|
||||
int32_t code = -1;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->term;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = seq;
|
||||
|
||||
if (pBlock != NULL && blockLen > 0) {
|
||||
memcpy(pMsg->data, pBlock, blockLen);
|
||||
}
|
||||
pMsg->payloadType = typ;
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq);
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
_OUT:
|
||||
return code;
|
||||
}
|
||||
|
||||
// when sender receive ack, call this function to send msg from seq
|
||||
// seq = ack + 1, already updated
|
||||
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||
|
@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
|
||||
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
|
||||
|
||||
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = pSender->seq;
|
||||
|
||||
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
|
||||
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
|
||||
}
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
|
||||
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
|
||||
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
|
||||
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
|
@ -336,36 +330,17 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
|
||||
continue;
|
||||
}
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->term;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = pBlk->seq;
|
||||
|
||||
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
|
||||
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
|
||||
}
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
|
||||
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
|
||||
goto _out;
|
||||
}
|
||||
pBlk->sendTimeMs = nowMs;
|
||||
}
|
||||
|
||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
|
||||
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
|
||||
goto _out;
|
||||
}
|
||||
}
|
||||
code = 0;
|
||||
_out:;
|
||||
taosThreadMutexUnlock(&pSndBuf->mutex);
|
||||
|
@ -861,7 +836,7 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen
|
|||
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||
pRspMsg->startTime = pMsg->startTime;
|
||||
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
|
||||
pRspMsg->ack = pMsg->seq;
|
||||
pRspMsg->code = code;
|
||||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||
|
||||
|
@ -893,13 +868,13 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
|
|||
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
|
||||
ppMsg[0] = NULL;
|
||||
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
|
||||
} else {
|
||||
} else if (pMsg->seq < pRcvBuf->start) {
|
||||
syncSnapSendRsp(pReceiver, pMsg, code);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
|
||||
if (pRcvBuf->entries[seq]) {
|
||||
if (pRcvBuf->entries[seq % pRcvBuf->size]) {
|
||||
pRcvBuf->cursor = seq;
|
||||
} else {
|
||||
break;
|
||||
|
|
|
@ -310,6 +310,7 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
|
|||
GRANT_CFG_SET;
|
||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||
if (pItem == NULL) {
|
||||
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -338,6 +339,7 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
|
|||
break;
|
||||
}
|
||||
|
||||
_err_out:
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
return -1;
|
||||
}
|
||||
|
@ -357,6 +359,50 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) {
|
||||
ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT;
|
||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||
if (!pItem || (pItem->dynScope & dynType) == 0) {
|
||||
uError("failed to config:%s, not support", name);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch (pItem->dtype) {
|
||||
case CFG_DTYPE_INT32: {
|
||||
int32_t ival = (int32_t)atoi(pVal);
|
||||
if (ival < pItem->imin || ival > pItem->imax) {
|
||||
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
case CFG_DTYPE_INT64: {
|
||||
int64_t ival = (int64_t)atoll(pVal);
|
||||
if (ival < pItem->imin || ival > pItem->imax) {
|
||||
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
case CFG_DTYPE_FLOAT:
|
||||
case CFG_DTYPE_DOUBLE: {
|
||||
float fval = (float)atof(pVal);
|
||||
if (fval < pItem->fmin || fval > pItem->fmax) {
|
||||
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval,
|
||||
pItem->fmin, pItem->fmax);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
|
||||
pItem->stype = CFG_STYPE_DEFAULT;
|
||||
pItem->name = taosStrdup(name);
|
||||
|
@ -381,43 +427,61 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_BOOL, .bval = defaultVal, .scope = scope};
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_BOOL, .bval = defaultVal, .scope = scope, .dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope) {
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope) {
|
||||
if (defaultVal < minval || defaultVal > maxval) {
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT32, .i32 = defaultVal, .imin = minval, .imax = maxval, .scope = scope};
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT32,
|
||||
.i32 = defaultVal,
|
||||
.imin = minval,
|
||||
.imax = maxval,
|
||||
.scope = scope,
|
||||
.dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope) {
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope) {
|
||||
if (defaultVal < minval || defaultVal > maxval) {
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT64, .i64 = defaultVal, .imin = minval, .imax = maxval, .scope = scope};
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT64,
|
||||
.i64 = defaultVal,
|
||||
.imin = minval,
|
||||
.imax = maxval,
|
||||
.scope = scope,
|
||||
.dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope) {
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope,
|
||||
int8_t dynScope) {
|
||||
if (defaultVal < minval || defaultVal > maxval) {
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_FLOAT, .fval = defaultVal, .fmin = minval, .fmax = maxval, .scope = scope};
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_FLOAT,
|
||||
.fval = defaultVal,
|
||||
.fmin = minval,
|
||||
.fmax = maxval,
|
||||
.scope = scope,
|
||||
.dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope};
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope, .dynScope = dynScope};
|
||||
item.str = taosStrdup(defaultVal);
|
||||
if (item.str == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -426,8 +490,8 @@ int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, in
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_DIR, .scope = scope};
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_DIR, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetDir(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -435,8 +499,8 @@ int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope};
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetLocale(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -444,8 +508,8 @@ int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, in
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope};
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetCharset(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -453,8 +517,8 @@ int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, i
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope};
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetTimezone(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -57,7 +57,6 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t fileNum;
|
||||
int32_t maxLines;
|
||||
int32_t lines;
|
||||
int32_t flag;
|
||||
int32_t openInProgress;
|
||||
|
@ -122,7 +121,7 @@ static void *taosAsyncOutputLog(void *param);
|
|||
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
|
||||
static SLogBuff *taosLogBuffNew(int32_t bufSize);
|
||||
static void taosCloseLogByFd(TdFilePtr pFile);
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum);
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxFileNum);
|
||||
|
||||
static FORCE_INLINE void taosUpdateDaylight() {
|
||||
struct tm Tm, *ptm;
|
||||
|
@ -186,7 +185,7 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) {
|
|||
|
||||
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
|
||||
if (tsLogObj.logHandle == NULL) return -1;
|
||||
if (taosOpenLogFile(fullName, tsNumOfLogLines, maxFiles) < 0) return -1;
|
||||
if (taosOpenLogFile(fullName, maxFiles) < 0) return -1;
|
||||
|
||||
if (taosInitSlowLog() < 0) return -1;
|
||||
if (taosStartLog() < 0) return -1;
|
||||
|
@ -283,7 +282,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
|||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
tsLogObj.openInProgress = 0;
|
||||
tsLogObj.lines = tsLogObj.maxLines - 1000;
|
||||
tsLogObj.lines = tsNumOfLogLines - 1000;
|
||||
uError("open new log file fail! reason:%s, reuse lastlog", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
@ -308,7 +307,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
|||
static int32_t taosOpenNewLogFile() {
|
||||
taosThreadMutexLock(&tsLogObj.logMutex);
|
||||
|
||||
if (tsLogObj.lines > tsLogObj.maxLines && tsLogObj.openInProgress == 0) {
|
||||
if (tsLogObj.lines > tsNumOfLogLines && tsLogObj.openInProgress == 0) {
|
||||
tsLogObj.openInProgress = 1;
|
||||
|
||||
uInfo("open new log file ......");
|
||||
|
@ -331,7 +330,7 @@ void taosResetLog() {
|
|||
sprintf(lastName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
|
||||
|
||||
// force create a new log file
|
||||
tsLogObj.lines = tsLogObj.maxLines + 10;
|
||||
tsLogObj.lines = tsNumOfLogLines + 10;
|
||||
|
||||
taosOpenNewLogFile();
|
||||
(void)taosRemoveFile(lastName);
|
||||
|
@ -384,7 +383,7 @@ static void taosGetLogFileName(char *fn) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxFileNum) {
|
||||
#ifdef WINDOWS_STASH
|
||||
/*
|
||||
* always set maxFileNum to 1
|
||||
|
@ -396,7 +395,6 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
|||
char name[LOG_FILE_NAME_LEN + 50] = "\0";
|
||||
int32_t logstat0_mtime, logstat1_mtime;
|
||||
|
||||
tsLogObj.maxLines = maxLines;
|
||||
tsLogObj.fileNum = maxFileNum;
|
||||
taosGetLogFileName(fn);
|
||||
|
||||
|
@ -497,9 +495,9 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
|
|||
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
|
||||
}
|
||||
|
||||
if (tsLogObj.maxLines > 0) {
|
||||
if (tsNumOfLogLines > 0) {
|
||||
atomic_add_fetch_32(&tsLogObj.lines, 1);
|
||||
if ((tsLogObj.lines > tsLogObj.maxLines) && (tsLogObj.openInProgress == 0)) {
|
||||
if ((tsLogObj.lines > tsNumOfLogLines) && (tsLogObj.openInProgress == 0)) {
|
||||
taosOpenNewLogFile();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,12 +54,12 @@ TEST_F(CfgTest, 02_Basic) {
|
|||
SConfig *pConfig = cfgInit();
|
||||
ASSERT_NE(pConfig, nullptr);
|
||||
|
||||
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0), 0);
|
||||
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0), 0);
|
||||
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0), 0);
|
||||
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0), 0);
|
||||
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0, 0), 0);
|
||||
|
||||
EXPECT_EQ(cfgGetSize(pConfig), 6);
|
||||
|
||||
|
|
|
@ -81,10 +81,10 @@ pip3 list|grep taospy
|
|||
pip3 uninstall taospy -y
|
||||
pip3 install --default-timeout=120 taospy==2.7.12
|
||||
|
||||
#define taos-ws-py 0.2.8
|
||||
#define taos-ws-py 0.3.1
|
||||
pip3 list|grep taos-ws-py
|
||||
pip3 uninstall taos-ws-py -y
|
||||
pip3 install --default-timeout=120 taos-ws-py==0.2.8
|
||||
pip3 install --default-timeout=600 taos-ws-py==0.3.1
|
||||
|
||||
$TIMEOUT_CMD $cmd
|
||||
RET=$?
|
||||
|
|
|
@ -21,9 +21,16 @@
|
|||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/time.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "taos.h" // TAOS header file
|
||||
|
||||
static int64_t currTimeInUs() {
|
||||
struct timeval start_time;
|
||||
gettimeofday(&start_time, NULL);
|
||||
return (start_time.tv_sec) * 1000000 + (start_time.tv_usec);
|
||||
}
|
||||
|
||||
static void executeSql(TAOS *taos, char *command) {
|
||||
int i;
|
||||
TAOS_RES *pSql = NULL;
|
||||
|
@ -52,7 +59,7 @@ static void executeSql(TAOS *taos, char *command) {
|
|||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
void testInsert(TAOS *taos, char *qstr) {
|
||||
void testInsert(TAOS *taos, char *qstr, double* pElapsedTime) {
|
||||
executeSql(taos, "drop database if exists demo2");
|
||||
executeSql(taos, "create database demo2");
|
||||
executeSql(taos, "use demo2");
|
||||
|
@ -60,8 +67,7 @@ void testInsert(TAOS *taos, char *qstr) {
|
|||
executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10)) tags(t1 int, t2 float, t3 binary(10))");
|
||||
printf("success to create table\n");
|
||||
|
||||
struct timeval start_time;
|
||||
gettimeofday(&start_time, NULL);
|
||||
int64_t ts1 = currTimeInUs();
|
||||
|
||||
for (int tblIdx = 0; tblIdx < 10; ++tblIdx) {
|
||||
int len = 0;
|
||||
|
@ -84,15 +90,16 @@ void testInsert(TAOS *taos, char *qstr) {
|
|||
taos_free_result(result1);
|
||||
}
|
||||
}
|
||||
struct timeval end_time;
|
||||
gettimeofday(&end_time, NULL);
|
||||
double elapsed_time = (double)(end_time.tv_sec - start_time.tv_sec) +
|
||||
(double)(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
|
||||
printf("elapsed time: %.3f\n", elapsed_time);
|
||||
|
||||
int64_t ts2 = currTimeInUs();
|
||||
double elapsedTime = (double)(ts2-ts1) / 1000000.0;
|
||||
*pElapsedTime = elapsedTime;
|
||||
|
||||
printf("elapsed time: %.3f\n", elapsedTime);
|
||||
executeSql(taos, "drop database if exists demo2");
|
||||
}
|
||||
|
||||
void testInsertStb(TAOS *taos, char *qstr) {
|
||||
void testInsertStb(TAOS *taos, char *qstr, double *pElapsedTime) {
|
||||
executeSql(taos, "drop database if exists demo");
|
||||
executeSql(taos, "create database demo");
|
||||
executeSql(taos, "use demo");
|
||||
|
@ -100,8 +107,7 @@ void testInsertStb(TAOS *taos, char *qstr) {
|
|||
executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10)) tags(t1 int, t2 float, t3 binary(10))");
|
||||
printf("success to create table\n");
|
||||
|
||||
struct timeval start_time;
|
||||
gettimeofday(&start_time, NULL);
|
||||
int64_t ts1 = currTimeInUs();
|
||||
|
||||
for (int tblIdx = 0; tblIdx < 10; ++tblIdx) {
|
||||
int len = 0;
|
||||
|
@ -125,12 +131,11 @@ void testInsertStb(TAOS *taos, char *qstr) {
|
|||
taos_free_result(result1);
|
||||
}
|
||||
}
|
||||
struct timeval end_time;
|
||||
gettimeofday(&end_time, NULL);
|
||||
double elapsed_time = (double)(end_time.tv_sec - start_time.tv_sec) +
|
||||
(double)(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
|
||||
|
||||
printf("elapsed time: %.3f\n", elapsed_time);
|
||||
int64_t ts2 = currTimeInUs();
|
||||
double elapsedTime = (double)(ts2 - ts1) / 1000000.0;
|
||||
*pElapsedTime = elapsedTime;
|
||||
printf("elapsed time: %.3f\n", elapsedTime);
|
||||
executeSql(taos, "drop database if exists demo");
|
||||
}
|
||||
|
||||
|
@ -149,13 +154,26 @@ int main(int argc, char *argv[]) {
|
|||
exit(1);
|
||||
}
|
||||
char* qstr = malloc(1024*1024);
|
||||
printf("test insert into tb using stb\n\n");
|
||||
for (int i =0; i < 5; ++i) {
|
||||
testInsert(taos, qstr);
|
||||
{
|
||||
printf("test insert into tb using stb\n\n");
|
||||
double sum = 0;
|
||||
for (int i =0; i < 5; ++i) {
|
||||
double elapsed = 0;
|
||||
testInsert(taos, qstr, &elapsed);
|
||||
sum += elapsed;
|
||||
}
|
||||
printf("average insert tb using stb time : %.3f\n", sum/5);
|
||||
}
|
||||
printf("test insert into stb tbname\n\n");
|
||||
for (int i =0; i < 5; ++i) {
|
||||
testInsertStb(taos, qstr);
|
||||
{
|
||||
printf("test insert into stb\n\n");
|
||||
double sum = 0;
|
||||
for (int i =0; i < 5; ++i) {
|
||||
double elapsed = 0;
|
||||
testInsertStb(taos, qstr, &elapsed);
|
||||
sum += elapsed;
|
||||
}
|
||||
printf("average insert into stb time : %.3f\n", sum/5);
|
||||
}
|
||||
free(qstr);
|
||||
taos_close(taos);
|
||||
|
|
|
@ -319,4 +319,7 @@ if $data45 != 30.000000000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql_error insert into ct1 using stb tags('a', 'b') values ('2022-06-26 13:00:00', 1) ct11 using sta tags('c', 'b#) values ('2022-06-26 13:00:01', 2);
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -51,6 +51,21 @@ print $rows
|
|||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create database dgxy;
|
||||
sql use dgxy;
|
||||
sql create table st(ts timestamp, f int) tags(t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 1);
|
||||
sql insert into st(tbname, ts, f) values('ct1', now, 2);
|
||||
sql select * from ct1;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
sql show tables like 'ct1';
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error insert into d2.st values(now, 1, 1)
|
||||
sql_error insert into d2.st(ts, f) values(now, 1);
|
||||
sql_error insert into d2.st(ts, f, tbname) values(now, 1);
|
||||
|
|
|
@ -140,4 +140,28 @@ if $data01 != @1aa@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create table stb34 (ts timestamp, f int) tags(t int);
|
||||
sql insert into ctb34 using stb34 tags(1) values(now, 1)(now+1s, 2);
|
||||
sql select 1 from (select tags t from stb34 order by t)
|
||||
print $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select tags t from stb34)
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select 1 from (select tags ts from stb34)
|
||||
print $rows
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select tags ts from stb34)
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -176,7 +176,7 @@ class TDTestCase:
|
|||
tdSql.checkData(3, 2, 10)
|
||||
tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 8, 0, 13))
|
||||
tdSql.checkData(4, 1, 1)
|
||||
tdSql.checkData(4, 2, None)
|
||||
tdSql.checkData(4, 2, 1)
|
||||
|
||||
def run_insert_stb(self):
|
||||
print("running {}".format('insert_stb'))
|
||||
|
|
Loading…
Reference in New Issue