add shell tools
This commit is contained in:
parent
bee74151bb
commit
ea3cde9136
|
@ -36,6 +36,7 @@ ELSEIF (TD_DARWIN)
|
||||||
LIST(APPEND SRC ./src/shellDarwin.c)
|
LIST(APPEND SRC ./src/shellDarwin.c)
|
||||||
LIST(APPEND SRC ./src/shellCommand.c)
|
LIST(APPEND SRC ./src/shellCommand.c)
|
||||||
LIST(APPEND SRC ./src/shellImport.c)
|
LIST(APPEND SRC ./src/shellImport.c)
|
||||||
|
LIST(APPEND SRC ./src/shellCheck.c)
|
||||||
ADD_EXECUTABLE(shell ${SRC})
|
ADD_EXECUTABLE(shell ${SRC})
|
||||||
# linking with dylib
|
# linking with dylib
|
||||||
TARGET_LINK_LIBRARIES(shell taos)
|
TARGET_LINK_LIBRARIES(shell taos)
|
||||||
|
|
|
@ -52,6 +52,7 @@ typedef struct SShellArguments {
|
||||||
char dir[TSDB_FILENAME_LEN];
|
char dir[TSDB_FILENAME_LEN];
|
||||||
int threadNum;
|
int threadNum;
|
||||||
char* commands;
|
char* commands;
|
||||||
|
int check;
|
||||||
int abort;
|
int abort;
|
||||||
int port;
|
int port;
|
||||||
int pktLen;
|
int pktLen;
|
||||||
|
@ -72,6 +73,7 @@ void write_history();
|
||||||
void source_file(TAOS* con, char* fptr);
|
void source_file(TAOS* con, char* fptr);
|
||||||
void source_dir(TAOS* con, SShellArguments* args);
|
void source_dir(TAOS* con, SShellArguments* args);
|
||||||
void get_history_path(char* history);
|
void get_history_path(char* history);
|
||||||
|
void shellCheck(TAOS* con, SShellArguments* args);
|
||||||
void cleanup_handler(void* arg);
|
void cleanup_handler(void* arg);
|
||||||
void exitShell();
|
void exitShell();
|
||||||
int shellDumpResult(TAOS_RES* con, char* fname, int* error_no, bool printMode);
|
int shellDumpResult(TAOS_RES* con, char* fname, int* error_no, bool printMode);
|
||||||
|
|
|
@ -0,0 +1,199 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _GNU_SOURCE
|
||||||
|
#define _XOPEN_SOURCE
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "shell.h"
|
||||||
|
#include "shellCommand.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
|
#define SHELL_SQL_LEN 1024
|
||||||
|
static int32_t tbNum = 0;
|
||||||
|
static int32_t tbMallocNum = 0;
|
||||||
|
static char ** tbNames = NULL;
|
||||||
|
static int32_t checkedNum = 0;
|
||||||
|
static int32_t errorNum = 0;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pthread_t threadID;
|
||||||
|
int threadIndex;
|
||||||
|
int totalThreads;
|
||||||
|
void * taos;
|
||||||
|
char * db;
|
||||||
|
} ShellThreadObj;
|
||||||
|
|
||||||
|
static int32_t shellUseDb(TAOS *con, char *db) {
|
||||||
|
if (db == NULL) {
|
||||||
|
fprintf(stdout, "no dbname input\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char sql[SHELL_SQL_LEN] = {0};
|
||||||
|
snprintf(sql, SHELL_SQL_LEN, "use %s", db);
|
||||||
|
|
||||||
|
TAOS_RES *pSql = taos_query(con, sql);
|
||||||
|
int32_t code = taos_errno(pSql);
|
||||||
|
if (code != 0) {
|
||||||
|
fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql));
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pSql);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t shellShowTables(TAOS *con, char *db) {
|
||||||
|
char sql[SHELL_SQL_LEN] = {0};
|
||||||
|
snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db);
|
||||||
|
|
||||||
|
TAOS_RES *pSql = taos_query(con, sql);
|
||||||
|
int32_t code = taos_errno(pSql);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql));
|
||||||
|
} else {
|
||||||
|
TAOS_ROW row;
|
||||||
|
while ((row = taos_fetch_row(pSql))) {
|
||||||
|
int32_t tbIndex = tbNum++;
|
||||||
|
if (tbMallocNum < tbNum) {
|
||||||
|
tbMallocNum = (tbMallocNum * 2 + 1);
|
||||||
|
tbNames = realloc(tbNames, tbMallocNum * sizeof(char *));
|
||||||
|
if (tbNames == NULL) {
|
||||||
|
fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum);
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN);
|
||||||
|
strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN);
|
||||||
|
if (tbIndex % 100000 == 0 && tbIndex != 0) {
|
||||||
|
fprintf(stdout, "%d tablenames fetched\n", tbIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pSql);
|
||||||
|
|
||||||
|
fprintf(stdout, "total %d tablenames fetched, over\n", tbNum);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void shellFreeTbnames() {
|
||||||
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
|
free(tbNames[i]);
|
||||||
|
}
|
||||||
|
free(tbNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *shellCheckThreadFp(void *arg) {
|
||||||
|
ShellThreadObj *pThread = (ShellThreadObj *)arg;
|
||||||
|
|
||||||
|
int32_t interval = tbNum / pThread->totalThreads + 1;
|
||||||
|
int32_t start = pThread->threadIndex * interval;
|
||||||
|
int32_t end = (pThread->threadIndex + 1) * interval;
|
||||||
|
|
||||||
|
if (end > tbNum) end = tbNum + 1;
|
||||||
|
|
||||||
|
char file[32] = {0};
|
||||||
|
snprintf(file, 32, "tb%d.txt", pThread->threadIndex);
|
||||||
|
|
||||||
|
FILE *fp = fopen(file, "w");
|
||||||
|
if (!fp) {
|
||||||
|
fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char sql[SHELL_SQL_LEN];
|
||||||
|
for (int32_t t = start; t < end; ++t) {
|
||||||
|
char *tbname = tbNames[t];
|
||||||
|
if (tbname == NULL) break;
|
||||||
|
|
||||||
|
snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname);
|
||||||
|
|
||||||
|
TAOS_RES *pSql = taos_query(pThread->taos, sql);
|
||||||
|
int32_t code = taos_errno(pSql);
|
||||||
|
if (code != 0) {
|
||||||
|
int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname);
|
||||||
|
fwrite(sql, 1, len, fp);
|
||||||
|
atomic_add_fetch_32(&errorNum, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t cnum = atomic_add_fetch_32(&checkedNum, 1);
|
||||||
|
if (cnum % 5000 == 0 && cnum != 0) {
|
||||||
|
fprintf(stdout, "%d tables checked\n", cnum);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pSql);
|
||||||
|
}
|
||||||
|
|
||||||
|
fsync(fileno(fp));
|
||||||
|
fclose(fp);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void shellRunCheckThreads(TAOS *con, SShellArguments *args) {
|
||||||
|
pthread_attr_t thattr;
|
||||||
|
ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj));
|
||||||
|
for (int t = 0; t < args->threadNum; ++t) {
|
||||||
|
ShellThreadObj *pThread = threadObj + t;
|
||||||
|
pThread->threadIndex = t;
|
||||||
|
pThread->totalThreads = args->threadNum;
|
||||||
|
pThread->taos = con;
|
||||||
|
pThread->db = args->database;
|
||||||
|
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) {
|
||||||
|
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int t = 0; t < args->threadNum; ++t) {
|
||||||
|
pthread_join(threadObj[t].threadID, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int t = 0; t < args->threadNum; ++t) {
|
||||||
|
taos_close(threadObj[t].taos);
|
||||||
|
}
|
||||||
|
free(threadObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
void shellCheck(TAOS *con, SShellArguments *args) {
|
||||||
|
int64_t start = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (shellUseDb(con, args->database) != 0) {
|
||||||
|
shellFreeTbnames();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shellShowTables(con, args->database) != 0) {
|
||||||
|
shellFreeTbnames();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, args->threadNum);
|
||||||
|
shellRunCheckThreads(con, args);
|
||||||
|
|
||||||
|
int64_t end = taosGetTimestampMs();
|
||||||
|
fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum,
|
||||||
|
(end - start) / 1000.0);
|
||||||
|
}
|
|
@ -121,12 +121,17 @@ TAOS *shellInit(SShellArguments *args) {
|
||||||
taos_close(con);
|
taos_close(con);
|
||||||
exit(EXIT_SUCCESS);
|
exit(EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (args->check != 0) {
|
||||||
|
shellCheck(con, args);
|
||||||
|
taos_close(con);
|
||||||
|
exit(EXIT_SUCCESS);
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return con;
|
return con;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static bool isEmptyCommand(const char* cmd) {
|
static bool isEmptyCommand(const char* cmd) {
|
||||||
for (char c = *cmd++; c != 0; c = *cmd++) {
|
for (char c = *cmd++; c != 0; c = *cmd++) {
|
||||||
if (c != ' ' && c != '\t' && c != ';') {
|
if (c != ' ' && c != '\t' && c != ';') {
|
||||||
|
@ -412,7 +417,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
if (tt < 0) tt = 0;
|
if (tt < 0) tt = 0;
|
||||||
#endif
|
#endif
|
||||||
if (tt < 0 && ms != 0) {
|
if (tt <= 0 && ms != 0) {
|
||||||
tt--;
|
tt--;
|
||||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
ms += 1000000;
|
ms += 1000000;
|
||||||
|
|
|
@ -45,6 +45,7 @@ static struct argp_option options[] = {
|
||||||
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
||||||
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
||||||
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
||||||
|
{"check", 'k', "CHECK", 0, "Check tables."},
|
||||||
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
|
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
|
||||||
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
|
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
|
||||||
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."},
|
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."},
|
||||||
|
@ -130,6 +131,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case 'k':
|
||||||
|
arguments->check = atoi(arg);
|
||||||
|
break;
|
||||||
case 'd':
|
case 'd':
|
||||||
arguments->database = arg;
|
arguments->database = arg;
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue