[TD-3147] <fix>: support insert interval. merge develop.
This commit is contained in:
commit
3793640e85
|
@ -46,10 +46,11 @@
|
|||
#include <stdio.h>
|
||||
#include "os.h"
|
||||
|
||||
#ifdef TD_WINDOWS
|
||||
#ifdef WINDOWS
|
||||
#include <winsock2.h>
|
||||
typedef unsigned __int32 uint32_t;
|
||||
|
||||
#pragma comment ( lib, "ws2_32.lib" )
|
||||
#pragma comment ( lib, "winmm.lib" )
|
||||
#pragma comment ( lib, "wldap32.lib" )
|
||||
#endif
|
||||
#endif
|
||||
|
||||
|
@ -95,6 +96,7 @@ extern char configDir[];
|
|||
#define MAX_QUERY_SQL_LENGTH 256
|
||||
|
||||
#define MAX_DATABASE_COUNT 256
|
||||
#define INPUT_BUF_LEN 256
|
||||
|
||||
typedef enum CREATE_SUB_TALBE_MOD_EN {
|
||||
PRE_CREATE_SUBTBL,
|
||||
|
@ -1236,7 +1238,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
|||
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
|
||||
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
|
||||
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
|
||||
fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
|
||||
fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
|
||||
|
||||
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
|
||||
fprintf(fp, " multiThreadWriteOneTbl: no\n");
|
||||
|
@ -1598,18 +1600,17 @@ static void printfQuerySystemInfo(TAOS * taos) {
|
|||
|
||||
}
|
||||
|
||||
void ERROR_EXIT(const char *msg) { perror(msg); exit(0); }
|
||||
void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
|
||||
|
||||
int postProceSql(char* host, uint16_t port, char* sqlstr)
|
||||
{
|
||||
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\n%s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
|
||||
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
|
||||
|
||||
char *url = "/rest/sql";
|
||||
char *auth = "Authorization: Basic cm9vdDp0YW9zZGF0YQ==";
|
||||
|
||||
struct hostent *server;
|
||||
struct sockaddr_in serv_addr;
|
||||
int sockfd, bytes, sent, received, req_str_len, resp_len;
|
||||
int bytes, sent, received, req_str_len, resp_len;
|
||||
char *request_buf;
|
||||
char response_buf[RESP_BUF_LEN];
|
||||
uint16_t rest_port = port + TSDB_PORT_HTTP;
|
||||
|
@ -1620,18 +1621,37 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
if (NULL == request_buf)
|
||||
ERROR_EXIT("ERROR, cannot allocate memory.");
|
||||
|
||||
int r = snprintf(request_buf,
|
||||
req_buf_len,
|
||||
req_fmt, url, host, rest_port,
|
||||
auth, strlen(sqlstr), sqlstr);
|
||||
if (r >= req_buf_len) {
|
||||
free(request_buf);
|
||||
ERROR_EXIT("ERROR too long request");
|
||||
}
|
||||
printf("Request:\n%s\n", request_buf);
|
||||
char userpass_buf[INPUT_BUF_LEN];
|
||||
int mod_table[] = {0, 2, 1};
|
||||
|
||||
static char base64[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
|
||||
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
|
||||
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
|
||||
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
|
||||
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
|
||||
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
|
||||
'w', 'x', 'y', 'z', '0', '1', '2', '3',
|
||||
'4', '5', '6', '7', '8', '9', '+', '/'};
|
||||
|
||||
snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s",
|
||||
g_Dbs.user, g_Dbs.password);
|
||||
size_t userpass_buf_len = strlen(userpass_buf);
|
||||
size_t encoded_len = 4 * ((userpass_buf_len +2) / 3);
|
||||
|
||||
char base64_buf[INPUT_BUF_LEN];
|
||||
#ifdef WINDOWS
|
||||
WSADATA wsaData;
|
||||
WSAStartup(MAKEWORD(2, 2), &wsaData);
|
||||
SOCKET sockfd;
|
||||
#else
|
||||
int sockfd;
|
||||
#endif
|
||||
sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (sockfd < 0) {
|
||||
#ifdef WINDOWS
|
||||
fprintf(stderr, "Could not create socket : %d" , WSAGetLastError());
|
||||
#endif
|
||||
debugPrint("%s() LN%d sockfd=%d\n", __func__, __LINE__, sockfd);
|
||||
free(request_buf);
|
||||
ERROR_EXIT("ERROR opening socket");
|
||||
}
|
||||
|
@ -1642,20 +1662,68 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
ERROR_EXIT("ERROR, no such host");
|
||||
}
|
||||
|
||||
debugPrint("h_name: %s\nh_addretype: %s\nh_length: %d\n",
|
||||
server->h_name,
|
||||
(server->h_addrtype == AF_INET)?"ipv4":"ipv6",
|
||||
server->h_length);
|
||||
|
||||
memset(&serv_addr, 0, sizeof(serv_addr));
|
||||
serv_addr.sin_family = AF_INET;
|
||||
serv_addr.sin_port = htons(rest_port);
|
||||
#ifdef WINDOWS
|
||||
serv_addr.sin_addr.s_addr = inet_addr(host);
|
||||
#else
|
||||
memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
|
||||
#endif
|
||||
|
||||
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) {
|
||||
int retConn = connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr));
|
||||
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
|
||||
if (retConn < 0) {
|
||||
free(request_buf);
|
||||
ERROR_EXIT("ERROR connecting");
|
||||
}
|
||||
|
||||
memset(base64_buf, 0, INPUT_BUF_LEN);
|
||||
|
||||
for (int n = 0, m = 0; n < userpass_buf_len;) {
|
||||
uint32_t oct_a = n < userpass_buf_len ?
|
||||
(unsigned char) userpass_buf[n++]:0;
|
||||
uint32_t oct_b = n < userpass_buf_len ?
|
||||
(unsigned char) userpass_buf[n++]:0;
|
||||
uint32_t oct_c = n < userpass_buf_len ?
|
||||
(unsigned char) userpass_buf[n++]:0;
|
||||
uint32_t triple = (oct_a << 0x10) + (oct_b << 0x08) + oct_c;
|
||||
|
||||
base64_buf[m++] = base64[(triple >> 3* 6) & 0x3f];
|
||||
base64_buf[m++] = base64[(triple >> 2* 6) & 0x3f];
|
||||
base64_buf[m++] = base64[(triple >> 1* 6) & 0x3f];
|
||||
base64_buf[m++] = base64[(triple >> 0* 6) & 0x3f];
|
||||
}
|
||||
|
||||
for (int l = 0; l < mod_table[userpass_buf_len % 3]; l++)
|
||||
base64_buf[encoded_len - 1 - l] = '=';
|
||||
|
||||
debugPrint("%s() LN%d: auth string base64 encoded: %s\n", __func__, __LINE__, base64_buf);
|
||||
char *auth = base64_buf;
|
||||
|
||||
int r = snprintf(request_buf,
|
||||
req_buf_len,
|
||||
req_fmt, url, host, rest_port,
|
||||
auth, strlen(sqlstr), sqlstr);
|
||||
if (r >= req_buf_len) {
|
||||
free(request_buf);
|
||||
ERROR_EXIT("ERROR too long request");
|
||||
}
|
||||
verbosePrint("%s() LN%d: Request:\n%s\n", __func__, __LINE__, request_buf);
|
||||
|
||||
req_str_len = strlen(request_buf);
|
||||
sent = 0;
|
||||
do {
|
||||
#ifdef WINDOWS
|
||||
bytes = send(sockfd, request_buf + sent, req_str_len - sent, 0);
|
||||
#else
|
||||
bytes = write(sockfd, request_buf + sent, req_str_len - sent);
|
||||
#endif
|
||||
if (bytes < 0)
|
||||
ERROR_EXIT("ERROR writing message to socket");
|
||||
if (bytes == 0)
|
||||
|
@ -1667,7 +1735,11 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
resp_len = sizeof(response_buf) - 1;
|
||||
received = 0;
|
||||
do {
|
||||
#ifdef WINDOWS
|
||||
bytes = recv(sockfd, response_buf + received, resp_len - received, 0);
|
||||
#else
|
||||
bytes = read(sockfd, response_buf + received, resp_len - received);
|
||||
#endif
|
||||
if (bytes < 0) {
|
||||
free(request_buf);
|
||||
ERROR_EXIT("ERROR reading response from socket");
|
||||
|
@ -1686,7 +1758,12 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
printf("Response:\n%s\n", response_buf);
|
||||
|
||||
free(request_buf);
|
||||
#ifdef WINDOWS
|
||||
closesocket(sockfd);
|
||||
WSACleanup();
|
||||
#else
|
||||
close(sockfd);
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -2146,7 +2223,7 @@ static int createDatabases() {
|
|||
debugPrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
taos_close(taos);
|
||||
printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
|
||||
fprintf(stderr, "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
|
||||
return -1;
|
||||
}
|
||||
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
|
||||
|
@ -2245,6 +2322,7 @@ static void* createTable(void *sarg)
|
|||
len = 0;
|
||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
|
||||
fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer);
|
||||
free(buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2259,7 +2337,9 @@ static void* createTable(void *sarg)
|
|||
|
||||
if (0 != len) {
|
||||
verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
|
||||
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)) {
|
||||
fprintf(stderr, "queryDbExec() failed. buffer:\n%s\n", buffer);
|
||||
}
|
||||
}
|
||||
|
||||
free(buffer);
|
||||
|
@ -3961,9 +4041,7 @@ send_to_server:
|
|||
int affectedRows = queryDbExec(
|
||||
winfo->taos, buffer, INSERT_TYPE);
|
||||
|
||||
if (0 > affectedRows) {
|
||||
goto free_and_statistics;
|
||||
} else {
|
||||
if (0 < affectedRows) {
|
||||
endTs = taosGetTimestampUs();
|
||||
int64_t delay = endTs - startTs;
|
||||
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
|
||||
|
@ -3972,12 +4050,15 @@ send_to_server:
|
|||
winfo->totalDelay += delay;
|
||||
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
|
||||
winfo->totalAffectedRows += affectedRows;
|
||||
} else {
|
||||
fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows);
|
||||
goto free_and_statistics;
|
||||
}
|
||||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
||||
winfo->threadID,
|
||||
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
||||
winfo->threadID,
|
||||
winfo->totalRowsInserted,
|
||||
winfo->totalAffectedRows);
|
||||
lastPrintTime = currentPrintTime;
|
||||
|
@ -4114,10 +4195,11 @@ static void* syncWrite(void *sarg) {
|
|||
winfo->totalAffectedRows = 0;
|
||||
|
||||
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
|
||||
int64_t tmp_time = time_counter;
|
||||
|
||||
for (int i = 0; i < g_args.num_of_DPT;) {
|
||||
|
||||
int tblInserted = i;
|
||||
int64_t tmp_time = time_counter;
|
||||
|
||||
char *pstr = buffer;
|
||||
pstr += sprintf(pstr,
|
||||
|
@ -4172,8 +4254,7 @@ static void* syncWrite(void *sarg) {
|
|||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
|
||||
|
||||
verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows);
|
||||
if (0 <= affectedRows){
|
||||
if (0 < affectedRows){
|
||||
endTs = taosGetTimestampUs();
|
||||
int64_t delay = endTs - startTs;
|
||||
if (delay > winfo->maxDelay)
|
||||
|
@ -4184,9 +4265,11 @@ static void* syncWrite(void *sarg) {
|
|||
winfo->totalDelay += delay;
|
||||
winfo->totalAffectedRows += affectedRows;
|
||||
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
|
||||
} else {
|
||||
fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows);
|
||||
}
|
||||
|
||||
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows);
|
||||
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64" tblInserted=%d\n", __func__, __LINE__, winfo->totalAffectedRows, tblInserted);
|
||||
if (g_args.insert_interval) {
|
||||
et = taosGetTimestampMs();
|
||||
}
|
||||
|
@ -4270,7 +4353,7 @@ static void* syncWriteWithStb(void *sarg) {
|
|||
|
||||
int sampleUsePos;
|
||||
|
||||
debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
|
||||
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
|
||||
|
||||
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
|
||||
tID++) {
|
||||
|
@ -4376,7 +4459,7 @@ static void* syncWriteWithStb(void *sarg) {
|
|||
}
|
||||
|
||||
len += retLen;
|
||||
verbosePrint("%s() LN%d retLen=%d len=%d k=%d buffer=%s\n", __func__, __LINE__, retLen, len, k, buffer);
|
||||
verbosePrint("%s() LN%d retLen=%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, retLen, len, k, buffer);
|
||||
|
||||
tblInserted++;
|
||||
k++;
|
||||
|
@ -4388,42 +4471,46 @@ static void* syncWriteWithStb(void *sarg) {
|
|||
|
||||
winfo->totalRowsInserted += k;
|
||||
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
int64_t endTs;
|
||||
int affectedRows;
|
||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
|
||||
int64_t startTs;
|
||||
int64_t endTs;
|
||||
startTs = taosGetTimestampUs();
|
||||
|
||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
|
||||
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
|
||||
|
||||
if (0 > affectedRows){
|
||||
goto free_and_statistics_2;
|
||||
} else {
|
||||
endTs = taosGetTimestampUs();
|
||||
int64_t delay = endTs - startTs;
|
||||
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
|
||||
if (delay < winfo->minDelay) winfo->minDelay = delay;
|
||||
winfo->cntDelay++;
|
||||
winfo->totalDelay += delay;
|
||||
}
|
||||
winfo->totalAffectedRows += affectedRows;
|
||||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
||||
winfo->threadID,
|
||||
winfo->totalRowsInserted,
|
||||
winfo->totalAffectedRows);
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
} else {
|
||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
|
||||
|
||||
|
||||
if (0 != retCode) {
|
||||
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
|
||||
goto free_and_statistics_2;
|
||||
}
|
||||
|
||||
affectedRows = k;
|
||||
}
|
||||
|
||||
endTs = taosGetTimestampUs();
|
||||
int64_t delay = endTs - startTs;
|
||||
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
|
||||
if (delay < winfo->minDelay) winfo->minDelay = delay;
|
||||
winfo->cntDelay++;
|
||||
winfo->totalDelay += delay;
|
||||
|
||||
winfo->totalAffectedRows += affectedRows;
|
||||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
||||
winfo->threadID,
|
||||
winfo->totalRowsInserted,
|
||||
winfo->totalAffectedRows);
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
|
||||
if (superTblInfo->insertInterval) {
|
||||
et = taosGetTimestampMs();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue