feat: shell support cloud (#15283)
* feat: taos shell support cloud (3.0) * fix: affected row and reuse data * feat: add custom ws timeout * feat: handle ctrl c for ws * fix: do not show any info about cloud if not websocket defined * fix: other os compile * fix: compile os error * fix: compile error * fix: review suggestions
This commit is contained in:
parent
f14511e5bb
commit
621ef9f1a8
|
@ -26,6 +26,10 @@
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
#include "taosws.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
#define SHELL_MAX_HISTORY_SIZE 1000
|
#define SHELL_MAX_HISTORY_SIZE 1000
|
||||||
#define SHELL_MAX_COMMAND_SIZE 1048586
|
#define SHELL_MAX_COMMAND_SIZE 1048586
|
||||||
#define SHELL_HISTORY_FILE ".taos_history"
|
#define SHELL_HISTORY_FILE ".taos_history"
|
||||||
|
@ -67,6 +71,12 @@ typedef struct {
|
||||||
int32_t pktNum;
|
int32_t pktNum;
|
||||||
int32_t displayWidth;
|
int32_t displayWidth;
|
||||||
int32_t abort;
|
int32_t abort;
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
bool restful;
|
||||||
|
bool cloud;
|
||||||
|
char* dsn;
|
||||||
|
int32_t timeout;
|
||||||
|
#endif
|
||||||
} SShellArgs;
|
} SShellArgs;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -85,6 +95,10 @@ typedef struct {
|
||||||
TAOS* conn;
|
TAOS* conn;
|
||||||
TdThread pid;
|
TdThread pid;
|
||||||
tsem_t cancelSem;
|
tsem_t cancelSem;
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
WS_TAOS* ws_conn;
|
||||||
|
bool stop_query;
|
||||||
|
#endif
|
||||||
} SShellObj;
|
} SShellObj;
|
||||||
|
|
||||||
// shellArguments.c
|
// shellArguments.c
|
||||||
|
@ -95,7 +109,10 @@ int32_t shellReadCommand(char* command);
|
||||||
|
|
||||||
// shellEngine.c
|
// shellEngine.c
|
||||||
int32_t shellExecute();
|
int32_t shellExecute();
|
||||||
|
int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision);
|
||||||
|
void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields);
|
||||||
|
void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision);
|
||||||
|
void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision);
|
||||||
// shellUtil.c
|
// shellUtil.c
|
||||||
int32_t shellCheckIntSize();
|
int32_t shellCheckIntSize();
|
||||||
void shellPrintVersion();
|
void shellPrintVersion();
|
||||||
|
@ -109,6 +126,14 @@ void shellExit();
|
||||||
// shellNettest.c
|
// shellNettest.c
|
||||||
void shellTestNetWork();
|
void shellTestNetWork();
|
||||||
|
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
void shellCheckConnectMode();
|
||||||
|
// shellWebsocket.c
|
||||||
|
int shell_conn_ws_server(bool first);
|
||||||
|
int32_t shell_run_websocket();
|
||||||
|
void shellRunSingleCommandWebsocketImp(char *command);
|
||||||
|
#endif
|
||||||
|
|
||||||
// shellMain.c
|
// shellMain.c
|
||||||
extern SShellObj shell;
|
extern SShellObj shell;
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,12 @@
|
||||||
#define SHELL_VERSION "Print program version."
|
#define SHELL_VERSION "Print program version."
|
||||||
#define SHELL_EMAIL "<support@taosdata.com>"
|
#define SHELL_EMAIL "<support@taosdata.com>"
|
||||||
|
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
#define SHELL_DSN "The dsn to use when connecting to cloud server."
|
||||||
|
#define SHELL_REST "Use restful mode when connecting."
|
||||||
|
#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10."
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t shellParseSingleOpt(int32_t key, char *arg);
|
static int32_t shellParseSingleOpt(int32_t key, char *arg);
|
||||||
|
|
||||||
void shellPrintHelp() {
|
void shellPrintHelp() {
|
||||||
|
@ -65,6 +71,11 @@ void shellPrintHelp() {
|
||||||
printf("%s%s%s%s\r\n", indent, "-s,", indent, SHELL_CMD);
|
printf("%s%s%s%s\r\n", indent, "-s,", indent, SHELL_CMD);
|
||||||
printf("%s%s%s%s\r\n", indent, "-t,", indent, SHELL_STARTUP);
|
printf("%s%s%s%s\r\n", indent, "-t,", indent, SHELL_STARTUP);
|
||||||
printf("%s%s%s%s\r\n", indent, "-u,", indent, SHELL_USER);
|
printf("%s%s%s%s\r\n", indent, "-u,", indent, SHELL_USER);
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
printf("%s%s%s%s\r\n", indent, "-E,", indent, SHELL_DSN);
|
||||||
|
printf("%s%s%s%s\r\n", indent, "-R,", indent, SHELL_REST);
|
||||||
|
printf("%s%s%s%s\r\n", indent, "-T,", indent, SHELL_TIMEOUT);
|
||||||
|
#endif
|
||||||
printf("%s%s%s%s\r\n", indent, "-w,", indent, SHELL_WIDTH);
|
printf("%s%s%s%s\r\n", indent, "-w,", indent, SHELL_WIDTH);
|
||||||
printf("%s%s%s%s\r\n", indent, "-V,", indent, SHELL_VERSION);
|
printf("%s%s%s%s\r\n", indent, "-V,", indent, SHELL_VERSION);
|
||||||
printf("\r\n\r\nReport bugs to %s.\r\n", SHELL_EMAIL);
|
printf("\r\n\r\nReport bugs to %s.\r\n", SHELL_EMAIL);
|
||||||
|
@ -95,6 +106,11 @@ static struct argp_option shellOptions[] = {
|
||||||
{"display-width", 'w', "WIDTH", 0, SHELL_WIDTH},
|
{"display-width", 'w', "WIDTH", 0, SHELL_WIDTH},
|
||||||
{"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE},
|
{"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE},
|
||||||
{"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN},
|
{"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN},
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
{"dsn", 'E', "DSN", 0, SHELL_DSN},
|
||||||
|
{"restful", 'R', 0, 0, SHELL_REST},
|
||||||
|
{"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT},
|
||||||
|
#endif
|
||||||
{"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM},
|
{"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM},
|
||||||
{0},
|
{0},
|
||||||
};
|
};
|
||||||
|
@ -120,9 +136,15 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
|
||||||
switch (key) {
|
switch (key) {
|
||||||
case 'h':
|
case 'h':
|
||||||
pArgs->host = arg;
|
pArgs->host = arg;
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
pArgs->cloud = false;
|
||||||
|
#endif
|
||||||
break;
|
break;
|
||||||
case 'P':
|
case 'P':
|
||||||
pArgs->port = atoi(arg);
|
pArgs->port = atoi(arg);
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
pArgs->cloud = false;
|
||||||
|
#endif
|
||||||
if (pArgs->port == 0) pArgs->port = -1;
|
if (pArgs->port == 0) pArgs->port = -1;
|
||||||
break;
|
break;
|
||||||
case 'u':
|
case 'u':
|
||||||
|
@ -137,6 +159,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
|
||||||
pArgs->is_gen_auth = true;
|
pArgs->is_gen_auth = true;
|
||||||
break;
|
break;
|
||||||
case 'c':
|
case 'c':
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
pArgs->cloud = false;
|
||||||
|
#endif
|
||||||
pArgs->cfgdir = arg;
|
pArgs->cfgdir = arg;
|
||||||
break;
|
break;
|
||||||
case 'C':
|
case 'C':
|
||||||
|
@ -172,6 +197,18 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
|
||||||
case 'N':
|
case 'N':
|
||||||
pArgs->pktNum = atoi(arg);
|
pArgs->pktNum = atoi(arg);
|
||||||
break;
|
break;
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
case 'R':
|
||||||
|
pArgs->restful = true;
|
||||||
|
break;
|
||||||
|
case 'E':
|
||||||
|
pArgs->dsn = arg;
|
||||||
|
pArgs->cloud = true;
|
||||||
|
break;
|
||||||
|
case 'T':
|
||||||
|
pArgs->timeout = atoi(arg);
|
||||||
|
break;
|
||||||
|
#endif
|
||||||
case 'V':
|
case 'V':
|
||||||
pArgs->is_version = true;
|
pArgs->is_version = true;
|
||||||
break;
|
break;
|
||||||
|
@ -208,7 +245,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' ||
|
if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' ||
|
||||||
key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N') {
|
key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N'
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
|| key[1] == 'E' || key[1] == 'T'
|
||||||
|
#endif
|
||||||
|
) {
|
||||||
if (i + 1 >= argc) {
|
if (i + 1 >= argc) {
|
||||||
fprintf(stderr, "option %s requires an argument\r\n", key);
|
fprintf(stderr, "option %s requires an argument\r\n", key);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -221,7 +262,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
|
||||||
shellParseSingleOpt(key[1], val);
|
shellParseSingleOpt(key[1], val);
|
||||||
i++;
|
i++;
|
||||||
} else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' ||
|
} else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' ||
|
||||||
key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1) {
|
key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
||key[1] == 'R'
|
||||||
|
#endif
|
||||||
|
) {
|
||||||
shellParseSingleOpt(key[1], NULL);
|
shellParseSingleOpt(key[1], NULL);
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "invalid option %s\r\n", key);
|
fprintf(stderr, "invalid option %s\r\n", key);
|
||||||
|
|
|
@ -25,14 +25,9 @@ static int32_t shellRunSingleCommand(char *command);
|
||||||
static int32_t shellRunCommand(char *command);
|
static int32_t shellRunCommand(char *command);
|
||||||
static void shellRunSingleCommandImp(char *command);
|
static void shellRunSingleCommandImp(char *command);
|
||||||
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision);
|
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision);
|
||||||
static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length,
|
|
||||||
int32_t precision);
|
|
||||||
static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
|
static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
|
||||||
static void shellPrintNChar(const char *str, int32_t length, int32_t width);
|
static void shellPrintNChar(const char *str, int32_t length, int32_t width);
|
||||||
static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision);
|
|
||||||
static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
|
static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
|
||||||
static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision);
|
|
||||||
static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields);
|
|
||||||
static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
|
static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
|
||||||
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
|
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
|
||||||
static void shellReadHistory();
|
static void shellReadHistory();
|
||||||
|
@ -94,8 +89,15 @@ int32_t shellRunSingleCommand(char *command) {
|
||||||
shellSourceFile(c_ptr);
|
shellSourceFile(c_ptr);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#ifdef WEBSOCKET
|
||||||
shellRunSingleCommandImp(command);
|
if (shell.args.restful || shell.args.cloud) {
|
||||||
|
shellRunSingleCommandWebsocketImp(command);
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
shellRunSingleCommandImp(command);
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -937,7 +939,16 @@ void *shellCancelHandler(void *arg) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
taos_kill_query(shell.conn);
|
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
if (shell.args.restful || shell.args.cloud) {
|
||||||
|
shell.stop_query = true;
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
taos_kill_query(shell.conn);
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
}
|
||||||
|
#endif
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
printf("\n%s", shell.info.promptHeader);
|
printf("\n%s", shell.info.promptHeader);
|
||||||
#endif
|
#endif
|
||||||
|
@ -981,16 +992,26 @@ int32_t shellExecute() {
|
||||||
fflush(stdout);
|
fflush(stdout);
|
||||||
|
|
||||||
SShellArgs *pArgs = &shell.args;
|
SShellArgs *pArgs = &shell.args;
|
||||||
if (shell.args.auth == NULL) {
|
#ifdef WEBSOCKET
|
||||||
shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port);
|
if (shell.args.restful || shell.args.cloud) {
|
||||||
|
if (shell_conn_ws_server(1)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port);
|
#endif
|
||||||
}
|
if (shell.args.auth == NULL) {
|
||||||
|
shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port);
|
||||||
|
} else {
|
||||||
|
shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port);
|
||||||
|
}
|
||||||
|
|
||||||
if (shell.conn == NULL) {
|
if (shell.conn == NULL) {
|
||||||
fflush(stdout);
|
fflush(stdout);
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
|
#ifdef WEBSOCKET
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
shellReadHistory();
|
shellReadHistory();
|
||||||
|
|
||||||
|
@ -1005,8 +1026,16 @@ int32_t shellExecute() {
|
||||||
if (pArgs->file[0] != 0) {
|
if (pArgs->file[0] != 0) {
|
||||||
shellSourceFile(pArgs->file);
|
shellSourceFile(pArgs->file);
|
||||||
}
|
}
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
if (shell.args.restful || shell.args.cloud) {
|
||||||
|
ws_close(shell.ws_conn);
|
||||||
|
} else {
|
||||||
|
#endif
|
||||||
|
taos_close(shell.conn);
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
taos_close(shell.conn);
|
|
||||||
shellWriteHistory();
|
shellWriteHistory();
|
||||||
shellCleanupHistory();
|
shellCleanupHistory();
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1026,10 +1055,15 @@ int32_t shellExecute() {
|
||||||
|
|
||||||
taosSetSignal(SIGINT, shellQueryInterruptHandler);
|
taosSetSignal(SIGINT, shellQueryInterruptHandler);
|
||||||
|
|
||||||
shellGetGrantInfo();
|
#ifdef WEBSOCKET
|
||||||
|
if (!shell.args.restful && !shell.args.cloud) {
|
||||||
|
#endif
|
||||||
|
shellGetGrantInfo();
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
}
|
||||||
|
#endif
|
||||||
while (1) {
|
while (1) {
|
||||||
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
|
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL);
|
||||||
taosThreadJoin(shell.pid, NULL);
|
taosThreadJoin(shell.pid, NULL);
|
||||||
taosThreadClear(&shell.pid);
|
taosThreadClear(&shell.pid);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,11 @@
|
||||||
SShellObj shell = {0};
|
SShellObj shell = {0};
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
shell.args.timeout = 10;
|
||||||
|
shell.args.cloud = true;
|
||||||
|
#endif
|
||||||
|
|
||||||
if (shellCheckIntSize() != 0) {
|
if (shellCheckIntSize() != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -41,7 +46,9 @@ int main(int argc, char *argv[]) {
|
||||||
shellPrintHelp();
|
shellPrintHelp();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
shellCheckConnectMode();
|
||||||
|
#endif
|
||||||
taos_init();
|
taos_init();
|
||||||
|
|
||||||
if (shell.args.is_dump_config) {
|
if (shell.args.is_dump_config) {
|
||||||
|
|
|
@ -121,6 +121,36 @@ void shellCheckServerStatus() {
|
||||||
}
|
}
|
||||||
} while (1);
|
} while (1);
|
||||||
}
|
}
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
void shellCheckConnectMode() {
|
||||||
|
if (shell.args.dsn) {
|
||||||
|
shell.args.cloud = true;
|
||||||
|
shell.args.restful = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (shell.args.cloud) {
|
||||||
|
shell.args.dsn = getenv("TDENGINE_CLOUD_DSN");
|
||||||
|
if (shell.args.dsn) {
|
||||||
|
shell.args.cloud = true;
|
||||||
|
shell.args.restful = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (shell.args.restful) {
|
||||||
|
if (!shell.args.host) {
|
||||||
|
shell.args.host = "localhost";
|
||||||
|
}
|
||||||
|
if (!shell.args.port) {
|
||||||
|
shell.args.port = 6041;
|
||||||
|
}
|
||||||
|
shell.args.dsn = taosMemoryCalloc(1, 1024);
|
||||||
|
snprintf(shell.args.dsn, 1024, "ws://%s:%d/rest/ws",
|
||||||
|
shell.args.host, shell.args.port);
|
||||||
|
}
|
||||||
|
shell.args.cloud = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void shellExit() {
|
void shellExit() {
|
||||||
if (shell.conn != NULL) {
|
if (shell.conn != NULL) {
|
||||||
|
@ -129,4 +159,4 @@ void shellExit() {
|
||||||
}
|
}
|
||||||
taos_cleanup();
|
taos_cleanup();
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,260 @@
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifdef WEBSOCKET
|
||||||
|
#include "taosws.h"
|
||||||
|
#include "shellInt.h"
|
||||||
|
|
||||||
|
int shell_conn_ws_server(bool first) {
|
||||||
|
shell.ws_conn = ws_connect_with_dsn(shell.args.dsn);
|
||||||
|
if (!shell.ws_conn) {
|
||||||
|
fprintf(stderr, "failed to connect %s, reason: %s\n",
|
||||||
|
shell.args.dsn, ws_errstr(NULL));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (first && shell.args.restful) {
|
||||||
|
fprintf(stdout, "successfully connect to %s\n\n",
|
||||||
|
shell.args.dsn);
|
||||||
|
} else if (first && shell.args.cloud) {
|
||||||
|
fprintf(stdout, "successfully connect to cloud service\n");
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int horizontalPrintWebsocket(WS_RES* wres) {
|
||||||
|
const void* data = NULL;
|
||||||
|
int rows;
|
||||||
|
ws_fetch_block(wres, &data, &rows);
|
||||||
|
if (!rows) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int num_fields = ws_field_count(wres);
|
||||||
|
TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
|
||||||
|
int precision = ws_result_precision(wres);
|
||||||
|
|
||||||
|
int width[TSDB_MAX_COLUMNS];
|
||||||
|
for (int col = 0; col < num_fields; col++) {
|
||||||
|
width[col] = shellCalcColWidth(fields + col, precision);
|
||||||
|
}
|
||||||
|
|
||||||
|
shellPrintHeader(fields, width, num_fields);
|
||||||
|
|
||||||
|
int numOfRows = 0;
|
||||||
|
do {
|
||||||
|
uint8_t ty;
|
||||||
|
uint32_t len;
|
||||||
|
for (int i = 0; i < rows; i++) {
|
||||||
|
for (int j = 0; j < num_fields; j++) {
|
||||||
|
putchar(' ');
|
||||||
|
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
|
||||||
|
shellPrintField((const char*)value, fields+j, width[j], len, precision);
|
||||||
|
putchar(' ');
|
||||||
|
putchar('|');
|
||||||
|
}
|
||||||
|
putchar('\r');
|
||||||
|
putchar('\n');
|
||||||
|
}
|
||||||
|
numOfRows += rows;
|
||||||
|
ws_fetch_block(wres, &data, &rows);
|
||||||
|
} while (rows && !shell.stop_query);
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int verticalPrintWebsocket(WS_RES* wres) {
|
||||||
|
int rows = 0;
|
||||||
|
const void* data = NULL;
|
||||||
|
ws_fetch_block(wres, &data, &rows);
|
||||||
|
if (!rows) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int num_fields = ws_field_count(wres);
|
||||||
|
TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
|
||||||
|
int precision = ws_result_precision(wres);
|
||||||
|
|
||||||
|
int maxColNameLen = 0;
|
||||||
|
for (int col = 0; col < num_fields; col++) {
|
||||||
|
int len = (int)strlen(fields[col].name);
|
||||||
|
if (len > maxColNameLen) {
|
||||||
|
maxColNameLen = len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int numOfRows = 0;
|
||||||
|
do {
|
||||||
|
uint8_t ty;
|
||||||
|
uint32_t len;
|
||||||
|
for (int i = 0; i < rows; i++) {
|
||||||
|
printf("*************************** %d.row ***************************\n",
|
||||||
|
numOfRows + 1);
|
||||||
|
for (int j = 0; j < num_fields; j++) {
|
||||||
|
TAOS_FIELD* field = fields + j;
|
||||||
|
int padding = (int)(maxColNameLen - strlen(field->name));
|
||||||
|
printf("%*.s%s: ", padding, " ", field->name);
|
||||||
|
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
|
||||||
|
shellPrintField((const char*)value, field, 0, len, precision);
|
||||||
|
putchar('\n');
|
||||||
|
}
|
||||||
|
numOfRows++;
|
||||||
|
}
|
||||||
|
ws_fetch_block(wres, &data, &rows);
|
||||||
|
} while (rows && !shell.stop_query);
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
|
||||||
|
char fullname[PATH_MAX] = {0};
|
||||||
|
if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
|
||||||
|
tstrncpy(fullname, fname, PATH_MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
TdFilePtr pFile = taosOpenFile(fullname,
|
||||||
|
TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
|
if (pFile == NULL) {
|
||||||
|
fprintf(stderr, "failed to open file: %s\r\n", fullname);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int rows = 0;
|
||||||
|
const void* data = NULL;
|
||||||
|
ws_fetch_block(wres, &data, &rows);
|
||||||
|
if (!rows) {
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int numOfRows = 0;
|
||||||
|
TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
|
||||||
|
int num_fields = ws_field_count(wres);
|
||||||
|
int precision = ws_result_precision(wres);
|
||||||
|
for (int col = 0; col < num_fields; col++) {
|
||||||
|
if (col > 0) {
|
||||||
|
taosFprintfFile(pFile, ",");
|
||||||
|
}
|
||||||
|
taosFprintfFile(pFile, "%s", fields[col].name);
|
||||||
|
}
|
||||||
|
taosFprintfFile(pFile, "\r\n");
|
||||||
|
do {
|
||||||
|
uint8_t ty;
|
||||||
|
uint32_t len;
|
||||||
|
numOfRows += rows;
|
||||||
|
for (int i = 0; i < rows; i++) {
|
||||||
|
for (int j = 0; j < num_fields; j++) {
|
||||||
|
if (j > 0) {
|
||||||
|
taosFprintfFile(pFile, ",");
|
||||||
|
}
|
||||||
|
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
|
||||||
|
shellDumpFieldToFile(pFile, (const char*)value, fields + j, len, precision);
|
||||||
|
}
|
||||||
|
taosFprintfFile(pFile, "\r\n");
|
||||||
|
}
|
||||||
|
ws_fetch_block(wres, &data, &rows);
|
||||||
|
} while (rows && !shell.stop_query);
|
||||||
|
taosCloseFile(&pFile);
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) {
|
||||||
|
int numOfRows = 0;
|
||||||
|
if (fname != NULL) {
|
||||||
|
numOfRows = dumpWebsocketToFile(fname, wres);
|
||||||
|
} else if (vertical) {
|
||||||
|
numOfRows = verticalPrintWebsocket(wres);
|
||||||
|
} else {
|
||||||
|
numOfRows = horizontalPrintWebsocket(wres);
|
||||||
|
}
|
||||||
|
*error_no = ws_errno(wres);
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
void shellRunSingleCommandWebsocketImp(char *command) {
|
||||||
|
int64_t st, et;
|
||||||
|
char *sptr = NULL;
|
||||||
|
char *cptr = NULL;
|
||||||
|
char *fname = NULL;
|
||||||
|
bool printMode = false;
|
||||||
|
|
||||||
|
if ((sptr = strstr(command, ">>")) != NULL) {
|
||||||
|
cptr = strstr(command, ";");
|
||||||
|
if (cptr != NULL) {
|
||||||
|
*cptr = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
fname = sptr + 2;
|
||||||
|
while (*fname == ' ') fname++;
|
||||||
|
*sptr = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((sptr = strstr(command, "\\G")) != NULL) {
|
||||||
|
cptr = strstr(command, ";");
|
||||||
|
if (cptr != NULL) {
|
||||||
|
*cptr = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
*sptr = '\0';
|
||||||
|
printMode = true; // When output to a file, the switch does not work.
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!shell.ws_conn && shell_conn_ws_server(0)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
shell.stop_query = false;
|
||||||
|
st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
WS_RES* res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout);
|
||||||
|
int code = ws_errno(res);
|
||||||
|
if (code != 0) {
|
||||||
|
et = taosGetTimestampUs();
|
||||||
|
fprintf(stderr, "\nDB: error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6);
|
||||||
|
if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) {
|
||||||
|
fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n");
|
||||||
|
} else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) {
|
||||||
|
fprintf(stderr, "TDengine server is down, will try to reconnect\n");
|
||||||
|
shell.ws_conn = NULL;
|
||||||
|
}
|
||||||
|
ws_free_result(res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
|
||||||
|
fprintf(stdout, "Database changed.\r\n\r\n");
|
||||||
|
fflush(stdout);
|
||||||
|
ws_free_result(res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int numOfRows = 0;
|
||||||
|
if (ws_is_update_query(res)) {
|
||||||
|
numOfRows = ws_affected_rows(res);
|
||||||
|
et = taosGetTimestampUs();
|
||||||
|
printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows,
|
||||||
|
(et - st)/1E6);
|
||||||
|
} else {
|
||||||
|
int error_no = 0;
|
||||||
|
numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode);
|
||||||
|
if (numOfRows < 0) {
|
||||||
|
ws_free_result(res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
et = taosGetTimestampUs();
|
||||||
|
if (error_no == 0 && !shell.stop_query) {
|
||||||
|
printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows,
|
||||||
|
(et - st)/1E6);
|
||||||
|
} else {
|
||||||
|
printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
|
||||||
|
(et - st)/1E6);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
ws_free_result(res);
|
||||||
|
}
|
||||||
|
#endif
|
Loading…
Reference in New Issue