commit
682bcb2d6d
|
@ -469,6 +469,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
|
|||
|
||||
if (pCmd->command > TSDB_SQL_MGMT) {
|
||||
tscProcessMgmtRedirect(pSql, pMsg->content + 1);
|
||||
} else if (pCmd->command == TSDB_SQL_INSERT){
|
||||
pSql->index++;
|
||||
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
|
||||
} else {
|
||||
pSql->index++;
|
||||
}
|
||||
|
@ -1400,7 +1403,7 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
|
|||
|
||||
pShellMsg = (SShellSubmitMsg *)pMsg;
|
||||
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
|
||||
tscTrace("%p update submit msg vnode:%d", pSql, htons(pShellMsg->vnode));
|
||||
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip), htons(pShellMsg->vnode));
|
||||
}
|
||||
|
||||
int tscBuildSubmitMsg(SSqlObj *pSql) {
|
||||
|
@ -1421,7 +1424,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
|
|||
|
||||
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
tscTrace("%p update submit msg vnode:%d", pSql, htons(pShellMsg->vnode));
|
||||
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), htons(pShellMsg->vnode));
|
||||
|
||||
return msgLen;
|
||||
}
|
||||
|
|
|
@ -58,6 +58,8 @@ struct arguments {
|
|||
bool is_raw_time;
|
||||
bool is_use_passwd;
|
||||
char file[TSDB_FILENAME_LEN];
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
int threadNum;
|
||||
char* commands;
|
||||
int abort;
|
||||
};
|
||||
|
@ -74,12 +76,14 @@ void shellRunCommandOnServer(TAOS* con, char command[]);
|
|||
void read_history();
|
||||
void write_history();
|
||||
void source_file(TAOS* con, char* fptr);
|
||||
void source_dir(TAOS* con, struct arguments* args);
|
||||
void get_history_path(char* history);
|
||||
void cleanup_handler(void* arg);
|
||||
void exitShell();
|
||||
int shellDumpResult(TAOS* con, char* fname, int* error_no, bool printMode);
|
||||
void shellPrintNChar(char* str, int width, bool printMode);
|
||||
void shellGetGrantInfo(void *con);
|
||||
int isCommentLine(char *line);
|
||||
#define max(a, b) ((int)(a) < (int)(b) ? (int)(b) : (int)(a))
|
||||
|
||||
/**************** Global variable declarations ****************/
|
||||
|
|
|
@ -111,6 +111,14 @@ TAOS *shellInit(struct arguments *args) {
|
|||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
#ifdef LINUX
|
||||
if (args->dir[0] != 0) {
|
||||
source_dir(con, args);
|
||||
taos_close(con);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
#endif
|
||||
|
||||
printf(SERVER_VERSION, taos_get_server_info(con));
|
||||
|
||||
return con;
|
||||
|
@ -765,7 +773,7 @@ void taos_error(TAOS *con) {
|
|||
taos_free_result(pRes);
|
||||
}
|
||||
|
||||
static int isCommentLine(char *line) {
|
||||
int isCommentLine(char *line) {
|
||||
if (line == NULL) return 1;
|
||||
|
||||
return regex_match(line, "^\\s*#.*", REG_EXTENDED);
|
||||
|
|
|
@ -0,0 +1,266 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _XOPEN_SOURCE
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
#include "shell.h"
|
||||
#include "shellCommand.h"
|
||||
#include "ttime.h"
|
||||
#include "tutil.h"
|
||||
|
||||
static char **shellSQLFiles = NULL;
|
||||
static int32_t shellSQLFileNum = 0;
|
||||
static char shellTablesSQLFile[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
typedef struct {
|
||||
pthread_t threadID;
|
||||
int threadIndex;
|
||||
int totalThreads;
|
||||
void *taos;
|
||||
} ShellThreadObj;
|
||||
|
||||
static int shellGetFilesNum(const char *directoryName, const char *prefix)
|
||||
{
|
||||
char cmd[1024] = { 0 };
|
||||
sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix);
|
||||
|
||||
FILE *fp = popen(cmd, "r");
|
||||
if (fp == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
int fileNum = 0;
|
||||
if (fscanf(fp, "%d", &fileNum) != 1) {
|
||||
fprintf(stderr, "ERROR: failed to execute:%s, parse result error\n", cmd);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
if (fileNum <= 0) {
|
||||
fprintf(stderr, "ERROR: directory:%s is empry\n", directoryName);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
pclose(fp);
|
||||
return fileNum;
|
||||
}
|
||||
|
||||
static void shellParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles)
|
||||
{
|
||||
char cmd[1024] = { 0 };
|
||||
sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix);
|
||||
|
||||
FILE *fp = popen(cmd, "r");
|
||||
if (fp == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
int fileNum = 0;
|
||||
while (fscanf(fp, "%s", fileArray[fileNum++])) {
|
||||
if (strcmp(fileArray[fileNum-1], shellTablesSQLFile) == 0) {
|
||||
fileNum--;
|
||||
}
|
||||
if (fileNum >= totalFiles) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fileNum != totalFiles) {
|
||||
fprintf(stderr, "ERROR: directory:%s changed while read\n", directoryName);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
pclose(fp);
|
||||
}
|
||||
|
||||
static void shellCheckTablesSQLFile(const char *directoryName)
|
||||
{
|
||||
char cmd[1024] = { 0 };
|
||||
sprintf(cmd, "ls %s/tables.sql", directoryName);
|
||||
|
||||
FILE *fp = popen(cmd, "r");
|
||||
if (fp == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
while (fscanf(fp, "%s", shellTablesSQLFile)) {
|
||||
break;
|
||||
}
|
||||
|
||||
pclose(fp);
|
||||
}
|
||||
|
||||
static void shellMallocSQLFiles()
|
||||
{
|
||||
shellSQLFiles = (char**)calloc(shellSQLFileNum, sizeof(char*));
|
||||
for (int i = 0; i < shellSQLFileNum; i++) {
|
||||
shellSQLFiles[i] = calloc(1, TSDB_FILENAME_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
static void shellGetDirectoryFileList(char *inputDir)
|
||||
{
|
||||
struct stat fileStat;
|
||||
if (stat(inputDir, &fileStat) < 0) {
|
||||
fprintf(stderr, "ERROR: %s not exist\n", inputDir);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
if (fileStat.st_mode & S_IFDIR) {
|
||||
shellCheckTablesSQLFile(inputDir);
|
||||
shellSQLFileNum = shellGetFilesNum(inputDir, "sql");
|
||||
int totalSQLFileNum = shellSQLFileNum;
|
||||
if (shellTablesSQLFile[0] != 0) {
|
||||
shellSQLFileNum--;
|
||||
}
|
||||
shellMallocSQLFiles();
|
||||
shellParseDirectory(inputDir, "sql", shellSQLFiles, shellSQLFileNum);
|
||||
fprintf(stdout, "\nstart to dispose %d files in %s\n", totalSQLFileNum, inputDir);
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "ERROR: %s is not a directory\n", inputDir);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void shellSourceFile(TAOS *con, char *fptr) {
|
||||
wordexp_t full_path;
|
||||
int read_len = 0;
|
||||
char * cmd = malloc(MAX_COMMAND_SIZE);
|
||||
size_t cmd_len = 0;
|
||||
char * line = NULL;
|
||||
size_t line_len = 0;
|
||||
|
||||
if (wordexp(fptr, &full_path, 0) != 0) {
|
||||
fprintf(stderr, "ERROR: illegal file name\n");
|
||||
return;
|
||||
}
|
||||
|
||||
char *fname = full_path.we_wordv[0];
|
||||
|
||||
if (access(fname, R_OK) == -1) {
|
||||
fprintf(stderr, "ERROR: file %s is not readable\n", fptr);
|
||||
wordfree(&full_path);
|
||||
return;
|
||||
}
|
||||
|
||||
FILE *f = fopen(fname, "r");
|
||||
if (f == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
|
||||
wordfree(&full_path);
|
||||
return;
|
||||
}
|
||||
|
||||
fprintf(stdout, "begin import file:%s\n", fname);
|
||||
|
||||
int lineNo = 0;
|
||||
while ((read_len = getline(&line, &line_len, f)) != -1) {
|
||||
++lineNo;
|
||||
if (read_len >= MAX_COMMAND_SIZE) continue;
|
||||
line[--read_len] = '\0';
|
||||
|
||||
if (read_len == 0 || isCommentLine(line)) { // line starts with #
|
||||
continue;
|
||||
}
|
||||
|
||||
if (line[read_len - 1] == '\\') {
|
||||
line[read_len - 1] = ' ';
|
||||
memcpy(cmd + cmd_len, line, read_len);
|
||||
cmd_len += read_len;
|
||||
continue;
|
||||
}
|
||||
|
||||
memcpy(cmd + cmd_len, line, read_len);
|
||||
if (taos_query(con, cmd)) {
|
||||
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo);
|
||||
/* free local resouce: allocated memory/metric-meta refcnt */
|
||||
TAOS_RES *pRes = taos_use_result(con);
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
|
||||
memset(cmd, 0, MAX_COMMAND_SIZE);
|
||||
cmd_len = 0;
|
||||
}
|
||||
|
||||
free(cmd);
|
||||
if (line) free(line);
|
||||
wordfree(&full_path);
|
||||
fclose(f);
|
||||
}
|
||||
|
||||
void* shellImportThreadFp(void *arg)
|
||||
{
|
||||
ShellThreadObj *pThread = (ShellThreadObj*)arg;
|
||||
for (int f = 0; f < shellSQLFileNum; ++f) {
|
||||
if (f % pThread->totalThreads == pThread->threadIndex) {
|
||||
char *SQLFileName = shellSQLFiles[f];
|
||||
shellSourceFile(pThread->taos, SQLFileName);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void shellRunImportThreads(struct arguments* args)
|
||||
{
|
||||
pthread_attr_t thattr;
|
||||
ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj));
|
||||
for (int t = 0; t < args->threadNum; ++t) {
|
||||
ShellThreadObj *pThread = threadObj + t;
|
||||
pThread->threadIndex = t;
|
||||
pThread->totalThreads = args->threadNum;
|
||||
pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMgmtShellPort);
|
||||
if (pThread->taos == NULL) {
|
||||
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&(pThread->threadID), &thattr, shellImportThreadFp, (void*)pThread) != 0) {
|
||||
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
for (int t = 0; t < args->threadNum; ++t) {
|
||||
pthread_join(threadObj[t].threadID, NULL);
|
||||
}
|
||||
|
||||
for (int t = 0; t < args->threadNum; ++t) {
|
||||
taos_close(threadObj[t].taos);
|
||||
}
|
||||
free(threadObj);
|
||||
}
|
||||
|
||||
void source_dir(TAOS* con, struct arguments* args) {
|
||||
shellGetDirectoryFileList(args->dir);
|
||||
int64_t start = taosGetTimestampMs();
|
||||
|
||||
if (shellTablesSQLFile[0] != 0) {
|
||||
shellSourceFile(con, shellTablesSQLFile);
|
||||
int64_t end = taosGetTimestampMs();
|
||||
fprintf(stdout, "import %s finished, time spent %.2f seconds\n", shellTablesSQLFile, (end - start) / 1000.0);
|
||||
}
|
||||
|
||||
shellRunImportThreads(args);
|
||||
int64_t end = taosGetTimestampMs();
|
||||
fprintf(stdout, "import %s finished, time spent %.2f seconds\n", args->dir, (end - start) / 1000.0);
|
||||
}
|
|
@ -41,6 +41,8 @@ static struct argp_option options[] = {
|
|||
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
|
||||
{"raw-time", 'r', 0, 0, "Output time as uint64_t."},
|
||||
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
||||
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
||||
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
||||
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
|
||||
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
|
||||
{0}};
|
||||
|
@ -90,6 +92,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
|||
strcpy(arguments->file, full_path.we_wordv[0]);
|
||||
wordfree(&full_path);
|
||||
break;
|
||||
case 'D':
|
||||
if (wordexp(arg, &full_path, 0) != 0) {
|
||||
fprintf(stderr, "Invalid path %s\n", arg);
|
||||
return -1;
|
||||
}
|
||||
strcpy(arguments->dir, full_path.we_wordv[0]);
|
||||
wordfree(&full_path);
|
||||
break;
|
||||
case 'T':
|
||||
arguments->threadNum = atoi(arg);
|
||||
break;
|
||||
case 'd':
|
||||
arguments->database = arg;
|
||||
break;
|
||||
|
|
|
@ -62,7 +62,19 @@ int checkVersion() {
|
|||
}
|
||||
|
||||
// Global configurations
|
||||
struct arguments args = {NULL, NULL, NULL, NULL, NULL, false, false, "\0", NULL};
|
||||
struct arguments args = {
|
||||
.host = NULL,
|
||||
.password = NULL,
|
||||
.user = NULL,
|
||||
.database = NULL,
|
||||
.timezone = NULL,
|
||||
.is_raw_time = false,
|
||||
.is_use_passwd = false,
|
||||
.file = "\0",
|
||||
.dir = "\0",
|
||||
.threadNum = 5,
|
||||
.commands = NULL
|
||||
};
|
||||
|
||||
/*
|
||||
* Main function.
|
||||
|
|
|
@ -48,7 +48,7 @@ int mgmtGetConns(SShowObj *pShow, SConnObj *pConn) {
|
|||
pConn = pAcct->pConn;
|
||||
SConnInfo *pConnInfo = pConnShow->connInfo;
|
||||
|
||||
while (pConn) {
|
||||
while (pConn && pConn->pUser) {
|
||||
strcpy(pConnInfo->user, pConn->pUser->user);
|
||||
pConnInfo->ip = pConn->ip;
|
||||
pConnInfo->port = pConn->port;
|
||||
|
|
|
@ -40,6 +40,7 @@ void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssiz
|
|||
void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize);
|
||||
void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize);
|
||||
bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode);
|
||||
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode);
|
||||
|
||||
void mgmtVgroupActionInit() {
|
||||
mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert;
|
||||
|
@ -333,8 +334,8 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
|
|||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
if (pVgroup->vnodeGid[i].ip != 0) {
|
||||
bool ready = mgmtCheckVnodeReady(NULL, pVgroup, pVgroup->vnodeGid + i);
|
||||
strcpy(pWrite, ready ? "ready" : "unsynced");
|
||||
char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
|
||||
strcpy(pWrite, vnodeStatus);
|
||||
} else {
|
||||
strcpy(pWrite, "null");
|
||||
}
|
||||
|
|
|
@ -117,6 +117,9 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) {
|
||||
vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
|
||||
} else if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_REDIRECT);
|
||||
dTrace("vid:%d sid:%d, shell submit msg is redirect since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
} else {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
|
|
|
@ -52,6 +52,8 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType) {
|
|||
return tsModule[moduleType].num != 0;
|
||||
}
|
||||
|
||||
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; }
|
||||
|
||||
bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; }
|
||||
|
||||
void mgmtUpdateDnodeState(SDnodeObj *pDnode, int lbStatus) {}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
char version[64] = "1.6.4.2";
|
||||
char version[64] = "1.6.4.4";
|
||||
char compatible_version[64] = "1.6.1.0";
|
||||
char gitinfo[128] = "b9a62d60dc1d4a41452a9bc94e3a0924485c3a75";
|
||||
char gitinfoOfInternal[128] = "e6445addc77e8c96dcb57221fa6ab5dcde0458f7";
|
||||
char buildinfo[512] = "Built by root at 2019-12-10 10:31";
|
||||
char gitinfo[128] = "d62c5c30231d04a736d437cf428af6e12599bd9f";
|
||||
char gitinfoOfInternal[128] = "8094a32d78dc519bd883d01ac2ba6ec49ac57a80";
|
||||
char buildinfo[512] = "Built by ubuntu at 2019-12-16 21:40";
|
||||
|
|
Loading…
Reference in New Issue