fix:merge from 3.0
This commit is contained in:
commit
c0068abc9a
|
@ -316,7 +316,7 @@ def pre_test_build_win() {
|
|||
python -m pip uninstall taospy -y
|
||||
python -m pip install taospy==2.7.12
|
||||
python -m pip uninstall taos-ws-py -y
|
||||
python -m pip install taos-ws-py==0.2.9
|
||||
python -m pip install taos-ws-py==0.3.1
|
||||
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
|
||||
'''
|
||||
return 1
|
||||
|
|
|
@ -11,7 +11,7 @@ ExternalProject_Add(curl2
|
|||
BUILD_IN_SOURCE TRUE
|
||||
BUILD_ALWAYS 1
|
||||
UPDATE_COMMAND ""
|
||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 #--enable-debug
|
||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 #--enable-debug
|
||||
BUILD_COMMAND make -j
|
||||
INSTALL_COMMAND make install
|
||||
TEST_COMMAND ""
|
||||
|
|
|
@ -8,7 +8,7 @@ ExternalProject_Add(openssl
|
|||
BUILD_IN_SOURCE TRUE
|
||||
#BUILD_ALWAYS 1
|
||||
#UPDATE_COMMAND ""
|
||||
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 -static #--no-shared
|
||||
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
|
||||
BUILD_COMMAND make -j
|
||||
INSTALL_COMMAND make install_sw -j
|
||||
TEST_COMMAND ""
|
||||
|
|
|
@ -44,10 +44,6 @@ extern int32_t tsNumOfSupportVnodes;
|
|||
extern int32_t tsMaxShellConns;
|
||||
extern int32_t tsShellActivityTimer;
|
||||
extern int32_t tsCompressMsgSize;
|
||||
extern int32_t tsCompressColData;
|
||||
extern int32_t tsMaxNumOfDistinctResults;
|
||||
extern int32_t tsCompatibleModel;
|
||||
extern bool tsPrintAuth;
|
||||
extern int64_t tsTickPerMin[3];
|
||||
extern int64_t tsTickPerHour[3];
|
||||
extern int32_t tsCountAlwaysReturnValue;
|
||||
|
@ -79,14 +75,13 @@ extern int32_t tsElectInterval;
|
|||
extern int32_t tsHeartbeatInterval;
|
||||
extern int32_t tsHeartbeatTimeout;
|
||||
|
||||
// vnode
|
||||
extern int64_t tsVndCommitMaxIntervalMs;
|
||||
|
||||
// snode
|
||||
extern char tsSnodeAddress[]; //127.0.0.1:873
|
||||
extern int32_t tsRsyncPort;
|
||||
extern char tsCheckpointBackupDir[];
|
||||
|
||||
// vnode checkpoint
|
||||
extern char tsSnodeAddress[]; //127.0.0.1:873
|
||||
|
||||
// mnode
|
||||
extern int64_t tsMndSdbWriteDelta;
|
||||
extern int64_t tsMndLogRetention;
|
||||
|
@ -110,8 +105,6 @@ extern bool tsMonitorComp;
|
|||
|
||||
// audit
|
||||
extern bool tsEnableAudit;
|
||||
extern char tsAuditFqdn[];
|
||||
extern uint16_t tsAuditPort;
|
||||
extern bool tsEnableAuditCreateTable;
|
||||
|
||||
// telem
|
||||
|
|
|
@ -57,10 +57,27 @@ typedef enum {
|
|||
CFG_SCOPE_BOTH
|
||||
} ECfgScopeType;
|
||||
|
||||
typedef enum {
|
||||
CFG_DYN_NONE = 0,
|
||||
CFG_DYN_SERVER = 1,
|
||||
CFG_DYN_CLIENT = 2,
|
||||
CFG_DYN_BOTH = 3,
|
||||
#ifdef TD_ENTERPRISE
|
||||
CFG_DYN_ENT_SERVER = CFG_DYN_SERVER,
|
||||
CFG_DYN_ENT_CLIENT = CFG_DYN_CLIENT,
|
||||
CFG_DYN_ENT_BOTH = CFG_DYN_BOTH,
|
||||
#else
|
||||
CFG_DYN_ENT_SERVER = CFG_DYN_NONE,
|
||||
CFG_DYN_ENT_CLIENT = CFG_DYN_NONE,
|
||||
CFG_DYN_ENT_BOTH = CFG_DYN_NONE,
|
||||
#endif
|
||||
} ECfgDynType;
|
||||
|
||||
typedef struct SConfigItem {
|
||||
ECfgSrcType stype;
|
||||
ECfgDataType dtype;
|
||||
int8_t scope;
|
||||
int8_t dynScope;
|
||||
char *name;
|
||||
union {
|
||||
bool bval;
|
||||
|
@ -99,15 +116,20 @@ int32_t cfgGetSize(SConfig *pCfg);
|
|||
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name);
|
||||
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype);
|
||||
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope);
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope);
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope);
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope);
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
|
||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer);
|
||||
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope);
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope);
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope,
|
||||
int8_t dynScope);
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope);
|
||||
|
||||
const char *cfgStypeStr(ECfgSrcType type);
|
||||
const char *cfgDtypeStr(ECfgDataType type);
|
||||
|
|
|
@ -1052,6 +1052,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
}
|
||||
}
|
||||
|
||||
pRequest->body.execMode = pQuery->execMode;
|
||||
switch (pQuery->execMode) {
|
||||
case QUERY_EXEC_MODE_LOCAL:
|
||||
if (!pRequest->validateOnly) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -79,7 +79,7 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
|
|||
static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue);
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
|
||||
|
||||
#ifdef _GRANT
|
||||
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
|
||||
|
@ -1182,15 +1182,72 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
|
||||
terrno = 0;
|
||||
char *p = pMCfgReq->config;
|
||||
while (*p) {
|
||||
if (*p == ' ') {
|
||||
break;
|
||||
}
|
||||
p++;
|
||||
}
|
||||
|
||||
size_t optLen = p - pMCfgReq->config;
|
||||
strncpy(pDCfgReq->config, pMCfgReq->config, optLen);
|
||||
pDCfgReq->config[optLen] = 0;
|
||||
|
||||
if (' ' == pMCfgReq->config[optLen]) {
|
||||
// 'key value'
|
||||
if (strlen(pMCfgReq->value) != 0) goto _err;
|
||||
strcpy(pDCfgReq->value, p + 1);
|
||||
} else {
|
||||
// 'key' 'value'
|
||||
if (strlen(pMCfgReq->value) == 0) goto _err;
|
||||
strcpy(pDCfgReq->value, pMCfgReq->value);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
mError("dnode:%d, failed to config since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndSendCfgDnodeReq(SMnode *pMnode, int32_t dnodeId, SDCfgDnodeReq *pDcfgReq) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pDnode->id == dnodeId || dnodeId == -1 || dnodeId == 0) {
|
||||
SEpSet epSet = mndGetDnodeEpset(pDnode);
|
||||
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, pDcfgReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
|
||||
if (pBuf != NULL) {
|
||||
tSerializeSDCfgDnodeReq(pBuf, bufLen, pDcfgReq);
|
||||
mInfo("dnode:%d, send config req to dnode, config:%s value:%s", dnodeId, pDcfgReq->config, pDcfgReq->value);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
|
||||
tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
if (code == -1) {
|
||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
const char *options[] = {
|
||||
"debugFlag", "dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
|
||||
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag",
|
||||
"uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag", "stDebugFlag",
|
||||
};
|
||||
int32_t optionSize = tListLen(options);
|
||||
|
||||
SMCfgDnodeReq cfgReq = {0};
|
||||
if (tDeserializeSMCfgDnodeReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
|
@ -1206,139 +1263,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
SDCfgDnodeReq dcfgReq = {0};
|
||||
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
|
||||
strcpy(dcfgReq.config, "resetlog");
|
||||
} else if (strncasecmp(cfgReq.config, "monitor", 7) == 0) {
|
||||
if (' ' != cfgReq.config[7] && 0 != cfgReq.config[7]) {
|
||||
mError("dnode:%d, failed to config monitor since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char *value = cfgReq.value;
|
||||
int32_t flag = atoi(value);
|
||||
if (flag <= 0) {
|
||||
flag = atoi(cfgReq.config + 8);
|
||||
}
|
||||
if (flag < 0 || flag > 2) {
|
||||
mError("dnode:%d, failed to config monitor since value:%d", cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "monitor");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||
int32_t optLen = strlen("s3blocksize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag > 1024 * 1024) {
|
||||
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
|
||||
flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3blocksize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) {
|
||||
int32_t optLen = strlen("s3blockcachesize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 4 || flag > 1024 * 1024) {
|
||||
mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]",
|
||||
cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3blockcachesize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3pagecachesize", 16) == 0) {
|
||||
int32_t optLen = strlen("s3pagecachesize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 4 || flag > 1024 * 1024 * 1024) {
|
||||
mError("dnode:%d, failed to config s3PageCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
|
||||
flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3pagecachesize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) {
|
||||
int32_t optLen = strlen("s3uploaddelaysec");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 600 || flag > 60 * 60 * 24 * 30) {
|
||||
mError("dnode:%d, failed to config s3UploadDelaySec since value:%d. Valid range: [600, 60 * 60 * 24 * 30]",
|
||||
cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3uploaddelaysec");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
|
||||
int32_t optLen = strlen("ttlpushinterval");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 0 || flag > 100000) {
|
||||
mError("dnode:%d, failed to config ttlPushInterval since value:%d. Valid range: [0, 100000]", cfgReq.dnodeId,
|
||||
flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "ttlpushinterval");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "ttlbatchdropnum", 15) == 0) {
|
||||
int32_t optLen = strlen("ttlbatchdropnum");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 0) {
|
||||
mError("dnode:%d, failed to config ttlBatchDropNum since value:%d. Valid range: [0, %d]", cfgReq.dnodeId, flag,
|
||||
INT32_MAX);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "ttlbatchdropnum");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
} else if (strncasecmp(cfgReq.config, "asynclog", 8) == 0) {
|
||||
int32_t optLen = strlen("asynclog");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag < 0 || flag > 1) {
|
||||
mError("dnode:%d, failed to config asynclog since value:%d. Valid range: [0, 1]", cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "asynclog");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
#ifdef TD_ENTERPRISE
|
||||
} else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) {
|
||||
int32_t optLen = strlen("supportvnodes");
|
||||
|
@ -1348,9 +1272,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
|
||||
if (flag < 0 || flag > 4096) {
|
||||
mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _err_out;
|
||||
}
|
||||
if (flag == 0) {
|
||||
flag = tsNumOfCores * 2;
|
||||
|
@ -1365,8 +1288,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
|
||||
mError("dnode:%d, failed to config activeCode since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
goto _err_out;
|
||||
}
|
||||
int32_t vlen = strlen(cfgReq.value);
|
||||
if (vlen > 0 && ((opt == DND_ACTIVE_CODE && vlen != (TSDB_ACTIVE_KEY_LEN - 1)) ||
|
||||
|
@ -1374,9 +1296,8 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
(vlen > (TSDB_CONN_ACTIVE_KEY_LEN - 1) || vlen < (TSDB_ACTIVE_KEY_LEN - 1))))) {
|
||||
mError("dnode:%d, failed to config activeCode since invalid vlen:%d. conf:%s, val:%s", cfgReq.dnodeId, vlen,
|
||||
cfgReq.config, cfgReq.value);
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
goto _err_out;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? "activeCode" : "cActiveCode");
|
||||
|
@ -1384,88 +1305,37 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
|
||||
if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
|
||||
mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
goto _err_out;
|
||||
}
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return 0;
|
||||
#endif
|
||||
} else {
|
||||
bool findOpt = false;
|
||||
for (int32_t d = 0; d < optionSize; ++d) {
|
||||
const char *optName = options[d];
|
||||
int32_t optLen = strlen(optName);
|
||||
if (strncasecmp(cfgReq.config, optName, optLen) != 0) continue;
|
||||
|
||||
if (' ' != cfgReq.config[optLen] && 0 != cfgReq.config[optLen]) {
|
||||
mError("dnode:%d, failed to config since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
|
||||
mndMCfg2DCfg(&cfgReq, &dcfgReq);
|
||||
if (strlen(dcfgReq.config) > TSDB_DNODE_CONFIG_LEN) {
|
||||
mError("dnode:%d, failed to config since config is too long", cfgReq.dnodeId);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
goto _err_out;
|
||||
}
|
||||
|
||||
const char *value = cfgReq.value;
|
||||
int32_t flag = atoi(value);
|
||||
if (flag <= 0) {
|
||||
flag = atoi(cfgReq.config + optLen + 1);
|
||||
}
|
||||
if (flag < 0 || flag > 255) {
|
||||
mError("dnode:%d, failed to config %s since value:%d", cfgReq.dnodeId, optName, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tstrncpy(dcfgReq.config, optName, optLen + 1);
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
findOpt = true;
|
||||
}
|
||||
|
||||
if (!findOpt) {
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
mError("dnode:%d, failed to config since %s", cfgReq.dnodeId, terrstr());
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
if (cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true) != 0) goto _err_out;
|
||||
}
|
||||
|
||||
{ // audit
|
||||
char obj[50] = {0};
|
||||
sprintf(obj, "%d", cfgReq.dnodeId);
|
||||
|
||||
auditRecord(pReq, pMnode->clusterId, "alterDnode", "", obj, cfgReq.sql, cfgReq.sqlLen);
|
||||
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
|
||||
}
|
||||
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
return mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
|
||||
|
||||
if (pDnode->id == cfgReq.dnodeId || cfgReq.dnodeId == -1 || cfgReq.dnodeId == 0) {
|
||||
SEpSet epSet = mndGetDnodeEpset(pDnode);
|
||||
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq);
|
||||
void *pBuf = rpcMallocCont(bufLen);
|
||||
|
||||
if (pBuf != NULL) {
|
||||
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
|
||||
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
|
||||
dcfgReq.config, dcfgReq.value);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
|
||||
tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
if (code == -1) {
|
||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
return code;
|
||||
_err_out:
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
|
||||
|
@ -1606,16 +1476,16 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
|
|||
}
|
||||
|
||||
// get int32_t value from 'SMCfgDnodeReq'
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_t *pOutValue) {
|
||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t optLen, int32_t *pOutValue) {
|
||||
terrno = 0;
|
||||
if (' ' != pMCfgReq->config[opLen] && 0 != pMCfgReq->config[opLen]) {
|
||||
if (' ' != pMCfgReq->config[optLen] && 0 != pMCfgReq->config[optLen]) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (' ' == pMCfgReq->config[opLen]) {
|
||||
if (' ' == pMCfgReq->config[optLen]) {
|
||||
// 'key value'
|
||||
if (strlen(pMCfgReq->value) != 0) goto _err;
|
||||
*pOutValue = atoi(pMCfgReq->config + opLen + 1);
|
||||
*pOutValue = atoi(pMCfgReq->config + optLen + 1);
|
||||
} else {
|
||||
// 'key' 'value'
|
||||
if (strlen(pMCfgReq->value) == 0) goto _err;
|
||||
|
|
|
@ -144,7 +144,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
p->type = type;
|
||||
p->pVnode = pVnode;
|
||||
p->pTsdb = p->pVnode->pTsdb;
|
||||
p->info.verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
|
||||
p->info.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
|
||||
p->info.suid = suid;
|
||||
p->numOfCols = numOfCols;
|
||||
p->pCidList = pCidList;
|
||||
|
|
|
@ -112,7 +112,6 @@ extern "C" {
|
|||
#define COMMAND_CATALOG_DEBUG "catalogDebug"
|
||||
#define COMMAND_ENABLE_MEM_DEBUG "enableMemDebug"
|
||||
#define COMMAND_DISABLE_MEM_DEBUG "disableMemDebug"
|
||||
#define COMMAND_ASYNCLOG "asynclog"
|
||||
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
|
|
|
@ -808,16 +808,6 @@ static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
|
|||
return code;
|
||||
}
|
||||
qInfo("memory dbg disabled");
|
||||
} else if (0 == strcasecmp(cmd, COMMAND_ASYNCLOG)) {
|
||||
int newAsyncLogValue = (strlen(value) == 0) ? 1 : atoi(value);
|
||||
if (newAsyncLogValue != 0 && newAsyncLogValue != 1) {
|
||||
code = TSDB_CODE_INVALID_CFG_VALUE;
|
||||
qError("failed to alter asynclog, error:%s", tstrerror(code));
|
||||
goto _return;
|
||||
}
|
||||
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
tsAsyncLog = newAsyncLogValue;
|
||||
} else {
|
||||
goto _return;
|
||||
}
|
||||
|
@ -844,10 +834,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
bool forbidden = false;
|
||||
taosLocalCfgForbiddenToChange(pStmt->config, &forbidden);
|
||||
if (forbidden) {
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
if (cfgCheckRangeForDynUpdate(tsCfg, pStmt->config, pStmt->value, false)) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -580,6 +580,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
bool clearState;
|
||||
bool recvGetAll;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
|
@ -603,6 +604,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
SArray* historyWins;
|
||||
bool reCkBlock;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
bool recvGetAll;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamPartitionOperatorInfo {
|
||||
|
|
|
@ -373,8 +373,11 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
|||
if (pGroupResInfo->freeItem) {
|
||||
int32_t size = taosArrayGetSize(pGroupResInfo->pRows);
|
||||
for (int32_t i = pGroupResInfo->index; i < size; i++) {
|
||||
void* pVal = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
taosMemoryFree(pVal);
|
||||
SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
if (!pPos->needFree && !pPos->pRowBuff) {
|
||||
taosMemoryFreeClear(pPos->pKey);
|
||||
taosMemoryFree(pPos);
|
||||
}
|
||||
}
|
||||
pGroupResInfo->freeItem = false;
|
||||
}
|
||||
|
@ -2532,6 +2535,15 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
|||
taosMemoryFree(buf);
|
||||
}
|
||||
|
||||
static void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
|
||||
void* pIte = NULL;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
|
||||
SResultWindowInfo* pResInfo = pIte;
|
||||
pResInfo->pStatePos->beUsed = true;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -2546,6 +2558,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
if (opRes) {
|
||||
return opRes;
|
||||
}
|
||||
|
||||
if (pInfo->recvGetAll) {
|
||||
pInfo->recvGetAll = false;
|
||||
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2583,6 +2601,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
pInfo->recvGetAll = true;
|
||||
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -2838,6 +2857,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->clearState = false;
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
|
@ -3454,6 +3475,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
return resBlock;
|
||||
}
|
||||
|
||||
if (pInfo->recvGetAll) {
|
||||
pInfo->recvGetAll = false;
|
||||
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -3482,6 +3508,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
pInfo->recvGetAll = true;
|
||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||
|
@ -3712,6 +3739,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
}
|
||||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
|
|
|
@ -1989,7 +1989,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
}
|
||||
if (para2Bytes <= 0 || para2Bytes > 4096) { // cast dst var type length limits to 4096 bytes
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"CAST function converted length should be in range [0, 4096] bytes");
|
||||
"CAST function converted length should be in range (0, 4096] bytes");
|
||||
}
|
||||
|
||||
// add database precision as param
|
||||
|
@ -3312,26 +3312,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = castFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_timestamp",
|
||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||
.translateFunc = translateToTimestamp,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toTimestampFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_char",
|
||||
.type = FUNCTION_TYPE_TO_CHAR,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToChar,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_iso8601",
|
||||
.type = FUNCTION_TYPE_TO_ISO8601,
|
||||
|
@ -3709,6 +3689,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = qVgIdFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_timestamp",
|
||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||
.translateFunc = translateToTimestamp,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toTimestampFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "to_char",
|
||||
.type = FUNCTION_TYPE_TO_CHAR,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.translateFunc = translateToChar,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = toCharFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -285,7 +285,7 @@ static SNode* createConstantValue() {
|
|||
static int32_t calcConstProjections(SCalcConstContext* pCxt, SSelectStmt* pSelect, bool subquery) {
|
||||
SNode* pProj = NULL;
|
||||
WHERE_EACH(pProj, pSelect->pProjectionList) {
|
||||
if (subquery && !pSelect->isDistinct && isUselessCol((SExprNode*)pProj)) {
|
||||
if (subquery && !pSelect->isDistinct && !pSelect->tagScan && isUselessCol((SExprNode*)pProj)) {
|
||||
ERASE_NODE(pSelect->pProjectionList);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -166,6 +166,7 @@ static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) {
|
|||
}
|
||||
|
||||
static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pDuplicate = false;
|
||||
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
|
@ -173,13 +174,13 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
|
|||
STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName));
|
||||
if (NULL != pMeta) {
|
||||
*pDuplicate = true;
|
||||
int32_t code = ignoreUsingClause(pCxt, &pStmt->pSql);
|
||||
code = ignoreUsingClause(pCxt, &pStmt->pSql);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
return cloneTableMeta(*pMeta, &pStmt->pTableMeta);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
|
@ -1594,11 +1595,9 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
|
|||
|
||||
if (pToken->n > 0) {
|
||||
if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
|
||||
memcpy(ctbName, pToken->z, pToken->n);
|
||||
ctbName[pToken->n] = '\0';
|
||||
memcpy(pStbRowsCxt->ctbName.tname, pToken->z, pToken->n);
|
||||
pStbRowsCxt->ctbName.tname[pToken->n] = '\0';
|
||||
*pFoundCtbName = true;
|
||||
tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname));
|
||||
tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, pToken->n);
|
||||
} else {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "tbname is too long");
|
||||
}
|
||||
|
@ -1644,10 +1643,13 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
SArray* pTagNames = pStbRowsCxt->aTagNames;
|
||||
SArray* pTagVals = pStbRowsCxt->aTagVals;
|
||||
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
|
||||
int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta);
|
||||
int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
|
||||
for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
|
||||
const char* pTmpSql = *ppSql;
|
||||
bool ignoreComma = false;
|
||||
NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
|
||||
|
||||
if (ignoreComma) {
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
|
||||
break;
|
||||
|
@ -1657,12 +1659,11 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pCols->pColIndex[i] < getNumOfColumns(pStbRowsCxt->pStbMeta)) {
|
||||
if (pCols->pColIndex[i] < numOfCols) {
|
||||
const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
|
||||
SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
|
||||
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal);
|
||||
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||
} else if (pCols->pColIndex[i] < tbnameIdx) {
|
||||
const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
|
||||
if (canParseTagsAfter) {
|
||||
tagTokens[(*pNumOfTagTokens)] = *pToken;
|
||||
|
@ -1675,7 +1676,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
}
|
||||
}
|
||||
}
|
||||
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||
else if (pCols->pColIndex[i] == tbnameIdx) {
|
||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, bFoundTbName);
|
||||
}
|
||||
|
@ -1756,7 +1757,7 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
|||
.requestId = pCxt->pComCxt->requestId,
|
||||
.requestObjRefId = pCxt->pComCxt->requestRid,
|
||||
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
|
||||
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
|
||||
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStbRowsCxt->ctbName, &vg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
|
||||
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
|
||||
|
@ -1771,12 +1772,6 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
|
|||
return code;
|
||||
}
|
||||
|
||||
static void resetStbRowsDataContextPreStbRow(SStbRowsDataContext* pStbRowsCxt) {
|
||||
pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
|
||||
pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
|
||||
|
||||
insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
|
||||
}
|
||||
|
||||
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
||||
if (pStbRowsCxt == NULL) return;
|
||||
|
@ -1791,19 +1786,15 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
|||
taosArrayClear(pStbRowsCxt->aTagVals);
|
||||
|
||||
clearColValArray(pStbRowsCxt->aColVals);
|
||||
taosArrayClear(pStbRowsCxt->aColVals);
|
||||
|
||||
tTagFree(pStbRowsCxt->pTag);
|
||||
pStbRowsCxt->pTag = NULL;
|
||||
pStbRowsCxt->pCtbMeta->uid = 0;
|
||||
pStbRowsCxt->pCtbMeta->vgId = 0;
|
||||
tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
|
||||
taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
|
||||
}
|
||||
|
||||
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
|
||||
resetStbRowsDataContextPreStbRow(pStbRowsCxt);
|
||||
bool bFirstTable = false;
|
||||
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
|
||||
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
|
||||
|
@ -1825,6 +1816,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
|||
insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
|
||||
}
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
*pGotRow = true;
|
||||
}
|
||||
|
@ -2100,6 +2092,10 @@ static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDa
|
|||
collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
|
||||
collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
|
||||
|
||||
pStbRowsCxt->ctbName.type = TSDB_TABLE_NAME_T;
|
||||
pStbRowsCxt->ctbName.acctId = pStbRowsCxt->stbName.acctId;
|
||||
memcpy(pStbRowsCxt->ctbName.dbname, pStbRowsCxt->stbName.dbname, sizeof(pStbRowsCxt->stbName.dbname));
|
||||
|
||||
pStbRowsCxt->pTagCond = pStmt->pTagCond;
|
||||
pStbRowsCxt->pStbMeta = pStmt->pTableMeta;
|
||||
|
||||
|
|
|
@ -434,17 +434,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
|||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
SOutputData sOutput = {0};
|
||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput);
|
||||
}
|
||||
|
||||
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (rsp) {
|
||||
if (NULL != rsp) {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||
}
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
@ -456,7 +461,6 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
|||
dataLen);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@
|
|||
#include "syncReplication.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
|
||||
|
||||
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
|
||||
|
@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
}
|
||||
|
||||
int dataLen = 0;
|
||||
if (snapInfo.data) {
|
||||
SSyncTLV *datHead = snapInfo.data;
|
||||
void *pData = snapInfo.data;
|
||||
int32_t type = 0;
|
||||
if (pData) {
|
||||
type = snapInfo.type;
|
||||
SSyncTLV *datHead = pData;
|
||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
|
||||
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
|
@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
dataLen = sizeof(SSyncTLV) + datHead->len;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
||||
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
|
||||
goto _out;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->term;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = pSender->seq;
|
||||
|
||||
if (dataLen > 0) {
|
||||
pMsg->payloadType = snapInfo.type;
|
||||
memcpy(pMsg->data, snapInfo.data, dataLen);
|
||||
}
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
||||
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
|
||||
|
||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
|
||||
code = 0;
|
||||
_out:
|
||||
if (snapInfo.data) {
|
||||
|
@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
|||
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
||||
}
|
||||
|
||||
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
|
||||
int32_t code = -1;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->term;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = seq;
|
||||
|
||||
if (pBlock != NULL && blockLen > 0) {
|
||||
memcpy(pMsg->data, pBlock, blockLen);
|
||||
}
|
||||
pMsg->payloadType = typ;
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq);
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
_OUT:
|
||||
return code;
|
||||
}
|
||||
|
||||
// when sender receive ack, call this function to send msg from seq
|
||||
// seq = ack + 1, already updated
|
||||
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||
|
@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
|
||||
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
|
||||
|
||||
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = pSender->seq;
|
||||
|
||||
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
|
||||
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
|
||||
}
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
|
||||
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
|
||||
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
|
||||
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
|
||||
goto _OUT;
|
||||
}
|
||||
|
||||
|
@ -336,36 +330,17 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
|
||||
continue;
|
||||
}
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
pMsg->term = pSender->term;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->startTime = pSender->startTime;
|
||||
pMsg->seq = pBlk->seq;
|
||||
|
||||
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
|
||||
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
|
||||
}
|
||||
|
||||
// send msg
|
||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
|
||||
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
|
||||
goto _out;
|
||||
}
|
||||
pBlk->sendTimeMs = nowMs;
|
||||
}
|
||||
|
||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
|
||||
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
|
||||
goto _out;
|
||||
}
|
||||
}
|
||||
code = 0;
|
||||
_out:;
|
||||
taosThreadMutexUnlock(&pSndBuf->mutex);
|
||||
|
@ -861,7 +836,7 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen
|
|||
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||
pRspMsg->startTime = pMsg->startTime;
|
||||
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
|
||||
pRspMsg->ack = pMsg->seq;
|
||||
pRspMsg->code = code;
|
||||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||
|
||||
|
@ -893,13 +868,13 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
|
|||
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
|
||||
ppMsg[0] = NULL;
|
||||
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
|
||||
} else {
|
||||
} else if (pMsg->seq < pRcvBuf->start) {
|
||||
syncSnapSendRsp(pReceiver, pMsg, code);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
|
||||
if (pRcvBuf->entries[seq]) {
|
||||
if (pRcvBuf->entries[seq % pRcvBuf->size]) {
|
||||
pRcvBuf->cursor = seq;
|
||||
} else {
|
||||
break;
|
||||
|
|
|
@ -310,6 +310,7 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
|
|||
GRANT_CFG_SET;
|
||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||
if (pItem == NULL) {
|
||||
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -338,6 +339,7 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
|
|||
break;
|
||||
}
|
||||
|
||||
_err_out:
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
return -1;
|
||||
}
|
||||
|
@ -357,6 +359,50 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) {
|
||||
ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT;
|
||||
SConfigItem *pItem = cfgGetItem(pCfg, name);
|
||||
if (!pItem || (pItem->dynScope & dynType) == 0) {
|
||||
uError("failed to config:%s, not support", name);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch (pItem->dtype) {
|
||||
case CFG_DTYPE_INT32: {
|
||||
int32_t ival = (int32_t)atoi(pVal);
|
||||
if (ival < pItem->imin || ival > pItem->imax) {
|
||||
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
case CFG_DTYPE_INT64: {
|
||||
int64_t ival = (int64_t)atoll(pVal);
|
||||
if (ival < pItem->imin || ival > pItem->imax) {
|
||||
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
case CFG_DTYPE_FLOAT:
|
||||
case CFG_DTYPE_DOUBLE: {
|
||||
float fval = (float)atof(pVal);
|
||||
if (fval < pItem->fmin || fval > pItem->fmax) {
|
||||
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval,
|
||||
pItem->fmin, pItem->fmax);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
|
||||
pItem->stype = CFG_STYPE_DEFAULT;
|
||||
pItem->name = taosStrdup(name);
|
||||
|
@ -381,43 +427,61 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_BOOL, .bval = defaultVal, .scope = scope};
|
||||
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_BOOL, .bval = defaultVal, .scope = scope, .dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope) {
|
||||
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope) {
|
||||
if (defaultVal < minval || defaultVal > maxval) {
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT32, .i32 = defaultVal, .imin = minval, .imax = maxval, .scope = scope};
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT32,
|
||||
.i32 = defaultVal,
|
||||
.imin = minval,
|
||||
.imax = maxval,
|
||||
.scope = scope,
|
||||
.dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope) {
|
||||
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope,
|
||||
int8_t dynScope) {
|
||||
if (defaultVal < minval || defaultVal > maxval) {
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT64, .i64 = defaultVal, .imin = minval, .imax = maxval, .scope = scope};
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_INT64,
|
||||
.i64 = defaultVal,
|
||||
.imin = minval,
|
||||
.imax = maxval,
|
||||
.scope = scope,
|
||||
.dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope) {
|
||||
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope,
|
||||
int8_t dynScope) {
|
||||
if (defaultVal < minval || defaultVal > maxval) {
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_FLOAT, .fval = defaultVal, .fmin = minval, .fmax = maxval, .scope = scope};
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_FLOAT,
|
||||
.fval = defaultVal,
|
||||
.fmin = minval,
|
||||
.fmax = maxval,
|
||||
.scope = scope,
|
||||
.dynScope = dynScope};
|
||||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope};
|
||||
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope, .dynScope = dynScope};
|
||||
item.str = taosStrdup(defaultVal);
|
||||
if (item.str == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -426,8 +490,8 @@ int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, in
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_DIR, .scope = scope};
|
||||
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_DIR, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetDir(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -435,8 +499,8 @@ int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope};
|
||||
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetLocale(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -444,8 +508,8 @@ int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, in
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope};
|
||||
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetCharset(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -453,8 +517,8 @@ int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, i
|
|||
return cfgAddItem(pCfg, &item, name);
|
||||
}
|
||||
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope};
|
||||
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) {
|
||||
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope, .dynScope = dynScope};
|
||||
if (cfgCheckAndSetTimezone(&item, defaultVal) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -57,7 +57,6 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t fileNum;
|
||||
int32_t maxLines;
|
||||
int32_t lines;
|
||||
int32_t flag;
|
||||
int32_t openInProgress;
|
||||
|
@ -122,7 +121,7 @@ static void *taosAsyncOutputLog(void *param);
|
|||
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
|
||||
static SLogBuff *taosLogBuffNew(int32_t bufSize);
|
||||
static void taosCloseLogByFd(TdFilePtr pFile);
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum);
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxFileNum);
|
||||
|
||||
static FORCE_INLINE void taosUpdateDaylight() {
|
||||
struct tm Tm, *ptm;
|
||||
|
@ -186,7 +185,7 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) {
|
|||
|
||||
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
|
||||
if (tsLogObj.logHandle == NULL) return -1;
|
||||
if (taosOpenLogFile(fullName, tsNumOfLogLines, maxFiles) < 0) return -1;
|
||||
if (taosOpenLogFile(fullName, maxFiles) < 0) return -1;
|
||||
|
||||
if (taosInitSlowLog() < 0) return -1;
|
||||
if (taosStartLog() < 0) return -1;
|
||||
|
@ -283,7 +282,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
|||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
tsLogObj.openInProgress = 0;
|
||||
tsLogObj.lines = tsLogObj.maxLines - 1000;
|
||||
tsLogObj.lines = tsNumOfLogLines - 1000;
|
||||
uError("open new log file fail! reason:%s, reuse lastlog", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
@ -308,7 +307,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
|||
static int32_t taosOpenNewLogFile() {
|
||||
taosThreadMutexLock(&tsLogObj.logMutex);
|
||||
|
||||
if (tsLogObj.lines > tsLogObj.maxLines && tsLogObj.openInProgress == 0) {
|
||||
if (tsLogObj.lines > tsNumOfLogLines && tsLogObj.openInProgress == 0) {
|
||||
tsLogObj.openInProgress = 1;
|
||||
|
||||
uInfo("open new log file ......");
|
||||
|
@ -331,7 +330,7 @@ void taosResetLog() {
|
|||
sprintf(lastName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
|
||||
|
||||
// force create a new log file
|
||||
tsLogObj.lines = tsLogObj.maxLines + 10;
|
||||
tsLogObj.lines = tsNumOfLogLines + 10;
|
||||
|
||||
taosOpenNewLogFile();
|
||||
(void)taosRemoveFile(lastName);
|
||||
|
@ -384,7 +383,7 @@ static void taosGetLogFileName(char *fn) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
||||
static int32_t taosOpenLogFile(char *fn, int32_t maxFileNum) {
|
||||
#ifdef WINDOWS_STASH
|
||||
/*
|
||||
* always set maxFileNum to 1
|
||||
|
@ -396,7 +395,6 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
|||
char name[LOG_FILE_NAME_LEN + 50] = "\0";
|
||||
int32_t logstat0_mtime, logstat1_mtime;
|
||||
|
||||
tsLogObj.maxLines = maxLines;
|
||||
tsLogObj.fileNum = maxFileNum;
|
||||
taosGetLogFileName(fn);
|
||||
|
||||
|
@ -497,9 +495,9 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
|
|||
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
|
||||
}
|
||||
|
||||
if (tsLogObj.maxLines > 0) {
|
||||
if (tsNumOfLogLines > 0) {
|
||||
atomic_add_fetch_32(&tsLogObj.lines, 1);
|
||||
if ((tsLogObj.lines > tsLogObj.maxLines) && (tsLogObj.openInProgress == 0)) {
|
||||
if ((tsLogObj.lines > tsNumOfLogLines) && (tsLogObj.openInProgress == 0)) {
|
||||
taosOpenNewLogFile();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,12 +54,12 @@ TEST_F(CfgTest, 02_Basic) {
|
|||
SConfig *pConfig = cfgInit();
|
||||
ASSERT_NE(pConfig, nullptr);
|
||||
|
||||
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0), 0);
|
||||
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0), 0);
|
||||
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0), 0);
|
||||
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0), 0);
|
||||
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 0), 0);
|
||||
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0, 0), 0);
|
||||
|
||||
EXPECT_EQ(cfgGetSize(pConfig), 6);
|
||||
|
||||
|
|
|
@ -81,10 +81,10 @@ pip3 list|grep taospy
|
|||
pip3 uninstall taospy -y
|
||||
pip3 install --default-timeout=120 taospy==2.7.12
|
||||
|
||||
#define taos-ws-py 0.2.8
|
||||
#define taos-ws-py 0.3.1
|
||||
pip3 list|grep taos-ws-py
|
||||
pip3 uninstall taos-ws-py -y
|
||||
pip3 install --default-timeout=120 taos-ws-py==0.2.8
|
||||
pip3 install --default-timeout=600 taos-ws-py==0.3.1
|
||||
|
||||
$TIMEOUT_CMD $cmd
|
||||
RET=$?
|
||||
|
|
|
@ -21,9 +21,16 @@
|
|||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/time.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "taos.h" // TAOS header file
|
||||
|
||||
static int64_t currTimeInUs() {
|
||||
struct timeval start_time;
|
||||
gettimeofday(&start_time, NULL);
|
||||
return (start_time.tv_sec) * 1000000 + (start_time.tv_usec);
|
||||
}
|
||||
|
||||
static void executeSql(TAOS *taos, char *command) {
|
||||
int i;
|
||||
TAOS_RES *pSql = NULL;
|
||||
|
@ -52,7 +59,7 @@ static void executeSql(TAOS *taos, char *command) {
|
|||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
void testInsert(TAOS *taos, char *qstr) {
|
||||
void testInsert(TAOS *taos, char *qstr, double* pElapsedTime) {
|
||||
executeSql(taos, "drop database if exists demo2");
|
||||
executeSql(taos, "create database demo2");
|
||||
executeSql(taos, "use demo2");
|
||||
|
@ -60,8 +67,7 @@ void testInsert(TAOS *taos, char *qstr) {
|
|||
executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10)) tags(t1 int, t2 float, t3 binary(10))");
|
||||
printf("success to create table\n");
|
||||
|
||||
struct timeval start_time;
|
||||
gettimeofday(&start_time, NULL);
|
||||
int64_t ts1 = currTimeInUs();
|
||||
|
||||
for (int tblIdx = 0; tblIdx < 10; ++tblIdx) {
|
||||
int len = 0;
|
||||
|
@ -84,15 +90,16 @@ void testInsert(TAOS *taos, char *qstr) {
|
|||
taos_free_result(result1);
|
||||
}
|
||||
}
|
||||
struct timeval end_time;
|
||||
gettimeofday(&end_time, NULL);
|
||||
double elapsed_time = (double)(end_time.tv_sec - start_time.tv_sec) +
|
||||
(double)(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
|
||||
printf("elapsed time: %.3f\n", elapsed_time);
|
||||
|
||||
int64_t ts2 = currTimeInUs();
|
||||
double elapsedTime = (double)(ts2-ts1) / 1000000.0;
|
||||
*pElapsedTime = elapsedTime;
|
||||
|
||||
printf("elapsed time: %.3f\n", elapsedTime);
|
||||
executeSql(taos, "drop database if exists demo2");
|
||||
}
|
||||
|
||||
void testInsertStb(TAOS *taos, char *qstr) {
|
||||
void testInsertStb(TAOS *taos, char *qstr, double *pElapsedTime) {
|
||||
executeSql(taos, "drop database if exists demo");
|
||||
executeSql(taos, "create database demo");
|
||||
executeSql(taos, "use demo");
|
||||
|
@ -100,8 +107,7 @@ void testInsertStb(TAOS *taos, char *qstr) {
|
|||
executeSql(taos, "create table st (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10)) tags(t1 int, t2 float, t3 binary(10))");
|
||||
printf("success to create table\n");
|
||||
|
||||
struct timeval start_time;
|
||||
gettimeofday(&start_time, NULL);
|
||||
int64_t ts1 = currTimeInUs();
|
||||
|
||||
for (int tblIdx = 0; tblIdx < 10; ++tblIdx) {
|
||||
int len = 0;
|
||||
|
@ -125,12 +131,11 @@ void testInsertStb(TAOS *taos, char *qstr) {
|
|||
taos_free_result(result1);
|
||||
}
|
||||
}
|
||||
struct timeval end_time;
|
||||
gettimeofday(&end_time, NULL);
|
||||
double elapsed_time = (double)(end_time.tv_sec - start_time.tv_sec) +
|
||||
(double)(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
|
||||
|
||||
printf("elapsed time: %.3f\n", elapsed_time);
|
||||
int64_t ts2 = currTimeInUs();
|
||||
double elapsedTime = (double)(ts2 - ts1) / 1000000.0;
|
||||
*pElapsedTime = elapsedTime;
|
||||
printf("elapsed time: %.3f\n", elapsedTime);
|
||||
executeSql(taos, "drop database if exists demo");
|
||||
}
|
||||
|
||||
|
@ -149,13 +154,26 @@ int main(int argc, char *argv[]) {
|
|||
exit(1);
|
||||
}
|
||||
char* qstr = malloc(1024*1024);
|
||||
{
|
||||
printf("test insert into tb using stb\n\n");
|
||||
double sum = 0;
|
||||
for (int i =0; i < 5; ++i) {
|
||||
testInsert(taos, qstr);
|
||||
double elapsed = 0;
|
||||
testInsert(taos, qstr, &elapsed);
|
||||
sum += elapsed;
|
||||
}
|
||||
printf("average insert tb using stb time : %.3f\n", sum/5);
|
||||
}
|
||||
printf("test insert into stb tbname\n\n");
|
||||
{
|
||||
printf("test insert into stb\n\n");
|
||||
double sum = 0;
|
||||
for (int i =0; i < 5; ++i) {
|
||||
testInsertStb(taos, qstr);
|
||||
double elapsed = 0;
|
||||
testInsertStb(taos, qstr, &elapsed);
|
||||
sum += elapsed;
|
||||
}
|
||||
printf("average insert into stb time : %.3f\n", sum/5);
|
||||
}
|
||||
free(qstr);
|
||||
taos_close(taos);
|
||||
|
|
|
@ -19,6 +19,7 @@ exe:
|
|||
gcc $(CFLAGS) ./whiteListTest.c -o $(ROOT)whiteListTest $(LFLAGS)
|
||||
gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS)
|
||||
gcc $(CFLAGS) ./tmqViewTest.c -o $(ROOT)tmqViewTest $(LFLAGS)
|
||||
gcc $(CFLAGS) ./stmtQuery.c -o $(ROOT)stmtQuery $(LFLAGS)
|
||||
|
||||
clean:
|
||||
rm $(ROOT)batchprepare
|
||||
|
@ -29,3 +30,4 @@ clean:
|
|||
rm $(ROOT)whiteListTest
|
||||
rm $(ROOT)insert_stb
|
||||
rm $(ROOT)tmqViewTest
|
||||
rm $(ROOT)stmtQuery
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
#include "../../../include/client/taos.h"
|
||||
|
||||
#define PRINT_ERROR printf("\033[31m");
|
||||
#define PRINT_SUCCESS printf("\033[32m");
|
||||
|
||||
void execute_simple_sql(void *taos, char *sql) {
|
||||
TAOS_RES *result = taos_query(taos, sql);
|
||||
if ( result == NULL || taos_errno(result) != 0) {
|
||||
PRINT_ERROR
|
||||
printf("failed to %s, Reason: %s\n", sql, taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
taos_free_result(result);
|
||||
PRINT_SUCCESS
|
||||
printf("Successfully %s\n", sql);
|
||||
}
|
||||
|
||||
void check_result(TAOS_RES *result, int expected) {
|
||||
int rows = 0;
|
||||
TAOS_ROW row;
|
||||
while ((row = taos_fetch_row(result))) {
|
||||
rows++;
|
||||
}
|
||||
if (rows == expected) {
|
||||
PRINT_SUCCESS
|
||||
printf("%d rows are fetched as expected\n", rows);
|
||||
} else {
|
||||
PRINT_ERROR
|
||||
printf("%d rows are fetched but %d expected\n", rows, expected);
|
||||
}
|
||||
taos_free_result(result);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) {
|
||||
PRINT_ERROR
|
||||
printf("TDengine error: failed to connect\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
PRINT_SUCCESS
|
||||
printf("Successfully connected to TDengine\n");
|
||||
|
||||
execute_simple_sql(taos, "drop database if exists test");
|
||||
execute_simple_sql(taos, "create database test");
|
||||
execute_simple_sql(taos, "use test");
|
||||
execute_simple_sql(taos, "create table super(ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(8), c6 smallint, c7 tinyint, c8 bool, c9 nchar(8), c10 timestamp) tags (t1 int, t2 bigint, t3 float, t4 double, t5 binary(8), t6 smallint, t7 tinyint, t8 bool, t9 nchar(8))");
|
||||
|
||||
char *sql = calloc(1, 1024*1024);
|
||||
int sqlLen = 0;
|
||||
sqlLen = sprintf(sql, "create table");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
sqlLen += sprintf(sql + sqlLen, " t%d using super tags (%d, 2147483648, 0.1, 0.000000001, 'abcdefgh', 32767, 127, 1, '一二三四五六七八')", i, i);
|
||||
}
|
||||
execute_simple_sql(taos, sql);
|
||||
|
||||
strcpy(sql, "insert into t0 (ts, c1) values(now, 1)");
|
||||
execute_simple_sql(taos, sql);
|
||||
|
||||
strcpy(sql, "insert into t0 (ts, c1) values(now, 2)");
|
||||
execute_simple_sql(taos, sql);
|
||||
|
||||
strcpy(sql, "insert into t0 (ts, c1) values(now, 3)");
|
||||
execute_simple_sql(taos, sql);
|
||||
|
||||
|
||||
int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9");
|
||||
if (code != 0) {
|
||||
PRINT_ERROR
|
||||
printf("failed to load table info: 0x%08x\n", code);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
PRINT_SUCCESS
|
||||
printf("Successfully load table info\n");
|
||||
|
||||
TAOS_STMT *stmt = taos_stmt_init(taos);
|
||||
if (stmt == NULL) {
|
||||
PRINT_ERROR
|
||||
printf("TDengine error: failed to init taos_stmt\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
PRINT_SUCCESS
|
||||
printf("Successfully init taos_stmt\n");
|
||||
|
||||
|
||||
char* condBuf = "2 or c1 > 0";
|
||||
TAOS_MULTI_BIND params[1];
|
||||
params[0].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||
params[0].buffer_length = strlen(condBuf) + 1;
|
||||
params[0].buffer = condBuf;
|
||||
params[0].length = (int*)¶ms[0].buffer_length;
|
||||
params[0].is_null = NULL;
|
||||
params[0].num = 1;
|
||||
|
||||
char *stmt_sql = "select * from super where c1 > ?";
|
||||
code = taos_stmt_prepare(stmt, stmt_sql, 0);
|
||||
if (code != 0){
|
||||
PRINT_ERROR
|
||||
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
PRINT_SUCCESS
|
||||
printf("Successfully execute taos_stmt_prepare\n");
|
||||
|
||||
taos_stmt_bind_param(stmt, params);
|
||||
taos_stmt_add_batch(stmt);
|
||||
|
||||
if (taos_stmt_execute(stmt) != 0) {
|
||||
PRINT_ERROR
|
||||
printf("failed to execute query statement.\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
PRINT_SUCCESS
|
||||
printf("Successfully execute query statement.\n");
|
||||
|
||||
TAOS_RES *result = taos_stmt_use_result(stmt);
|
||||
check_result(result, 1);
|
||||
|
||||
|
||||
taos_stmt_close(stmt);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -319,4 +319,7 @@ if $data45 != 30.000000000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql_error insert into ct1 using stb tags('a', 'b') values ('2022-06-26 13:00:00', 1) ct11 using sta tags('c', 'b#) values ('2022-06-26 13:00:01', 2);
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -51,6 +51,21 @@ print $rows
|
|||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create database dgxy;
|
||||
sql use dgxy;
|
||||
sql create table st(ts timestamp, f int) tags(t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 1);
|
||||
sql insert into st(tbname, ts, f) values('ct1', now, 2);
|
||||
sql select * from ct1;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
sql show tables like 'ct1';
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error insert into d2.st values(now, 1, 1)
|
||||
sql_error insert into d2.st(ts, f) values(now, 1);
|
||||
sql_error insert into d2.st(ts, f, tbname) values(now, 1);
|
||||
|
|
|
@ -140,4 +140,28 @@ if $data01 != @1aa@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create table stb34 (ts timestamp, f int) tags(t int);
|
||||
sql insert into ctb34 using stb34 tags(1) values(now, 1)(now+1s, 2);
|
||||
sql select 1 from (select tags t from stb34 order by t)
|
||||
print $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select tags t from stb34)
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select 1 from (select tags ts from stb34)
|
||||
print $rows
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select tags ts from stb34)
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -176,7 +176,7 @@ class TDTestCase:
|
|||
tdSql.checkData(3, 2, 10)
|
||||
tdSql.checkData(4, 0, datetime.datetime(2021, 4, 19, 8, 0, 13))
|
||||
tdSql.checkData(4, 1, 1)
|
||||
tdSql.checkData(4, 2, None)
|
||||
tdSql.checkData(4, 2, 1)
|
||||
|
||||
def run_insert_stb(self):
|
||||
print("running {}".format('insert_stb'))
|
||||
|
|
Loading…
Reference in New Issue