diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h index 825866e163..358377f804 100644 --- a/tools/shell/inc/shellInt.h +++ b/tools/shell/inc/shellInt.h @@ -26,6 +26,10 @@ #include "ttypes.h" #include "tutil.h" +#ifdef WEBSOCKET +#include "taosws.h" +#endif + #define SHELL_MAX_HISTORY_SIZE 1000 #define SHELL_MAX_COMMAND_SIZE 1048586 #define SHELL_HISTORY_FILE ".taos_history" @@ -67,6 +71,12 @@ typedef struct { int32_t pktNum; int32_t displayWidth; int32_t abort; +#ifdef WEBSOCKET + bool restful; + bool cloud; + char* dsn; + int32_t timeout; +#endif } SShellArgs; typedef struct { @@ -85,6 +95,10 @@ typedef struct { TAOS* conn; TdThread pid; tsem_t cancelSem; +#ifdef WEBSOCKET + WS_TAOS* ws_conn; + bool stop_query; +#endif } SShellObj; // shellArguments.c @@ -95,7 +109,10 @@ int32_t shellReadCommand(char* command); // shellEngine.c int32_t shellExecute(); - +int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); +void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); +void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); +void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision); // shellUtil.c int32_t shellCheckIntSize(); void shellPrintVersion(); @@ -109,6 +126,14 @@ void shellExit(); // shellNettest.c void shellTestNetWork(); +#ifdef WEBSOCKET +void shellCheckConnectMode(); +// shellWebsocket.c +int shell_conn_ws_server(bool first); +int32_t shell_run_websocket(); +void shellRunSingleCommandWebsocketImp(char *command); +#endif + // shellMain.c extern SShellObj shell; diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index 466aa52390..88ef46c5d6 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -43,6 +43,12 @@ #define SHELL_VERSION "Print program version." #define SHELL_EMAIL "" +#ifdef WEBSOCKET +#define SHELL_DSN "The dsn to use when connecting to cloud server." +#define SHELL_REST "Use restful mode when connecting." +#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10." +#endif + static int32_t shellParseSingleOpt(int32_t key, char *arg); void shellPrintHelp() { @@ -65,6 +71,11 @@ void shellPrintHelp() { printf("%s%s%s%s\r\n", indent, "-s,", indent, SHELL_CMD); printf("%s%s%s%s\r\n", indent, "-t,", indent, SHELL_STARTUP); printf("%s%s%s%s\r\n", indent, "-u,", indent, SHELL_USER); +#ifdef WEBSOCKET + printf("%s%s%s%s\r\n", indent, "-E,", indent, SHELL_DSN); + printf("%s%s%s%s\r\n", indent, "-R,", indent, SHELL_REST); + printf("%s%s%s%s\r\n", indent, "-T,", indent, SHELL_TIMEOUT); +#endif printf("%s%s%s%s\r\n", indent, "-w,", indent, SHELL_WIDTH); printf("%s%s%s%s\r\n", indent, "-V,", indent, SHELL_VERSION); printf("\r\n\r\nReport bugs to %s.\r\n", SHELL_EMAIL); @@ -95,6 +106,11 @@ static struct argp_option shellOptions[] = { {"display-width", 'w', "WIDTH", 0, SHELL_WIDTH}, {"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE}, {"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN}, +#ifdef WEBSOCKET + {"dsn", 'E', "DSN", 0, SHELL_DSN}, + {"restful", 'R', 0, 0, SHELL_REST}, + {"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT}, +#endif {"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM}, {0}, }; @@ -120,9 +136,15 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { switch (key) { case 'h': pArgs->host = arg; +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif break; case 'P': pArgs->port = atoi(arg); +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif if (pArgs->port == 0) pArgs->port = -1; break; case 'u': @@ -137,6 +159,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { pArgs->is_gen_auth = true; break; case 'c': +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif pArgs->cfgdir = arg; break; case 'C': @@ -172,6 +197,18 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { case 'N': pArgs->pktNum = atoi(arg); break; +#ifdef WEBSOCKET + case 'R': + pArgs->restful = true; + break; + case 'E': + pArgs->dsn = arg; + pArgs->cloud = true; + break; + case 'T': + pArgs->timeout = atoi(arg); + break; +#endif case 'V': pArgs->is_version = true; break; @@ -208,7 +245,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { } if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' || - key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N') { + key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N' +#ifdef WEBSOCKET + || key[1] == 'E' || key[1] == 'T' +#endif + ) { if (i + 1 >= argc) { fprintf(stderr, "option %s requires an argument\r\n", key); return -1; @@ -221,7 +262,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { shellParseSingleOpt(key[1], val); i++; } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' || - key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1) { + key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1 +#ifdef WEBSOCKET + ||key[1] == 'R' +#endif + ) { shellParseSingleOpt(key[1], NULL); } else { fprintf(stderr, "invalid option %s\r\n", key); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 56bc1ed6cc..a2310ea9c9 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -25,14 +25,9 @@ static int32_t shellRunSingleCommand(char *command); static int32_t shellRunCommand(char *command); static void shellRunSingleCommandImp(char *command); static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision); -static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, - int32_t precision); static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); -static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); -static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); @@ -94,8 +89,15 @@ int32_t shellRunSingleCommand(char *command) { shellSourceFile(c_ptr); return 0; } - - shellRunSingleCommandImp(command); +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + shellRunSingleCommandWebsocketImp(command); + } else { +#endif + shellRunSingleCommandImp(command); +#ifdef WEBSOCKET + } +#endif return 0; } @@ -937,7 +939,16 @@ void *shellCancelHandler(void *arg) { taosMsleep(10); continue; } - taos_kill_query(shell.conn); + +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + shell.stop_query = true; + } else { +#endif + taos_kill_query(shell.conn); +#ifdef WEBSOCKET + } +#endif #ifdef WINDOWS printf("\n%s", shell.info.promptHeader); #endif @@ -981,16 +992,26 @@ int32_t shellExecute() { fflush(stdout); SShellArgs *pArgs = &shell.args; - if (shell.args.auth == NULL) { - shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port); +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + if (shell_conn_ws_server(1)) { + return -1; + } } else { - shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port); - } +#endif + if (shell.args.auth == NULL) { + shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port); + } else { + shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port); + } - if (shell.conn == NULL) { - fflush(stdout); - return -1; + if (shell.conn == NULL) { + fflush(stdout); + return -1; + } +#ifdef WEBSOCKET } +#endif shellReadHistory(); @@ -1005,8 +1026,16 @@ int32_t shellExecute() { if (pArgs->file[0] != 0) { shellSourceFile(pArgs->file); } +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + ws_close(shell.ws_conn); + } else { +#endif + taos_close(shell.conn); +#ifdef WEBSOCKET + } +#endif - taos_close(shell.conn); shellWriteHistory(); shellCleanupHistory(); return 0; @@ -1026,10 +1055,15 @@ int32_t shellExecute() { taosSetSignal(SIGINT, shellQueryInterruptHandler); - shellGetGrantInfo(); - +#ifdef WEBSOCKET + if (!shell.args.restful && !shell.args.cloud) { +#endif + shellGetGrantInfo(); +#ifdef WEBSOCKET + } +#endif while (1) { - taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn); + taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL); taosThreadJoin(shell.pid, NULL); taosThreadClear(&shell.pid); } diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 6672cee367..703533f8a9 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -19,6 +19,11 @@ SShellObj shell = {0}; int main(int argc, char *argv[]) { +#ifdef WEBSOCKET + shell.args.timeout = 10; + shell.args.cloud = true; +#endif + if (shellCheckIntSize() != 0) { return -1; } @@ -41,7 +46,9 @@ int main(int argc, char *argv[]) { shellPrintHelp(); return 0; } - +#ifdef WEBSOCKET + shellCheckConnectMode(); +#endif taos_init(); if (shell.args.is_dump_config) { diff --git a/tools/shell/src/shellUtil.c b/tools/shell/src/shellUtil.c index e96e3d3619..e5e61e0b24 100644 --- a/tools/shell/src/shellUtil.c +++ b/tools/shell/src/shellUtil.c @@ -121,6 +121,36 @@ void shellCheckServerStatus() { } } while (1); } +#ifdef WEBSOCKET +void shellCheckConnectMode() { + if (shell.args.dsn) { + shell.args.cloud = true; + shell.args.restful = false; + return; + } + if (shell.args.cloud) { + shell.args.dsn = getenv("TDENGINE_CLOUD_DSN"); + if (shell.args.dsn) { + shell.args.cloud = true; + shell.args.restful = false; + return; + } + if (shell.args.restful) { + if (!shell.args.host) { + shell.args.host = "localhost"; + } + if (!shell.args.port) { + shell.args.port = 6041; + } + shell.args.dsn = taosMemoryCalloc(1, 1024); + snprintf(shell.args.dsn, 1024, "ws://%s:%d/rest/ws", + shell.args.host, shell.args.port); + } + shell.args.cloud = false; + return; + } +} +#endif void shellExit() { if (shell.conn != NULL) { @@ -129,4 +159,4 @@ void shellExit() { } taos_cleanup(); exit(EXIT_FAILURE); -} \ No newline at end of file +} diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c new file mode 100644 index 0000000000..fee2325c34 --- /dev/null +++ b/tools/shell/src/shellWebsocket.c @@ -0,0 +1,260 @@ + +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef WEBSOCKET +#include "taosws.h" +#include "shellInt.h" + +int shell_conn_ws_server(bool first) { + shell.ws_conn = ws_connect_with_dsn(shell.args.dsn); + if (!shell.ws_conn) { + fprintf(stderr, "failed to connect %s, reason: %s\n", + shell.args.dsn, ws_errstr(NULL)); + return -1; + } + if (first && shell.args.restful) { + fprintf(stdout, "successfully connect to %s\n\n", + shell.args.dsn); + } else if (first && shell.args.cloud) { + fprintf(stdout, "successfully connect to cloud service\n"); + } + return 0; +} + +static int horizontalPrintWebsocket(WS_RES* wres) { + const void* data = NULL; + int rows; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + return 0; + } + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int precision = ws_result_precision(wres); + + int width[TSDB_MAX_COLUMNS]; + for (int col = 0; col < num_fields; col++) { + width[col] = shellCalcColWidth(fields + col, precision); + } + + shellPrintHeader(fields, width, num_fields); + + int numOfRows = 0; + do { + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + putchar(' '); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellPrintField((const char*)value, fields+j, width[j], len, precision); + putchar(' '); + putchar('|'); + } + putchar('\r'); + putchar('\n'); + } + numOfRows += rows; + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + return numOfRows; +} + +static int verticalPrintWebsocket(WS_RES* wres) { + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + return 0; + } + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int precision = ws_result_precision(wres); + + int maxColNameLen = 0; + for (int col = 0; col < num_fields; col++) { + int len = (int)strlen(fields[col].name); + if (len > maxColNameLen) { + maxColNameLen = len; + } + } + int numOfRows = 0; + do { + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + printf("*************************** %d.row ***************************\n", + numOfRows + 1); + for (int j = 0; j < num_fields; j++) { + TAOS_FIELD* field = fields + j; + int padding = (int)(maxColNameLen - strlen(field->name)); + printf("%*.s%s: ", padding, " ", field->name); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellPrintField((const char*)value, field, 0, len, precision); + putchar('\n'); + } + numOfRows++; + } + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + return numOfRows; +} + +static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { + char fullname[PATH_MAX] = {0}; + if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { + tstrncpy(fullname, fname, PATH_MAX); + } + + TdFilePtr pFile = taosOpenFile(fullname, + TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + if (pFile == NULL) { + fprintf(stderr, "failed to open file: %s\r\n", fullname); + return -1; + } + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + taosCloseFile(&pFile); + return 0; + } + int numOfRows = 0; + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int num_fields = ws_field_count(wres); + int precision = ws_result_precision(wres); + for (int col = 0; col < num_fields; col++) { + if (col > 0) { + taosFprintfFile(pFile, ","); + } + taosFprintfFile(pFile, "%s", fields[col].name); + } + taosFprintfFile(pFile, "\r\n"); + do { + uint8_t ty; + uint32_t len; + numOfRows += rows; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + if (j > 0) { + taosFprintfFile(pFile, ","); + } + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellDumpFieldToFile(pFile, (const char*)value, fields + j, len, precision); + } + taosFprintfFile(pFile, "\r\n"); + } + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + taosCloseFile(&pFile); + return numOfRows; +} + +static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) { + int numOfRows = 0; + if (fname != NULL) { + numOfRows = dumpWebsocketToFile(fname, wres); + } else if (vertical) { + numOfRows = verticalPrintWebsocket(wres); + } else { + numOfRows = horizontalPrintWebsocket(wres); + } + *error_no = ws_errno(wres); + return numOfRows; +} + +void shellRunSingleCommandWebsocketImp(char *command) { + int64_t st, et; + char *sptr = NULL; + char *cptr = NULL; + char *fname = NULL; + bool printMode = false; + + if ((sptr = strstr(command, ">>")) != NULL) { + cptr = strstr(command, ";"); + if (cptr != NULL) { + *cptr = '\0'; + } + + fname = sptr + 2; + while (*fname == ' ') fname++; + *sptr = '\0'; + } + + if ((sptr = strstr(command, "\\G")) != NULL) { + cptr = strstr(command, ";"); + if (cptr != NULL) { + *cptr = '\0'; + } + + *sptr = '\0'; + printMode = true; // When output to a file, the switch does not work. + } + + if (!shell.ws_conn && shell_conn_ws_server(0)) { + return; + } + + shell.stop_query = false; + st = taosGetTimestampUs(); + + WS_RES* res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout); + int code = ws_errno(res); + if (code != 0) { + et = taosGetTimestampUs(); + fprintf(stderr, "\nDB: error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6); + if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) { + fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n"); + } else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) { + fprintf(stderr, "TDengine server is down, will try to reconnect\n"); + shell.ws_conn = NULL; + } + ws_free_result(res); + return; + } + + if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { + fprintf(stdout, "Database changed.\r\n\r\n"); + fflush(stdout); + ws_free_result(res); + return; + } + + int numOfRows = 0; + if (ws_is_update_query(res)) { + numOfRows = ws_affected_rows(res); + et = taosGetTimestampUs(); + printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows, + (et - st)/1E6); + } else { + int error_no = 0; + numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode); + if (numOfRows < 0) { + ws_free_result(res); + return; + } + et = taosGetTimestampUs(); + if (error_no == 0 && !shell.stop_query) { + printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, + (et - st)/1E6); + } else { + printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows, + (et - st)/1E6); + } + } + printf("\n"); + ws_free_result(res); +} +#endif