Hotfix/sangshuduo/td 4001 for develop (#5965)
* [TD-4001]<fix>: taosdemo restful segfault. due to gethostbyname() is not thread-safe. * fix uninitialized variable when invalid mode. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
7dbf75ae66
commit
a080f2c917
|
@ -241,7 +241,7 @@ typedef struct SSuperTable_S {
|
||||||
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
||||||
char childTblPrefix[MAX_TB_NAME_SIZE];
|
char childTblPrefix[MAX_TB_NAME_SIZE];
|
||||||
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
|
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
|
||||||
char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful
|
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
|
||||||
int64_t childTblLimit;
|
int64_t childTblLimit;
|
||||||
int64_t childTblOffset;
|
int64_t childTblOffset;
|
||||||
|
|
||||||
|
@ -334,6 +334,8 @@ typedef struct SDataBase_S {
|
||||||
typedef struct SDbs_S {
|
typedef struct SDbs_S {
|
||||||
char cfgDir[MAX_FILE_NAME_LEN+1];
|
char cfgDir[MAX_FILE_NAME_LEN+1];
|
||||||
char host[MAX_HOSTNAME_SIZE];
|
char host[MAX_HOSTNAME_SIZE];
|
||||||
|
struct sockaddr_in serv_addr;
|
||||||
|
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
char user[MAX_USERNAME_SIZE];
|
char user[MAX_USERNAME_SIZE];
|
||||||
char password[MAX_PASSWORD_SIZE];
|
char password[MAX_PASSWORD_SIZE];
|
||||||
|
@ -393,10 +395,11 @@ typedef struct SQueryMetaInfo_S {
|
||||||
char cfgDir[MAX_FILE_NAME_LEN+1];
|
char cfgDir[MAX_FILE_NAME_LEN+1];
|
||||||
char host[MAX_HOSTNAME_SIZE];
|
char host[MAX_HOSTNAME_SIZE];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
|
struct sockaddr_in serv_addr;
|
||||||
char user[MAX_USERNAME_SIZE];
|
char user[MAX_USERNAME_SIZE];
|
||||||
char password[MAX_PASSWORD_SIZE];
|
char password[MAX_PASSWORD_SIZE];
|
||||||
char dbName[MAX_DB_NAME_SIZE+1];
|
char dbName[MAX_DB_NAME_SIZE+1];
|
||||||
char queryMode[MAX_TB_NAME_SIZE]; // taosc, restful
|
char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest
|
||||||
|
|
||||||
SpecifiedQueryInfo specifiedQueryInfo;
|
SpecifiedQueryInfo specifiedQueryInfo;
|
||||||
SuperQueryInfo superQueryInfo;
|
SuperQueryInfo superQueryInfo;
|
||||||
|
@ -1973,14 +1976,12 @@ static void printfQuerySystemInfo(TAOS * taos) {
|
||||||
free(dbInfos);
|
free(dbInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int postProceSql(char* host, uint16_t port, char* sqlstr)
|
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, char* sqlstr)
|
||||||
{
|
{
|
||||||
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
|
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
|
||||||
|
|
||||||
char *url = "/rest/sql";
|
char *url = "/rest/sql";
|
||||||
|
|
||||||
struct hostent *server;
|
|
||||||
struct sockaddr_in serv_addr;
|
|
||||||
int bytes, sent, received, req_str_len, resp_len;
|
int bytes, sent, received, req_str_len, resp_len;
|
||||||
char *request_buf;
|
char *request_buf;
|
||||||
char response_buf[RESP_BUF_LEN];
|
char response_buf[RESP_BUF_LEN];
|
||||||
|
@ -2029,27 +2030,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
|
||||||
ERROR_EXIT("ERROR opening socket");
|
ERROR_EXIT("ERROR opening socket");
|
||||||
}
|
}
|
||||||
|
|
||||||
server = gethostbyname(host);
|
int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr));
|
||||||
if (server == NULL) {
|
|
||||||
free(request_buf);
|
|
||||||
ERROR_EXIT("ERROR, no such host");
|
|
||||||
}
|
|
||||||
|
|
||||||
debugPrint("h_name: %s\nh_addretype: %s\nh_length: %d\n",
|
|
||||||
server->h_name,
|
|
||||||
(server->h_addrtype == AF_INET)?"ipv4":"ipv6",
|
|
||||||
server->h_length);
|
|
||||||
|
|
||||||
memset(&serv_addr, 0, sizeof(serv_addr));
|
|
||||||
serv_addr.sin_family = AF_INET;
|
|
||||||
serv_addr.sin_port = htons(rest_port);
|
|
||||||
#ifdef WINDOWS
|
|
||||||
serv_addr.sin_addr.s_addr = inet_addr(host);
|
|
||||||
#else
|
|
||||||
memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int retConn = connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr));
|
|
||||||
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
|
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
|
||||||
if (retConn < 0) {
|
if (retConn < 0) {
|
||||||
free(request_buf);
|
free(request_buf);
|
||||||
|
@ -3742,7 +3723,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , restful
|
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest
|
||||||
if (insertMode && insertMode->type == cJSON_String
|
if (insertMode && insertMode->type == cJSON_String
|
||||||
&& insertMode->valuestring != NULL) {
|
&& insertMode->valuestring != NULL) {
|
||||||
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode,
|
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode,
|
||||||
|
@ -4637,14 +4618,18 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, int k)
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
|
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
|
||||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
||||||
} else {
|
} else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) {
|
||||||
if (0 != postProceSql(g_Dbs.host, g_Dbs.port, buffer)) {
|
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, buffer)) {
|
||||||
affectedRows = -1;
|
affectedRows = -1;
|
||||||
printf("========restful return fail, threadID[%d]\n",
|
printf("========restful return fail, threadID[%d]\n",
|
||||||
pThreadInfo->threadID);
|
pThreadInfo->threadID);
|
||||||
} else {
|
} else {
|
||||||
affectedRows = k;
|
affectedRows = k;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
errorPrint("%s() LN%d: unknown insert mode: %s\n",
|
||||||
|
__func__, __LINE__, superTblInfo->insertMode);
|
||||||
|
affectedRows = 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
||||||
|
@ -5443,6 +5428,32 @@ static void *asyncWrite(void *sarg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *serv_addr)
|
||||||
|
{
|
||||||
|
uint16_t rest_port = port + TSDB_PORT_HTTP;
|
||||||
|
struct hostent *server = gethostbyname(host);
|
||||||
|
if ((server == NULL) || (server->h_addr == NULL)) {
|
||||||
|
errorPrint("%s", "ERROR, no such host");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
debugPrint("h_name: %s\nh_addr=%p\nh_addretype: %s\nh_length: %d\n",
|
||||||
|
server->h_name,
|
||||||
|
server->h_addr,
|
||||||
|
(server->h_addrtype == AF_INET)?"ipv4":"ipv6",
|
||||||
|
server->h_length);
|
||||||
|
|
||||||
|
memset(serv_addr, 0, sizeof(struct sockaddr_in));
|
||||||
|
serv_addr->sin_family = AF_INET;
|
||||||
|
serv_addr->sin_port = htons(rest_port);
|
||||||
|
#ifdef WINDOWS
|
||||||
|
serv_addr->sin_addr.s_addr = inet_addr(host);
|
||||||
|
#else
|
||||||
|
memcpy(&(serv_addr->sin_addr.s_addr), server->h_addr, server->h_length);
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void startMultiThreadInsertData(int threads, char* db_name,
|
static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
char* precision,SSuperTable* superTblInfo) {
|
char* precision,SSuperTable* superTblInfo) {
|
||||||
|
|
||||||
|
@ -5606,6 +5617,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
b = ntables % threads;
|
b = ntables % threads;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((superTblInfo)
|
||||||
|
&& (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) {
|
||||||
|
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0)
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
threadInfo *t_info = infos + i;
|
threadInfo *t_info = infos + i;
|
||||||
t_info->threadID = i;
|
t_info->threadID = i;
|
||||||
|
@ -6014,7 +6031,7 @@ static void *specifiedTableQuery(void *sarg) {
|
||||||
|
|
||||||
st = taosGetTimestampMs();
|
st = taosGetTimestampMs();
|
||||||
|
|
||||||
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
|
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
|
||||||
int64_t t1 = taosGetTimestampMs();
|
int64_t t1 = taosGetTimestampMs();
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
||||||
|
@ -6027,9 +6044,9 @@ static void *specifiedTableQuery(void *sarg) {
|
||||||
int64_t t2 = taosGetTimestampMs();
|
int64_t t2 = taosGetTimestampMs();
|
||||||
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %10.3f s\n",
|
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %10.3f s\n",
|
||||||
taosGetSelfPthreadId(), (t2 - t1)/1000.0);
|
taosGetSelfPthreadId(), (t2 - t1)/1000.0);
|
||||||
} else {
|
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
|
||||||
int64_t t1 = taosGetTimestampMs();
|
int64_t t1 = taosGetTimestampMs();
|
||||||
int retCode = postProceSql(g_queryInfo.host,
|
int retCode = postProceSql(g_queryInfo.host, &(g_queryInfo.serv_addr),
|
||||||
g_queryInfo.port,
|
g_queryInfo.port,
|
||||||
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
|
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
|
||||||
if (0 != retCode) {
|
if (0 != retCode) {
|
||||||
|
@ -6040,6 +6057,10 @@ static void *specifiedTableQuery(void *sarg) {
|
||||||
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %10.3f s\n",
|
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %10.3f s\n",
|
||||||
taosGetSelfPthreadId(), (t2 - t1)/1000.0);
|
taosGetSelfPthreadId(), (t2 - t1)/1000.0);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
errorPrint("%s() LN%d, unknown query mode: %s\n",
|
||||||
|
__func__, __LINE__, g_queryInfo.queryMode);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
totalQueried ++;
|
totalQueried ++;
|
||||||
g_queryInfo.specifiedQueryInfo.totalQueried ++;
|
g_queryInfo.specifiedQueryInfo.totalQueried ++;
|
||||||
|
@ -6189,6 +6210,12 @@ static int queryTestProcess() {
|
||||||
|
|
||||||
printfQuerySystemInfo(taos);
|
printfQuerySystemInfo(taos);
|
||||||
|
|
||||||
|
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
|
||||||
|
if (convertHostToServAddr(
|
||||||
|
g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0)
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
pthread_t *pids = NULL;
|
pthread_t *pids = NULL;
|
||||||
threadInfo *infos = NULL;
|
threadInfo *infos = NULL;
|
||||||
//==== create sub threads for query from specify table
|
//==== create sub threads for query from specify table
|
||||||
|
|
Loading…
Reference in New Issue