Merge branch 'main' into feature/3_liaohj
This commit is contained in:
commit
5e9f53560d
|
@ -387,7 +387,7 @@ pipeline {
|
|||
}
|
||||
steps {
|
||||
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
|
||||
timeout(time: 75, unit: 'MINUTES'){
|
||||
timeout(time: 126, unit: 'MINUTES'){
|
||||
pre_test_win()
|
||||
pre_test_build_win()
|
||||
run_win_ctest()
|
||||
|
@ -423,7 +423,7 @@ pipeline {
|
|||
echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}"
|
||||
}
|
||||
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
|
||||
timeout(time: 120, unit: 'MINUTES'){
|
||||
timeout(time: 130, unit: 'MINUTES'){
|
||||
pre_test()
|
||||
script {
|
||||
sh '''
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
# taosadapter
|
||||
ExternalProject_Add(taosadapter
|
||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||
GIT_TAG db6c843
|
||||
GIT_TAG 9cfe416
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -1,6 +1,13 @@
|
|||
## TDengine SpringBoot + Mybatis Demo
|
||||
|
||||
## 需要提前创建 test 数据库
|
||||
|
||||
```
|
||||
$ taos -s 'create database if not exists test'
|
||||
|
||||
$ curl http://localhost:8080/weather/init
|
||||
```
|
||||
|
||||
### 配置 application.properties
|
||||
```properties
|
||||
# datasource config
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
<id column="ts" jdbcType="TIMESTAMP" property="ts"/>
|
||||
<result column="temperature" jdbcType="FLOAT" property="temperature"/>
|
||||
<result column="humidity" jdbcType="FLOAT" property="humidity"/>
|
||||
<result column="bytes" jdbcType="BINARY" property="bytes" />
|
||||
</resultMap>
|
||||
|
||||
<select id="lastOne" resultType="java.util.Map">
|
||||
|
@ -36,6 +37,11 @@
|
|||
binary
|
||||
(
|
||||
64
|
||||
),
|
||||
bytes
|
||||
binary
|
||||
(
|
||||
64
|
||||
)) tags
|
||||
(
|
||||
location nchar
|
||||
|
@ -63,8 +69,8 @@
|
|||
</select>
|
||||
|
||||
<insert id="insert" parameterType="com.taosdata.example.springbootdemo.domain.Weather">
|
||||
insert into test.t#{groupId} (ts, temperature, humidity, note)
|
||||
values (#{ts}, ${temperature}, ${humidity}, #{note})
|
||||
insert into test.t#{groupId} (ts, temperature, humidity, note, bytes)
|
||||
values (#{ts}, ${temperature}, ${humidity}, #{note}, #{bytes})
|
||||
</insert>
|
||||
|
||||
<select id="getSubTables" resultType="String">
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.taosdata.example.springbootdemo.domain;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Timestamp;
|
||||
|
||||
public class Weather {
|
||||
|
@ -12,6 +13,9 @@ public class Weather {
|
|||
private Float humidity;
|
||||
private String location;
|
||||
private String note;
|
||||
// In rest mode, the byte[] type is not recommended.
|
||||
// UTF-8 is used to encode the byte arrays, that result may affect the SQL correctness
|
||||
private byte[] bytes;
|
||||
private int groupId;
|
||||
|
||||
public Weather() {
|
||||
|
@ -70,4 +74,30 @@ public class Weather {
|
|||
public void setNote(String note) {
|
||||
this.note = note;
|
||||
}
|
||||
|
||||
public byte[] getBytes() {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
public void setBytes(byte[] bytes) {
|
||||
this.bytes = bytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("Weather{");
|
||||
sb.append("ts=").append(ts);
|
||||
sb.append(", temperature=").append(temperature);
|
||||
sb.append(", humidity=").append(humidity);
|
||||
sb.append(", location='").append(location).append('\'');
|
||||
sb.append(", note='").append(note).append('\'');
|
||||
sb.append(", bytes -> string=");
|
||||
if (bytes == null) sb.append("null");
|
||||
else {
|
||||
sb.append(new String(bytes, StandardCharsets.UTF_8));
|
||||
}
|
||||
sb.append(", groupId=").append(groupId);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import com.taosdata.example.springbootdemo.domain.Weather;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -30,6 +31,7 @@ public class WeatherService {
|
|||
weather.setLocation(locations[random.nextInt(locations.length)]);
|
||||
weather.setGroupId(i % locations.length);
|
||||
weather.setNote("note-" + i);
|
||||
weather.setBytes(locations[random.nextInt(locations.length)].getBytes(StandardCharsets.UTF_8));
|
||||
weatherMapper.createTable(weather);
|
||||
count += weatherMapper.insert(weather);
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ extern int32_t tsTagFilterResCacheSize;
|
|||
// queue & threads
|
||||
extern int32_t tsNumOfRpcThreads;
|
||||
extern int32_t tsNumOfRpcSessions;
|
||||
extern int32_t tsTimeToGetAvailableConn;
|
||||
extern int32_t tsNumOfCommitThreads;
|
||||
extern int32_t tsNumOfTaskQueueThreads;
|
||||
extern int32_t tsNumOfMnodeQueryThreads;
|
||||
|
|
|
@ -53,10 +53,6 @@ typedef struct {
|
|||
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
|
||||
#define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v))
|
||||
|
||||
// this data type is internally used only in 'in' query to hold the values
|
||||
#define TSDB_DATA_TYPE_POINTER_ARRAY (1000)
|
||||
#define TSDB_DATA_TYPE_VALUE_ARRAY (1001)
|
||||
|
||||
#define GET_TYPED_DATA(_v, _finalType, _type, _data) \
|
||||
do { \
|
||||
switch (_type) { \
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#define _TD_UTIL_HTTP_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "tref.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -24,7 +25,8 @@ extern "C" {
|
|||
|
||||
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
|
||||
|
||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag);
|
||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||
EHttpCompFlag flag);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ typedef struct SRpcInit {
|
|||
|
||||
int32_t connLimitNum;
|
||||
int32_t connLimitLock;
|
||||
|
||||
int32_t timeToGetConn;
|
||||
int8_t supportBatch; // 0: no batch, 1. batch
|
||||
int32_t batchSize;
|
||||
void *parent;
|
||||
|
|
|
@ -35,6 +35,7 @@ extern "C" {
|
|||
#endif
|
||||
#endif // ifndef ALLOW_FORBID_FUNC
|
||||
|
||||
#endif // ifndef ALLOW_FORBID_FUNC
|
||||
#endif // if !defined(WINDOWS)
|
||||
|
||||
int32_t taosMemoryDbgInit();
|
||||
|
|
|
@ -99,6 +99,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
|
|||
*/
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject);
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
|
||||
jobject);
|
||||
|
||||
/*
|
||||
* Class: com_taosdata_jdbc_tmq_TMQConnector
|
||||
* Method: tmqUnsubscribeImp
|
||||
|
|
|
@ -144,7 +144,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
|||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "TSC";
|
||||
rpcInit.numOfThreads = numOfThread;
|
||||
rpcInit.numOfThreads = tsNumOfRpcThreads;
|
||||
rpcInit.cfp = processMsgFromServer;
|
||||
rpcInit.rfp = clientRpcRfp;
|
||||
rpcInit.sessions = 1024;
|
||||
|
@ -159,6 +159,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
|||
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
|
||||
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
|
||||
|
||||
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5);
|
||||
connLimitNum = TMAX(connLimitNum, 10);
|
||||
connLimitNum = TMIN(connLimitNum, 500);
|
||||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
void *pDnodeConn = rpcOpen(&rpcInit);
|
||||
if (pDnodeConn == NULL) {
|
||||
tscError("failed to init connection to server");
|
||||
|
@ -517,7 +523,7 @@ void taos_init_imp(void) {
|
|||
if (code) {
|
||||
printf("failed to init memory dbg, error:%s\n", tstrerror(code));
|
||||
} else {
|
||||
tsAsyncLog = false;
|
||||
tsAsyncLog = false;
|
||||
printf("memory dbg enabled\n");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2034,6 +2034,12 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
|
|||
rpcInit.compressSize = tsCompressMsgSize;
|
||||
rpcInit.user = "_dnd";
|
||||
|
||||
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
|
||||
connLimitNum = TMAX(connLimitNum, 10);
|
||||
connLimitNum = TMIN(connLimitNum, 500);
|
||||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
clientRpc = rpcOpen(&rpcInit);
|
||||
if (clientRpc == NULL) {
|
||||
tscError("failed to init server status client");
|
||||
|
|
|
@ -56,6 +56,7 @@ jmethodID g_createConsumerErrorCallback;
|
|||
jmethodID g_topicListCallback;
|
||||
|
||||
jclass g_consumerClass;
|
||||
// deprecated
|
||||
jmethodID g_commitCallback;
|
||||
|
||||
void jniGetGlobalMethod(JNIEnv *env) {
|
||||
|
|
|
@ -17,6 +17,36 @@
|
|||
#include "jniCommon.h"
|
||||
#include "taos.h"
|
||||
|
||||
int __init_tmq = 0;
|
||||
jmethodID g_offsetCallback;
|
||||
|
||||
void tmqGlobalMethod(JNIEnv *env) {
|
||||
// make sure init function executed once
|
||||
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
|
||||
case 0:
|
||||
break;
|
||||
case 1:
|
||||
do {
|
||||
taosMsleep(0);
|
||||
} while (atomic_load_32(&__init_tmq) == 1);
|
||||
case 2:
|
||||
return;
|
||||
}
|
||||
|
||||
if (g_vm == NULL) {
|
||||
(*env)->GetJavaVM(env, &g_vm);
|
||||
}
|
||||
|
||||
jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback");
|
||||
jclass g_offsetCallbackClass = (*env)->NewGlobalRef(env, offset);
|
||||
g_offsetCallback = (*env)->GetMethodID(env, g_offsetCallbackClass, "commitCallbackHandler", "(I)V");
|
||||
(*env)->DeleteLocalRef(env, offset);
|
||||
|
||||
atomic_store_32(&__init_tmq, 2);
|
||||
jniDebug("tmq method register finished");
|
||||
}
|
||||
|
||||
// deprecated
|
||||
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
|
||||
JNIEnv *env = NULL;
|
||||
int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6);
|
||||
|
@ -40,6 +70,28 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) {
|
|||
env = NULL;
|
||||
}
|
||||
|
||||
void consumer_callback(tmq_t *tmq, int32_t code, void *param) {
|
||||
JNIEnv *env = NULL;
|
||||
int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6);
|
||||
bool needDetach = false;
|
||||
if (status < 0) {
|
||||
if ((*g_vm)->AttachCurrentThread(g_vm, (void **)&env, NULL) != 0) {
|
||||
return;
|
||||
}
|
||||
needDetach = true;
|
||||
}
|
||||
|
||||
jobject obj = (jobject)param;
|
||||
(*env)->CallVoidMethod(env, obj, g_offsetCallback, code);
|
||||
(*env)->DeleteGlobalRef(env, obj);
|
||||
param = NULL;
|
||||
|
||||
if (needDetach) {
|
||||
(*g_vm)->DetachCurrentThread(g_vm);
|
||||
}
|
||||
env = NULL;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConfNewImp(JNIEnv *env, jobject jobj) {
|
||||
tmq_conf_t *conf = tmq_conf_new();
|
||||
jniGetGlobalMethod(env);
|
||||
|
@ -201,6 +253,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
|
|||
return tmq_commit_sync(tmq, res);
|
||||
}
|
||||
|
||||
// deprecated
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
jlong jres, jobject consumer) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
|
@ -213,6 +266,19 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
|
|||
tmq_commit_async(tmq, res, commit_cb, consumer);
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
|
||||
jlong jres, jobject offset) {
|
||||
tmqGlobalMethod(env);
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
if (tmq == NULL) {
|
||||
jniError("jobj:%p, tmq is closed", jobj);
|
||||
return;
|
||||
}
|
||||
TAOS_RES *res = (TAOS_RES *)jres;
|
||||
offset = (*env)->NewGlobalRef(env, offset);
|
||||
tmq_commit_async(tmq, res, consumer_callback, offset);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
|
||||
jlong jtmq) {
|
||||
tmq_t *tmq = (tmq_t *)jtmq;
|
||||
|
|
|
@ -41,7 +41,8 @@ bool tsPrintAuth = false;
|
|||
|
||||
// queue & threads
|
||||
int32_t tsNumOfRpcThreads = 1;
|
||||
int32_t tsNumOfRpcSessions = 2000;
|
||||
int32_t tsNumOfRpcSessions = 5000;
|
||||
int32_t tsTimeToGetAvailableConn = 100000;
|
||||
int32_t tsNumOfCommitThreads = 2;
|
||||
int32_t tsNumOfTaskQueueThreads = 4;
|
||||
int32_t tsNumOfMnodeQueryThreads = 4;
|
||||
|
@ -326,6 +327,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
|
||||
|
||||
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000);
|
||||
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
|
||||
|
||||
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000);
|
||||
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
|
||||
|
||||
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
|
||||
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
|
||||
if (tsNumOfTaskQueueThreads >= 10) {
|
||||
|
@ -397,6 +404,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
|
||||
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
|
||||
|
||||
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
|
||||
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, 0) != 0) return -1;
|
||||
|
||||
tsNumOfCommitThreads = tsNumOfCores / 2;
|
||||
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
@ -517,6 +527,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
|||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn");
|
||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||
tsTimeToGetAvailableConn = 1000;
|
||||
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
|
||||
pItem->i32 = tsTimeToGetAvailableConn;
|
||||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
|
||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||
tsNumOfCommitThreads = numOfCores / 2;
|
||||
|
@ -698,6 +716,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
|
||||
|
||||
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||
|
||||
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
|
||||
|
||||
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -735,6 +757,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
|
||||
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
|
||||
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
|
||||
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
|
||||
|
||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||
|
@ -742,7 +766,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
|
||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchTereads")->i32;
|
||||
tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
|
||||
|
|
|
@ -145,19 +145,6 @@ void taosVariantDestroy(SVariant *pVar) {
|
|||
pVar->nLen = 0;
|
||||
}
|
||||
|
||||
// NOTE: this is only for string array
|
||||
if (pVar->nType == TSDB_DATA_TYPE_POINTER_ARRAY) {
|
||||
size_t num = taosArrayGetSize(pVar->arr);
|
||||
for (size_t i = 0; i < num; i++) {
|
||||
void *p = taosArrayGetP(pVar->arr, i);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
taosArrayDestroy(pVar->arr);
|
||||
pVar->arr = NULL;
|
||||
} else if (pVar->nType == TSDB_DATA_TYPE_VALUE_ARRAY) {
|
||||
taosArrayDestroy(pVar->arr);
|
||||
pVar->arr = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) {
|
||||
|
@ -180,28 +167,8 @@ void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) {
|
|||
|
||||
if (IS_NUMERIC_TYPE(pSrc->nType) || (pSrc->nType == TSDB_DATA_TYPE_BOOL)) {
|
||||
pDst->i = pSrc->i;
|
||||
} else if (pSrc->nType == TSDB_DATA_TYPE_POINTER_ARRAY) { // this is only for string array
|
||||
size_t num = taosArrayGetSize(pSrc->arr);
|
||||
pDst->arr = taosArrayInit(num, sizeof(char *));
|
||||
for (size_t i = 0; i < num; i++) {
|
||||
char *p = (char *)taosArrayGetP(pSrc->arr, i);
|
||||
char *n = taosStrdup(p);
|
||||
taosArrayPush(pDst->arr, &n);
|
||||
}
|
||||
} else if (pSrc->nType == TSDB_DATA_TYPE_VALUE_ARRAY) {
|
||||
size_t num = taosArrayGetSize(pSrc->arr);
|
||||
pDst->arr = taosArrayInit(num, sizeof(int64_t));
|
||||
pDst->nLen = pSrc->nLen;
|
||||
ASSERT(pSrc->nLen == num);
|
||||
for (size_t i = 0; i < num; i++) {
|
||||
int64_t *p = taosArrayGet(pSrc->arr, i);
|
||||
taosArrayPush(pDst->arr, p);
|
||||
}
|
||||
}
|
||||
|
||||
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) {
|
||||
pDst->nLen = tDataTypes[pDst->nType].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t taosVariantCompare(const SVariant *p1, const SVariant *p2) {
|
||||
|
|
|
@ -93,15 +93,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
pDnode is null, TD-22618
|
||||
at trans.c line 91
|
||||
before this line, dmProcessRpcMsg callback is set
|
||||
after this line, parent is set
|
||||
so when dmProcessRpcMsg is called, pDonde is still null.
|
||||
*/
|
||||
if (pDnode != NULL){
|
||||
if(pDnode->status != DND_STAT_RUNNING) {
|
||||
/*
|
||||
pDnode is null, TD-22618
|
||||
at trans.c line 91
|
||||
before this line, dmProcessRpcMsg callback is set
|
||||
after this line, parent is set
|
||||
so when dmProcessRpcMsg is called, pDonde is still null.
|
||||
*/
|
||||
if (pDnode != NULL) {
|
||||
if (pDnode->status != DND_STAT_RUNNING) {
|
||||
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
|
||||
dmProcessServerStartupStatus(pDnode, pRpc);
|
||||
return;
|
||||
|
@ -113,7 +113,7 @@ so when dmProcessRpcMsg is called, pDonde is still null.
|
|||
}
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_IS_STARTING;
|
||||
goto _OVER;
|
||||
|
@ -304,6 +304,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
|||
rpcInit.connLimitLock = 1;
|
||||
rpcInit.supportBatch = 1;
|
||||
rpcInit.batchSize = 8 * 1024;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
pTrans->clientRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->clientRpc == NULL) {
|
||||
|
|
|
@ -53,12 +53,12 @@ extern "C" {
|
|||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -41,12 +41,12 @@ extern "C" {
|
|||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
#define mGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define mGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define mGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define mGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define mGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define mGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}
|
||||
#define mGFatal(param, ...) { if (mDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define mGError(param, ...) { if (mDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define mGWarn(param, ...) { if (mDebugFlag & DEBUG_WARN){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define mGInfo(param, ...) { if (mDebugFlag & DEBUG_INFO){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define mGDebug(param, ...) { if (mDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
#define mGTrace(param, ...) { if (mDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}}
|
||||
// clang-format on
|
||||
|
||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||
|
@ -80,7 +80,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
TdThreadMutex lock;
|
||||
char email[TSDB_FQDN_LEN];
|
||||
char email[TSDB_FQDN_LEN];
|
||||
} STelemMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1921,10 +1921,10 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_
|
|||
// refactor
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE);
|
||||
memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData);
|
||||
if (pTagData != NULL) memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData);
|
||||
*(tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData) = uid;
|
||||
} else {
|
||||
memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
||||
if (pTagData != NULL) memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
|
||||
*(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid;
|
||||
}
|
||||
|
||||
|
|
|
@ -155,6 +155,7 @@ typedef struct SBlockInfoBuf {
|
|||
int32_t currentIndex;
|
||||
SArray* pData;
|
||||
int32_t numPerBucket;
|
||||
int32_t numOfTables;
|
||||
} SBlockInfoBuf;
|
||||
|
||||
struct STsdbReader {
|
||||
|
@ -302,6 +303,47 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
|||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
pBuf->numOfTables = numOfTables;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||
if (numOfTables <= pBuf->numOfTables) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pBuf->numOfTables > 0) {
|
||||
STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
|
||||
taosMemoryFree(*p);
|
||||
pBuf->numOfTables /= pBuf->numPerBucket;
|
||||
}
|
||||
|
||||
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
|
||||
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
|
||||
if (pBuf->pData == NULL) {
|
||||
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
if (remainder > 0) {
|
||||
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
pBuf->numOfTables = numOfTables;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3876,8 +3918,13 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
|||
clearBlockScanInfo(*p);
|
||||
}
|
||||
|
||||
// todo handle the case where size is less than the value of num
|
||||
ASSERT(size >= num);
|
||||
if (size < num) {
|
||||
int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
|
||||
}
|
||||
|
||||
taosHashClear(pReader->status.pTableMap);
|
||||
STableUidList* pUidList = &pReader->status.uidList;
|
||||
|
|
|
@ -672,7 +672,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
}
|
||||
|
||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
|
||||
if (limitReached) {
|
||||
// if limit is reached within a group, do not clear limiInfo otherwise the next block
|
||||
// will be processed.
|
||||
if (newgroup && limitReached) {
|
||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -606,9 +606,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
}
|
||||
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
|
||||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
|
||||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
|
||||
code == TSDB_CODE_APP_IS_STOPPING) {
|
||||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
|
||||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
|
||||
msgType == TDMT_SCH_MERGE_FETCH) {
|
||||
return false;
|
||||
|
@ -673,6 +672,12 @@ int32_t udfdOpenClientRpc() {
|
|||
rpcInit.rfp = udfdRpcRfp;
|
||||
rpcInit.compressSize = tsCompressMsgSize;
|
||||
|
||||
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
|
||||
connLimitNum = TMAX(connLimitNum, 10);
|
||||
connLimitNum = TMIN(connLimitNum, 500);
|
||||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
global.clientRpc = rpcOpen(&rpcInit);
|
||||
if (global.clientRpc == NULL) {
|
||||
fnError("failed to init dnode rpc client");
|
||||
|
@ -765,7 +770,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
|||
}
|
||||
|
||||
void udfdHandleRequest(SUdfdUvConn *conn) {
|
||||
char *inputBuf = conn->inputBuf;
|
||||
char *inputBuf = conn->inputBuf;
|
||||
int32_t inputLen = conn->inputLen;
|
||||
|
||||
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
|
||||
|
@ -784,7 +789,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
|
|||
|
||||
void udfdPipeCloseCb(uv_handle_t *pipe) {
|
||||
SUdfdUvConn *conn = pipe->data;
|
||||
SUvUdfWork* pWork = conn->pWorkList;
|
||||
SUvUdfWork *pWork = conn->pWorkList;
|
||||
while (pWork != NULL) {
|
||||
pWork->conn = NULL;
|
||||
pWork = pWork->pWorkNext;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "indexInt.h"
|
||||
#include "indexUtil.h"
|
||||
#include "os.h"
|
||||
#include "osDef.h"
|
||||
#include "tutil.h"
|
||||
|
||||
static int32_t kBlockSize = 4096;
|
||||
|
@ -172,7 +173,8 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
|
|||
int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset);
|
||||
ctx->file.wBufOffset = 0;
|
||||
}
|
||||
taosFsyncFile(ctx->file.pFile);
|
||||
int ret = taosFsyncFile(ctx->file.pFile);
|
||||
UNUSED(ret);
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
|
@ -180,11 +182,11 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
|
|||
}
|
||||
|
||||
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
|
||||
int code = 0;
|
||||
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
|
||||
if (ctx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ctx->type = type;
|
||||
if (ctx->type == TFILE) {
|
||||
// ugly code, refactor later
|
||||
|
@ -192,15 +194,21 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
memcpy(ctx->file.buf, path, strlen(path));
|
||||
if (readOnly == false) {
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
taosFtruncateFile(ctx->file.pFile, 0);
|
||||
taosStatFile(path, &ctx->file.size, NULL);
|
||||
|
||||
code = taosFtruncateFile(ctx->file.pFile, 0);
|
||||
UNUSED(code);
|
||||
|
||||
code = taosStatFile(path, &ctx->file.size, NULL);
|
||||
UNUSED(code);
|
||||
|
||||
ctx->file.wBufOffset = 0;
|
||||
ctx->file.wBufCap = kBlockSize * 4;
|
||||
ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap);
|
||||
} else {
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
||||
taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
|
||||
code = taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
|
||||
UNUSED(code);
|
||||
|
||||
ctx->file.wBufOffset = 0;
|
||||
|
||||
#ifdef USE_MMAP
|
||||
|
|
|
@ -3342,6 +3342,9 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pSelect->timeRange.skey > pSelect->timeRange.ekey) {
|
||||
pSelect->isEmptyResult = true;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1631,12 +1631,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
|||
|
||||
SValueNode *var = (SValueNode *)field->desc;
|
||||
SDataType *dType = &var->node.resType;
|
||||
// if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) {
|
||||
// qDebug("VAL%d => [type:TS][val:[%" PRIi64 "] - [%" PRId64 "]]", i, *(int64_t *)field->data,
|
||||
// *(((int64_t *)field->data) + 1));
|
||||
// } else {
|
||||
qDebug("VAL%d => [type:%d][val:%" PRIx64 "]", i, dType->type, var->datum.i); // TODO
|
||||
//}
|
||||
} else if (field->data) {
|
||||
qDebug("VAL%d => [type:NIL][val:NIL]", i); // TODO
|
||||
}
|
||||
|
@ -1956,18 +1951,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
|||
|
||||
fi->data = taosMemoryCalloc(1, bytes);
|
||||
} else {
|
||||
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { // TIME RANGE
|
||||
/*
|
||||
fi->data = taosMemoryCalloc(dType->bytes, tDataTypes[type].bytes);
|
||||
for (int32_t a = 0; a < dType->bytes; ++a) {
|
||||
int64_t *v = taosArrayGet(var->arr, a);
|
||||
assignVal((char *)fi->data + a * tDataTypes[type].bytes, (char *)v, 0, type);
|
||||
}
|
||||
*/
|
||||
continue;
|
||||
} else {
|
||||
fi->data = taosMemoryCalloc(1, sizeof(int64_t));
|
||||
}
|
||||
fi->data = taosMemoryCalloc(1, sizeof(int64_t));
|
||||
}
|
||||
|
||||
if (dType->type == type) {
|
||||
|
|
|
@ -148,7 +148,8 @@ typedef struct {
|
|||
int8_t epsetRetryCnt;
|
||||
int32_t retryCode;
|
||||
|
||||
int hThrdIdx;
|
||||
void* task;
|
||||
int hThrdIdx;
|
||||
} STransConnCtx;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
|
|
@ -64,11 +64,11 @@ typedef struct {
|
|||
void (*destroyFp)(void* ahandle);
|
||||
bool (*failFastFp)(tmsg_t msgType);
|
||||
|
||||
int32_t connLimitNum;
|
||||
int8_t connLimitLock; // 0: no lock. 1. lock
|
||||
int8_t supportBatch; // 0: no batch, 1: support batch
|
||||
int32_t batchSize;
|
||||
|
||||
int32_t connLimitNum;
|
||||
int8_t connLimitLock; // 0: no lock. 1. lock
|
||||
int8_t supportBatch; // 0: no batch, 1: support batch
|
||||
int32_t batchSize;
|
||||
int32_t timeToGetConn;
|
||||
int index;
|
||||
void* parent;
|
||||
void* tcphandle; // returned handle from TCP initialization
|
||||
|
|
|
@ -26,6 +26,8 @@
|
|||
|
||||
#define HTTP_RECV_BUF_SIZE 1024
|
||||
|
||||
static int32_t httpRefMgt = 0;
|
||||
static int64_t httpRef = -1;
|
||||
typedef struct SHttpModule {
|
||||
uv_loop_t* loop;
|
||||
SAsyncPool* asyncPool;
|
||||
|
@ -41,7 +43,6 @@ typedef struct SHttpMsg {
|
|||
int32_t len;
|
||||
EHttpCompFlag flag;
|
||||
int8_t quit;
|
||||
SHttpModule* http;
|
||||
|
||||
} SHttpMsg;
|
||||
|
||||
|
@ -57,7 +58,6 @@ typedef struct SHttpClient {
|
|||
} SHttpClient;
|
||||
|
||||
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
||||
static SHttpModule* thttp = NULL;
|
||||
static void transHttpEnvInit();
|
||||
|
||||
static void httpHandleReq(SHttpMsg* msg);
|
||||
|
@ -280,26 +280,29 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
|
|||
}
|
||||
|
||||
int32_t httpSendQuit() {
|
||||
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
||||
if (http == NULL) return 0;
|
||||
|
||||
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
|
||||
msg->quit = 1;
|
||||
|
||||
SHttpModule* load = atomic_load_ptr(&thttp);
|
||||
if (load == NULL) {
|
||||
httpDestroyMsg(msg);
|
||||
tError("http-report already released");
|
||||
return -1;
|
||||
} else {
|
||||
msg->http = load;
|
||||
}
|
||||
transAsyncSend(load->asyncPool, &(msg->q));
|
||||
transAsyncSend(http->asyncPool, &(msg->q));
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||
EHttpCompFlag flag) {
|
||||
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
||||
if (load == NULL) {
|
||||
tError("http-report already released");
|
||||
return -1;
|
||||
}
|
||||
|
||||
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
||||
|
||||
msg->server = taosStrdup(server);
|
||||
msg->uri = taosStrdup(uri);
|
||||
msg->uri = taosStrdup(uri);
|
||||
msg->port = port;
|
||||
msg->cont = taosMemoryMalloc(contLen);
|
||||
memcpy(msg->cont, pCont, contLen);
|
||||
|
@ -307,15 +310,9 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1
|
|||
msg->flag = flag;
|
||||
msg->quit = 0;
|
||||
|
||||
SHttpModule* load = atomic_load_ptr(&thttp);
|
||||
if (load == NULL) {
|
||||
httpDestroyMsg(msg);
|
||||
tError("http-report already released");
|
||||
return -1;
|
||||
}
|
||||
|
||||
msg->http = load;
|
||||
return transAsyncSend(load->asyncPool, &(msg->q));
|
||||
int ret = transAsyncSend(load->asyncPool, &(msg->q));
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void httpDestroyClientCb(uv_handle_t* handle) {
|
||||
|
@ -335,13 +332,19 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
|
|||
return;
|
||||
}
|
||||
static void httpHandleQuit(SHttpMsg* msg) {
|
||||
SHttpModule* http = msg->http;
|
||||
taosMemoryFree(msg);
|
||||
|
||||
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
||||
if (http == NULL) return;
|
||||
|
||||
uv_walk(http->loop, httpWalkCb, NULL);
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
}
|
||||
static void httpHandleReq(SHttpMsg* msg) {
|
||||
SHttpModule* http = msg->http;
|
||||
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
|
||||
if (http == NULL) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
struct sockaddr_in dest = {0};
|
||||
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
|
||||
|
@ -391,6 +394,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
|||
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
||||
if (ret != 0) {
|
||||
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
destroyHttpClient(cli);
|
||||
return;
|
||||
}
|
||||
|
@ -401,21 +405,26 @@ static void httpHandleReq(SHttpMsg* msg) {
|
|||
cli->port);
|
||||
destroyHttpClient(cli);
|
||||
}
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
return;
|
||||
|
||||
END:
|
||||
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
|
||||
httpDestroyMsg(msg);
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
}
|
||||
|
||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
||||
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||
EHttpCompFlag flag) {
|
||||
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
||||
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
|
||||
}
|
||||
|
||||
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
||||
static void transHttpEnvInit() {
|
||||
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
||||
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
|
||||
|
||||
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
||||
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||
uv_loop_init(http->loop);
|
||||
|
||||
|
@ -426,21 +435,22 @@ static void transHttpEnvInit() {
|
|||
http = NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
||||
if (err != 0) {
|
||||
taosMemoryFree(http->loop);
|
||||
taosMemoryFree(http);
|
||||
http = NULL;
|
||||
}
|
||||
atomic_store_ptr(&thttp, http);
|
||||
httpRef = taosAddRef(httpRefMgt, http);
|
||||
}
|
||||
|
||||
void transHttpEnvDestroy() {
|
||||
SHttpModule* load = atomic_load_ptr(&thttp);
|
||||
if (load == NULL) {
|
||||
// remove http
|
||||
if (httpRef == -1) {
|
||||
return;
|
||||
}
|
||||
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
||||
httpSendQuit();
|
||||
taosThreadJoin(load->thread, NULL);
|
||||
|
||||
|
@ -448,7 +458,7 @@ void transHttpEnvDestroy() {
|
|||
transAsyncPoolDestroy(load->asyncPool);
|
||||
uv_loop_close(load->loop);
|
||||
taosMemoryFree(load->loop);
|
||||
taosMemoryFree(load);
|
||||
|
||||
atomic_store_ptr(&thttp, NULL);
|
||||
taosReleaseRef(httpRefMgt, httpRef);
|
||||
taosRemoveRef(httpRefMgt, httpRef);
|
||||
}
|
||||
|
|
|
@ -66,6 +66,10 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->destroyFp = pInit->dfp;
|
||||
pRpc->failFastFp = pInit->ffp;
|
||||
pRpc->connLimitNum = pInit->connLimitNum;
|
||||
if (pRpc->connLimitNum == 0) {
|
||||
pRpc->connLimitNum = 20;
|
||||
}
|
||||
|
||||
pRpc->connLimitLock = pInit->connLimitLock;
|
||||
pRpc->supportBatch = pInit->supportBatch;
|
||||
pRpc->batchSize = pInit->batchSize;
|
||||
|
@ -90,7 +94,10 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
if (pInit->user) {
|
||||
tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
|
||||
}
|
||||
|
||||
pRpc->timeToGetConn = pInit->timeToGetConn;
|
||||
if (pRpc->timeToGetConn == 0) {
|
||||
pRpc->timeToGetConn = 10 * 1000;
|
||||
}
|
||||
pRpc->tcphandle =
|
||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
||||
|
|
|
@ -11,11 +11,19 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "transComm.h"
|
||||
#include "tutil.h"
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfConn;
|
||||
queue msgQ;
|
||||
} SMsgList;
|
||||
|
||||
typedef struct SConnList {
|
||||
queue conns;
|
||||
int32_t size;
|
||||
queue conns;
|
||||
int32_t size;
|
||||
SMsgList* list;
|
||||
} SConnList;
|
||||
|
||||
typedef struct {
|
||||
|
@ -64,15 +72,13 @@ typedef struct SCliConn {
|
|||
|
||||
SCliBatch* pBatch;
|
||||
|
||||
int64_t refId;
|
||||
char* ip;
|
||||
|
||||
SDelayTask* task;
|
||||
|
||||
// debug and log info
|
||||
char src[32];
|
||||
char dst[32];
|
||||
char* ip;
|
||||
char src[32];
|
||||
char dst[32];
|
||||
|
||||
int64_t refId;
|
||||
} SCliConn;
|
||||
|
||||
typedef struct SCliMsg {
|
||||
|
@ -100,6 +106,7 @@ typedef struct SCliThrd {
|
|||
TdThreadMutex msgMtx;
|
||||
SDelayQueue* delayQueue;
|
||||
SDelayQueue* timeoutQueue;
|
||||
SDelayQueue* waitConnQueue;
|
||||
uint64_t nextTimeout; // next timeout
|
||||
void* pTransInst; //
|
||||
|
||||
|
@ -109,7 +116,6 @@ typedef struct SCliThrd {
|
|||
SCvtAddr cvtAddr;
|
||||
|
||||
SHashObj* failFastCache;
|
||||
SHashObj* connLimitCache;
|
||||
SHashObj* batchCache;
|
||||
|
||||
SCliMsg* stopMsg;
|
||||
|
@ -131,11 +137,12 @@ typedef struct {
|
|||
int32_t threshold;
|
||||
int64_t interval;
|
||||
} SFailFastItem;
|
||||
|
||||
// conn pool
|
||||
// add expire timeout and capacity limit
|
||||
static void* createConnPool(int size);
|
||||
static void* destroyConnPool(void* pool);
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||
static void* destroyConnPool(SCliThrd* thread);
|
||||
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
|
||||
static void addConnToPool(void* pool, SCliConn* conn);
|
||||
static void doCloseIdleConn(void* param);
|
||||
|
||||
|
@ -175,7 +182,8 @@ static void cliSend(SCliConn* pConn);
|
|||
static void cliSendBatch(SCliConn* pConn);
|
||||
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
|
||||
|
||||
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port);
|
||||
static void doFreeTimeoutMsg(void* param);
|
||||
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
|
||||
|
||||
// cli util func
|
||||
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
|
||||
|
@ -193,6 +201,7 @@ static void cliHandleExcept(SCliConn* conn);
|
|||
static void cliReleaseUnfinishedMsg(SCliConn* conn);
|
||||
static void cliHandleFastFail(SCliConn* pConn, int status);
|
||||
|
||||
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
// handle req from app
|
||||
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
|
@ -225,11 +234,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
|||
// snprintf may cause performance problem
|
||||
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
|
||||
do { \
|
||||
char* p = key; \
|
||||
int32_t len = strlen(ip); \
|
||||
if (p != NULL) memcpy(p, ip, len); \
|
||||
p[len] = ':'; \
|
||||
titoa(port, 10, &p[len + 1]); \
|
||||
char* t = key; \
|
||||
int16_t len = strlen(ip); \
|
||||
if (ip != NULL) memcpy(t, ip, len); \
|
||||
t[len] = ':'; \
|
||||
titoa(port, 10, &t[len + 1]); \
|
||||
} while (0)
|
||||
|
||||
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
||||
|
@ -333,12 +342,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
|
|||
if (!transQueueEmpty(&conn->cliMsgs)) {
|
||||
SCliMsg* pCliMsg = NULL;
|
||||
CONN_GET_NEXT_SENDMSG(conn);
|
||||
if (pCliMsg == NULL)
|
||||
return false;
|
||||
else {
|
||||
cliSend(conn);
|
||||
return true;
|
||||
}
|
||||
cliSend(conn);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
_RETURN:
|
||||
|
@ -362,6 +367,7 @@ void cliHandleResp(SCliConn* conn) {
|
|||
|
||||
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
||||
if (msgLen <= 0) {
|
||||
taosMemoryFree(pHead);
|
||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||
return;
|
||||
}
|
||||
|
@ -545,7 +551,8 @@ void* createConnPool(int size) {
|
|||
// thread local, no lock
|
||||
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
}
|
||||
void* destroyConnPool(void* pool) {
|
||||
void* destroyConnPool(SCliThrd* pThrd) {
|
||||
void* pool = pThrd->pool;
|
||||
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
|
||||
while (connList != NULL) {
|
||||
while (!QUEUE_IS_EMPTY(&connList->conns)) {
|
||||
|
@ -553,34 +560,130 @@ void* destroyConnPool(void* pool) {
|
|||
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
|
||||
cliDestroyConn(c, true);
|
||||
}
|
||||
|
||||
SMsgList* msglist = connList->list;
|
||||
while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
|
||||
queue* h = QUEUE_HEAD(&msglist->msgQ);
|
||||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
||||
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
|
||||
pMsg->ctx->task = NULL;
|
||||
|
||||
doNotifyApp(pMsg, pThrd);
|
||||
}
|
||||
taosMemoryFree(msglist);
|
||||
|
||||
connList = taosHashIterate((SHashObj*)pool, connList);
|
||||
}
|
||||
taosHashCleanup(pool);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||
void* pool = pThrd->pool;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
STrans* pTranInst = pThrd->pTransInst;
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key));
|
||||
|
||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||
QUEUE_INIT(&nList->msgQ);
|
||||
nList->numOfConn++;
|
||||
|
||||
QUEUE_INIT(&plist->conns);
|
||||
plist->list = nList;
|
||||
}
|
||||
|
||||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||
if (plist->list->numOfConn >= pTranInst->connLimitNum) {
|
||||
*exceed = true;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
queue* h = QUEUE_TAIL(&plist->conns);
|
||||
QUEUE_REMOVE(h);
|
||||
plist->size -= 1;
|
||||
|
||||
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
|
||||
conn->status = ConnNormal;
|
||||
QUEUE_INIT(&conn->q);
|
||||
|
||||
if (conn->task != NULL) {
|
||||
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
||||
conn->task = NULL;
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||
void* pool = pThrd->pool;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
||||
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
if (plist == NULL) return NULL;
|
||||
plist = taosHashGet(pool, key, strlen(key));
|
||||
|
||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||
QUEUE_INIT(&nList->msgQ);
|
||||
nList->numOfConn++;
|
||||
|
||||
QUEUE_INIT(&plist->conns);
|
||||
plist->list = nList;
|
||||
}
|
||||
|
||||
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||
// no avaliable conn in pool
|
||||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||
SMsgList* list = plist->list;
|
||||
if ((list)->numOfConn >= pTransInst->connLimitNum) {
|
||||
STraceId* trace = &(*pMsg)->msg.info.traceId;
|
||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||
arg->param1 = *pMsg;
|
||||
arg->param2 = pThrd;
|
||||
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
|
||||
|
||||
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
|
||||
|
||||
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
|
||||
*pMsg = NULL;
|
||||
} else {
|
||||
// send msg in delay queue
|
||||
if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
|
||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||
arg->param1 = *pMsg;
|
||||
arg->param2 = pThrd;
|
||||
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
|
||||
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label,
|
||||
TMSG_INFO((*pMsg)->msg.msgType));
|
||||
|
||||
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
|
||||
queue* h = QUEUE_HEAD(&(list)->msgQ);
|
||||
QUEUE_REMOVE(h);
|
||||
SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
||||
*pMsg = ans;
|
||||
|
||||
trace = &(*pMsg)->msg.info.traceId;
|
||||
tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
|
||||
transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
|
||||
}
|
||||
list->numOfConn++;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
queue* h = QUEUE_TAIL(&plist->conns);
|
||||
plist->size -= 1;
|
||||
queue* h = QUEUE_HEAD(&plist->conns);
|
||||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
|
||||
conn->status = ConnNormal;
|
||||
QUEUE_REMOVE(&conn->q);
|
||||
QUEUE_INIT(&conn->q);
|
||||
|
||||
if (conn->task != NULL) {
|
||||
|
@ -608,18 +711,34 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
|
||||
cliDestroyConnMsgs(conn, false);
|
||||
|
||||
conn->status = ConnInPool;
|
||||
|
||||
if (conn->list == NULL) {
|
||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
|
||||
} else {
|
||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||
}
|
||||
|
||||
SConnList* pList = conn->list;
|
||||
SMsgList* msgList = pList->list;
|
||||
if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
|
||||
queue* h = QUEUE_HEAD(&(msgList)->msgQ);
|
||||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
||||
transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
|
||||
pMsg->ctx->task = NULL;
|
||||
|
||||
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
|
||||
conn->status = ConnNormal;
|
||||
cliSend(conn);
|
||||
return;
|
||||
}
|
||||
|
||||
conn->status = ConnInPool;
|
||||
QUEUE_PUSH(&conn->list->conns, &conn->q);
|
||||
conn->list->size += 1;
|
||||
|
||||
if (conn->list->size >= 250) {
|
||||
if (conn->list->size >= 20) {
|
||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||
arg->param1 = conn;
|
||||
arg->param2 = thrd;
|
||||
|
@ -741,8 +860,20 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
|||
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||
SCliThrd* pThrd = conn->hostThrd;
|
||||
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
|
||||
QUEUE_REMOVE(&conn->q);
|
||||
QUEUE_INIT(&conn->q);
|
||||
|
||||
if (conn->list != NULL) {
|
||||
SConnList* connList = conn->list;
|
||||
connList->list->numOfConn--;
|
||||
connList->size--;
|
||||
} else {
|
||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
|
||||
connList->list->numOfConn--;
|
||||
}
|
||||
conn->list = NULL;
|
||||
|
||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
conn->refId = -1;
|
||||
|
@ -777,9 +908,6 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
conn->timer->data = NULL;
|
||||
conn->timer = NULL;
|
||||
}
|
||||
int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
|
||||
int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1;
|
||||
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal));
|
||||
|
||||
atomic_sub_fetch_32(&pThrd->connCount, 1);
|
||||
|
||||
|
@ -1012,11 +1140,15 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
|||
STrans* pTransInst = pThrd->pTransInst;
|
||||
SCliBatchList* pList = pBatch->pList;
|
||||
|
||||
SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port);
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);
|
||||
|
||||
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) {
|
||||
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
|
||||
pBatch->batchSize);
|
||||
bool exceed = false;
|
||||
SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
|
||||
|
||||
if (conn == NULL && exceed) {
|
||||
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
|
||||
pBatch->batchSize, pTransInst->connLimitNum);
|
||||
cliDestroyBatch(pBatch);
|
||||
return;
|
||||
}
|
||||
|
@ -1176,10 +1308,6 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip));
|
||||
int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1;
|
||||
taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal));
|
||||
|
||||
struct sockaddr peername, sockname;
|
||||
int addrlen = sizeof(peername);
|
||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
|
||||
|
@ -1197,6 +1325,29 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
}
|
||||
}
|
||||
|
||||
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
STransMsg transMsg = {0};
|
||||
transMsg.contLen = 0;
|
||||
transMsg.pCont = NULL;
|
||||
transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
|
||||
transMsg.msgType = pMsg->msg.msgType + 1;
|
||||
transMsg.info.ahandle = pMsg->ctx->ahandle;
|
||||
transMsg.info.traceId = pMsg->msg.info.traceId;
|
||||
transMsg.info.hasEpSet = false;
|
||||
if (pCtx->pSem != NULL) {
|
||||
if (pCtx->pRsp == NULL) {
|
||||
} else {
|
||||
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
|
||||
}
|
||||
} else {
|
||||
pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
|
||||
}
|
||||
|
||||
destroyCmsg(pMsg);
|
||||
}
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
|
||||
pThrd->stopMsg = pMsg;
|
||||
|
@ -1206,7 +1357,8 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
pThrd->quit = true;
|
||||
tDebug("cli work thread %p start to quit", pThrd);
|
||||
destroyCmsg(pMsg);
|
||||
destroyConnPool(pThrd->pool);
|
||||
|
||||
destroyConnPool(pThrd);
|
||||
uv_walk(pThrd->loop, cliWalkCb, NULL);
|
||||
}
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
|
@ -1239,11 +1391,11 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
destroyCmsg(pMsg);
|
||||
}
|
||||
|
||||
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
|
||||
STransConnCtx* pCtx = (*pMsg)->ctx;
|
||||
SCliConn* conn = NULL;
|
||||
|
||||
int64_t refId = (int64_t)(pMsg->msg.info.handle);
|
||||
int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
|
||||
if (refId != 0) {
|
||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||
if (exh == NULL) {
|
||||
|
@ -1253,7 +1405,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
|||
} else {
|
||||
conn = exh->handle;
|
||||
if (conn == NULL) {
|
||||
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
||||
conn = getConnFromPool2(pThrd, addr, pMsg);
|
||||
if (conn != NULL) specifyConnRef(conn, true, refId);
|
||||
}
|
||||
transReleaseExHandle(transGetRefMgt(), refId);
|
||||
|
@ -1261,7 +1413,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
|
|||
return conn;
|
||||
};
|
||||
|
||||
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
|
||||
conn = getConnFromPool2(pThrd, addr, pMsg);
|
||||
if (conn != NULL) {
|
||||
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
|
||||
} else {
|
||||
|
@ -1319,57 +1471,34 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
|
|||
return;
|
||||
}
|
||||
|
||||
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) {
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
static void doFreeTimeoutMsg(void* param) {
|
||||
STaskArg* arg = param;
|
||||
SCliMsg* pMsg = arg->param1;
|
||||
SCliThrd* pThrd = arg->param2;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
// STransConnCtx* pCtx = pMsg->ctx;
|
||||
// char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||
// int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
|
||||
if (val == NULL) return 0;
|
||||
|
||||
if (*val >= pTransInst->connLimitNum) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
QUEUE_REMOVE(&pMsg->q);
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
|
||||
doNotifyApp(pMsg, pThrd);
|
||||
taosMemoryFree(arg);
|
||||
}
|
||||
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
|
||||
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
||||
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
|
||||
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
|
||||
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
|
||||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
|
||||
if (item != NULL) {
|
||||
int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
|
||||
if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
|
||||
tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label,
|
||||
TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse);
|
||||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
|
||||
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
|
||||
char addr[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
|
||||
|
||||
bool ignore = false;
|
||||
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
||||
SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
|
||||
if (ignore == true) {
|
||||
// persist conn already release by server
|
||||
STransMsg resp;
|
||||
|
@ -1380,16 +1509,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
destroyCmsg(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) {
|
||||
tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType),
|
||||
tstrerror(TSDB_CODE_RPC_MAX_SESSIONS));
|
||||
destroyCmsg(pMsg);
|
||||
if (conn == NULL && pMsg == NULL) {
|
||||
return;
|
||||
}
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
|
||||
if (conn != NULL) {
|
||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
cliSend(conn);
|
||||
} else {
|
||||
|
@ -1398,15 +1524,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
int64_t refId = (int64_t)pMsg->msg.info.handle;
|
||||
if (refId != 0) specifyConnRef(conn, true, refId);
|
||||
|
||||
transCtxMerge(&conn->ctx, &pCtx->appCtx);
|
||||
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
CONN_CONSTRUCT_HASH_KEY(key, fqdn, port);
|
||||
|
||||
conn->ip = taosStrdup(key);
|
||||
conn->ip = taosStrdup(addr);
|
||||
|
||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
|
||||
if (ipaddr == 0xffffffff) {
|
||||
|
@ -1730,17 +1851,23 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||
SCliThrd* pThrd = createThrdObj(shandle);
|
||||
if (pThrd == NULL) {
|
||||
return NULL;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
|
||||
if (err == 0) {
|
||||
if (err != 0) {
|
||||
goto _err;
|
||||
} else {
|
||||
tDebug("success to create tranport-cli thread:%d", i);
|
||||
}
|
||||
cli->pThreadObj[i] = pThrd;
|
||||
}
|
||||
|
||||
return cli;
|
||||
|
||||
_err:
|
||||
taosMemoryFree(cli->pThreadObj);
|
||||
taosMemoryFree(cli);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
|
||||
|
@ -1830,14 +1957,14 @@ static SCliThrd* createThrdObj(void* trans) {
|
|||
|
||||
transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
|
||||
|
||||
transDQCreate(pThrd->loop, &pThrd->waitConnQueue);
|
||||
|
||||
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
|
||||
pThrd->pTransInst = trans;
|
||||
|
||||
pThrd->destroyAhandleFp = pTransInst->destroyFp;
|
||||
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
|
||||
pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
|
||||
|
||||
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
||||
|
@ -1857,6 +1984,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
|
||||
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
|
||||
transDQDestroy(pThrd->timeoutQueue, NULL);
|
||||
transDQDestroy(pThrd->waitConnQueue, NULL);
|
||||
|
||||
tDebug("thread destroy %" PRId64, pThrd->pid);
|
||||
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
|
||||
|
@ -1868,7 +1996,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
taosMemoryFree(pThrd->loop);
|
||||
taosHashCleanup(pThrd->fqdn2ipCache);
|
||||
taosHashCleanup(pThrd->failFastCache);
|
||||
taosHashCleanup(pThrd->connLimitCache);
|
||||
|
||||
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
|
||||
while (pIter != NULL) {
|
||||
|
|
|
@ -468,7 +468,7 @@ size_t tstrncspn(const char *str, size_t size, const char *reject, size_t rsize)
|
|||
c3 = p[s[j + 3]];
|
||||
|
||||
if ((c0 | c1 | c2 | c3) != 0) {
|
||||
size_t count = ((i + 1) >> 2);
|
||||
size_t count = i * 4;
|
||||
return (c0 | c1) != 0 ? count - c0 + 1 : count - c2 + 3;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -294,6 +294,10 @@ TEST(utilTest, tstrncspn) {
|
|||
const char* reject5 = "911";
|
||||
v = tstrncspn(p2, strlen(p2), reject5, 0);
|
||||
ASSERT_EQ(v, 14);
|
||||
|
||||
const char* reject6 = "Kk";
|
||||
v = tstrncspn(p2, strlen(p2), reject6, 2);
|
||||
ASSERT_EQ(v, 10);
|
||||
}
|
||||
|
||||
TEST(utilTest, intToHextStr) {
|
||||
|
@ -322,4 +326,4 @@ TEST(utilTest, intToHextStr) {
|
|||
sprintf(destBuf, "%" PRIx64, v);
|
||||
ASSERT_STREQ(buf, destBuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -168,6 +168,7 @@
|
|||
,,y,script,./test.sh -f tsim/parser/union.sim
|
||||
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
|
||||
,,y,script,./test.sh -f tsim/parser/where.sim
|
||||
,,y,script,./test.sh -f tsim/query/tagLikeFilter.sim
|
||||
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
|
||||
,,y,script,./test.sh -f tsim/query/explain.sim
|
||||
,,y,script,./test.sh -f tsim/query/interval-offset.sim
|
||||
|
@ -181,7 +182,8 @@
|
|||
,,y,script,./test.sh -f tsim/query/groupby.sim
|
||||
,,y,script,./test.sh -f tsim/query/event.sim
|
||||
,,y,script,./test.sh -f tsim/query/forceFill.sim
|
||||
,,n,script,./test.sh -f tsim/query/join.sim
|
||||
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
|
||||
,,y,script,./test.sh -f tsim/query/partitionby.sim
|
||||
,,y,script,./test.sh -f tsim/qnode/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/snode/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/mnode/basic1.sim
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
sql drop database if exists db1;
|
||||
sql create database if not exists db1;
|
||||
sql use db1;
|
||||
sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int);
|
||||
sql create table tba1 using sta tags(1);
|
||||
sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a");
|
||||
sql insert into tba1 values ('2022-04-26 15:15:02', 2.0, "b");
|
||||
sql insert into tba1 values ('2022-04-26 15:15:04', 4.0, "b");
|
||||
sql insert into tba1 values ('2022-04-26 15:15:05', 5.0, "b");
|
||||
sql select last_row(*) from sta where ts >= 1678901803783 and ts <= 1678901803783 and _c0 <= 1678901803782 interval(10d,8d) fill(linear) order by _wstart desc;
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,39 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
$dbPrefix = db
|
||||
$tbPrefix1 = tba
|
||||
$tbPrefix2 = tbb
|
||||
$mtPrefix = stb
|
||||
$tbNum = 10
|
||||
$rowNum = 2
|
||||
|
||||
print =============== step1
|
||||
$i = 0
|
||||
$db = $dbPrefix . $i
|
||||
$mt1 = $mtPrefix . $i
|
||||
$i = 1
|
||||
$mt2 = $mtPrefix . $i
|
||||
|
||||
sql drop database $db -x step1
|
||||
step1:
|
||||
sql create database $db vgroups 3
|
||||
sql use $db
|
||||
sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
|
||||
sql create table tb0 using $mt1 tags(0, 'a');
|
||||
sql create table tb1 using $mt1 tags(1, 'b');
|
||||
sql create table tb2 using $mt1 tags(1, 'a');
|
||||
sql create table tb3 using $mt1 tags(1, 'a');
|
||||
sql create table tb4 using $mt1 tags(3, 'b');
|
||||
sql create table tb5 using $mt1 tags(3, 'a');
|
||||
sql create table tb6 using $mt1 tags(3, 'b');
|
||||
sql create table tb7 using $mt1 tags(3, 'b');
|
||||
|
||||
sql select * from $mt1 partition by tag1,tag2 limit 1;
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,36 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
sql drop database if exists db1;
|
||||
sql create database if not exists db1 vgroups 10;
|
||||
sql use db1;
|
||||
sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 binary(100));
|
||||
sql create table tba1 using sta tags('ZQMPvstuzZVzCRjFTQawILuGSqZKSqlJwcBtZMxrAEqBbzChHWVDMiAZJwESzJAf');
|
||||
sql create table tba2 using sta tags('ieofwehughkreghughuerugu34jf9340aieefjalie28ffj8fj8fafjaekdfjfii');
|
||||
sql create table tba3 using sta tags('ZQMPvstuzZVzCRjFTQawILuGSqabSqlJwcBtZMxrAEqBbzChHWVDMiAZJwESzJAf');
|
||||
sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a");
|
||||
sql insert into tba2 values ('2022-04-26 15:15:01', 1.0, "a");
|
||||
sql insert into tba2 values ('2022-04-26 15:15:02', 1.0, "a");
|
||||
sql insert into tba3 values ('2022-04-26 15:15:01', 1.0, "a");
|
||||
sql insert into tba3 values ('2022-04-26 15:15:02', 1.0, "a");
|
||||
sql insert into tba3 values ('2022-04-26 15:15:03', 1.0, "a");
|
||||
sql select t1 from sta where t1 like '%ab%';
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
sql select t1 from sta where t1 like '%ax%';
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select t1 from sta where t1 like '%cd%';
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select t1 from sta where t1 like '%ii';
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
|||
def init(self, conn, logSql, replicaVar):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
|
@ -41,9 +41,9 @@ class TDTestCase:
|
|||
buildPath = root[:len(root)-len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
|
||||
def run_benchmark(self,dbname,tables,per_table_num,order,replica):
|
||||
#O :Out of order
|
||||
#O :Out of order
|
||||
#A :Repliaca
|
||||
buildPath = self.getBuildPath()
|
||||
if (buildPath == ""):
|
||||
|
@ -53,135 +53,147 @@ class TDTestCase:
|
|||
binPath = buildPath+ "/build/bin/"
|
||||
|
||||
os.system("%staosBenchmark -d %s -t %d -n %d -O %d -a %d -b float,double,nchar\(200\),binary\(50\) -T 50 -y " % (binPath,dbname,tables,per_table_num,order,replica))
|
||||
|
||||
|
||||
def sql_base(self,dbname):
|
||||
self.check_sub(dbname)
|
||||
sql1 = "select count(*) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql1)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(ts) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(_c0) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c0) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c1) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c2) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c3) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(t0) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(t1) from %s.meters" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(ts) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(_c0) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c0) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c1) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c2) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(c3) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(t0) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
self.check_sub(dbname)
|
||||
sql2 = "select count(t1) from (select * from %s.meters)" %dbname
|
||||
self.sql_base_check(sql1,sql2)
|
||||
|
||||
|
||||
|
||||
# TD-22520
|
||||
tdSql.query("select tbname, ts from %s.meters where ts < '2017-07-14 10:40:00' order by ts asc limit 150;" %dbname)
|
||||
tdSql.checkRows(150)
|
||||
|
||||
tdSql.query("select tbname, ts from %s.meters where ts < '2017-07-14 10:40:00' order by ts asc limit 300;" %dbname)
|
||||
tdSql.checkRows(300)
|
||||
|
||||
tdSql.query("select tbname, ts from %s.meters where ts < '2017-07-14 10:40:00' order by ts desc limit 150;" %dbname)
|
||||
tdSql.checkRows(150)
|
||||
|
||||
tdSql.query("select tbname, ts from %s.meters where ts < '2017-07-14 10:40:00' order by ts desc limit 300;" %dbname)
|
||||
tdSql.checkRows(300)
|
||||
|
||||
def sql_base_check(self,sql1,sql2):
|
||||
tdSql.query(sql1)
|
||||
sql1_result = tdSql.getData(0,0)
|
||||
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
|
||||
|
||||
|
||||
tdSql.query(sql2)
|
||||
sql2_result = tdSql.getData(0,0)
|
||||
tdLog.info("sql:%s , result: %s" %(sql2,sql2_result))
|
||||
|
||||
|
||||
if sql1_result==sql2_result:
|
||||
tdLog.info(f"checkEqual success, sql1_result={sql1_result},sql2_result={sql2_result}")
|
||||
tdLog.info(f"checkEqual success, sql1_result={sql1_result},sql2_result={sql2_result}")
|
||||
else :
|
||||
tdLog.exit(f"checkEqual error, sql1_result=={sql1_result},sql2_result={sql2_result}")
|
||||
|
||||
tdLog.exit(f"checkEqual error, sql1_result=={sql1_result},sql2_result={sql2_result}")
|
||||
|
||||
def run_sql(self,dbname):
|
||||
self.sql_base(dbname)
|
||||
|
||||
|
||||
tdSql.execute(" flush database %s;" %dbname)
|
||||
|
||||
|
||||
self.sql_base(dbname)
|
||||
|
||||
|
||||
def check_sub(self,dbname):
|
||||
|
||||
|
||||
sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname
|
||||
tdSql.query(sql)
|
||||
num = tdSql.getData(0,0)
|
||||
|
||||
|
||||
for i in range(0,num):
|
||||
sql1 = "select count(*) from %s.d%d" %(dbname,i)
|
||||
tdSql.query(sql1)
|
||||
sql1_result = tdSql.getData(0,0)
|
||||
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
|
||||
|
||||
|
||||
def check_out_of_order(self,dbname,tables,per_table_num,order,replica):
|
||||
self.run_benchmark(dbname,tables,per_table_num,order,replica)
|
||||
print("sleep 10 seconds")
|
||||
#time.sleep(10)
|
||||
print("sleep 10 seconds finish")
|
||||
|
||||
|
||||
self.run_sql(dbname)
|
||||
|
||||
|
||||
def run(self):
|
||||
startTime = time.time()
|
||||
|
||||
#self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1)
|
||||
self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1)
|
||||
|
||||
# self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1)
|
||||
|
||||
# self.check_out_of_order('db3',random.randint(50,200),random.randint(10000,50000),random.randint(50,100),1)
|
||||
|
||||
# self.check_out_of_order('db4',random.randint(50,200),random.randint(10000,50000),100,1)
|
||||
|
||||
startTime = time.time()
|
||||
|
||||
#self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1)
|
||||
self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1)
|
||||
|
||||
# self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1)
|
||||
|
||||
# self.check_out_of_order('db3',random.randint(50,200),random.randint(10000,50000),random.randint(50,100),1)
|
||||
|
||||
# self.check_out_of_order('db4',random.randint(50,200),random.randint(10000,50000),100,1)
|
||||
|
||||
endTime = time.time()
|
||||
print("total time %ds" % (endTime - startTime))
|
||||
|
||||
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
|
|
@ -21,7 +21,7 @@ static void shellWorkAsClient() {
|
|||
SRpcInit rpcInit = {0};
|
||||
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
|
||||
SRpcMsg rpcRsp = {0};
|
||||
void * clientRpc = NULL;
|
||||
void *clientRpc = NULL;
|
||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
|
||||
taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass);
|
||||
|
@ -31,6 +31,7 @@ static void shellWorkAsClient() {
|
|||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.user = "_dnd";
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
clientRpc = rpcOpen(&rpcInit);
|
||||
if (clientRpc == NULL) {
|
||||
|
|
Loading…
Reference in New Issue