[TD-2292]
This commit is contained in:
parent
02b5044788
commit
d70ca0b133
|
@ -14,6 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <iconv.h>
|
#include <iconv.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/syscall.h>
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
@ -366,6 +369,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
static struct argp argp = {options, parse_opt, args_doc, doc};
|
static struct argp argp = {options, parse_opt, args_doc, doc};
|
||||||
static resultStatistics g_resultStatistics = {0};
|
static resultStatistics g_resultStatistics = {0};
|
||||||
static FILE *g_fpOfResult = NULL;
|
static FILE *g_fpOfResult = NULL;
|
||||||
|
static int g_numOfCores = 1;
|
||||||
|
|
||||||
int taosDumpOut(struct arguments *arguments);
|
int taosDumpOut(struct arguments *arguments);
|
||||||
int taosDumpIn(struct arguments *arguments);
|
int taosDumpIn(struct arguments *arguments);
|
||||||
|
@ -378,7 +382,7 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI
|
||||||
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName);
|
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName);
|
||||||
int taosCheckParam(struct arguments *arguments);
|
int taosCheckParam(struct arguments *arguments);
|
||||||
void taosFreeDbInfos();
|
void taosFreeDbInfos();
|
||||||
static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName);
|
static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName);
|
||||||
|
|
||||||
struct arguments tsArguments = {
|
struct arguments tsArguments = {
|
||||||
// connection option
|
// connection option
|
||||||
|
@ -540,6 +544,8 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN);
|
||||||
|
|
||||||
time_t tTime = time(NULL);
|
time_t tTime = time(NULL);
|
||||||
struct tm tm = *localtime(&tTime);
|
struct tm tm = *localtime(&tTime);
|
||||||
|
|
||||||
|
@ -692,64 +698,97 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
|
||||||
|
|
||||||
sprintf(tmpCommand, "select tbname from %s", metric);
|
sprintf(tmpCommand, "select tbname from %s", metric);
|
||||||
|
|
||||||
TAOS_RES *result = taos_query(taosCon, tmpCommand);
|
TAOS_RES *res = taos_query(taosCon, tmpCommand);
|
||||||
int32_t code = taos_errno(result);
|
int32_t code = taos_errno(res);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "failed to run command %s\n", tmpCommand);
|
fprintf(stderr, "failed to run command %s\n", tmpCommand);
|
||||||
free(tmpCommand);
|
free(tmpCommand);
|
||||||
taos_free_result(result);
|
taos_free_result(res);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
free(tmpCommand);
|
||||||
|
|
||||||
|
char tmpBuf[TSDB_FILENAME_LEN + 1];
|
||||||
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||||
|
sprintf(tmpBuf, ".select-tbname.tmp");
|
||||||
|
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||||
|
if (fd == -1) {
|
||||||
|
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||||
|
taos_free_result(res);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||||
|
|
||||||
int32_t numOfTable = 0;
|
|
||||||
int32_t numOfThread = *totalNumOfThread;
|
|
||||||
char tmpFileName[TSDB_FILENAME_LEN + 1];
|
|
||||||
while ((row = taos_fetch_row(result)) != NULL) {
|
|
||||||
if (0 == numOfTable) {
|
|
||||||
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
|
|
||||||
sprintf(tmpFileName, ".tables.tmp.%d", numOfThread);
|
|
||||||
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
|
||||||
if (fd == -1) {
|
|
||||||
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
|
|
||||||
taos_free_result(result);
|
|
||||||
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
|
||||||
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
|
|
||||||
(void)remove(tmpFileName);
|
|
||||||
}
|
|
||||||
free(tmpCommand);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfThread++;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
int32_t numOfTable = 0;
|
||||||
|
while ((row = taos_fetch_row(res)) != NULL) {
|
||||||
|
|
||||||
memset(&tableRecord, 0, sizeof(STableRecord));
|
memset(&tableRecord, 0, sizeof(STableRecord));
|
||||||
tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes);
|
tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes);
|
||||||
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
|
||||||
|
|
||||||
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
||||||
|
|
||||||
numOfTable++;
|
numOfTable++;
|
||||||
|
|
||||||
if (numOfTable >= arguments->table_batch) {
|
|
||||||
numOfTable = 0;
|
|
||||||
close(fd);
|
|
||||||
fd = -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
lseek(fd, 0, SEEK_SET);
|
||||||
|
|
||||||
|
int maxThreads = arguments->thread_num;
|
||||||
|
int tableOfPerFile ;
|
||||||
|
if (numOfTable <= arguments->thread_num) {
|
||||||
|
tableOfPerFile = 1;
|
||||||
|
maxThreads = numOfTable;
|
||||||
|
} else {
|
||||||
|
tableOfPerFile = numOfTable / arguments->thread_num;
|
||||||
|
if (0 != numOfTable % arguments->thread_num) {
|
||||||
|
tableOfPerFile += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
|
||||||
|
if (NULL == tblBuf){
|
||||||
|
fprintf(stderr, "failed to calloc %" PRIzu "\n", tableOfPerFile * sizeof(STableRecord));
|
||||||
|
close(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfThread = *totalNumOfThread;
|
||||||
|
int subFd = -1;
|
||||||
|
for (; numOfThread < maxThreads; numOfThread++) {
|
||||||
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||||
|
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
|
||||||
|
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||||
|
if (subFd == -1) {
|
||||||
|
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||||
|
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||||
|
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
|
||||||
|
(void)remove(tmpBuf);
|
||||||
|
}
|
||||||
|
sprintf(tmpBuf, ".select-tbname.tmp");
|
||||||
|
(void)remove(tmpBuf);
|
||||||
|
close(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// read tableOfPerFile for fd, write to subFd
|
||||||
|
ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord));
|
||||||
|
if (readLen <= 0) {
|
||||||
|
close(subFd);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosWrite(subFd, tblBuf, readLen);
|
||||||
|
close(subFd);
|
||||||
|
}
|
||||||
|
|
||||||
|
sprintf(tmpBuf, ".select-tbname.tmp");
|
||||||
|
(void)remove(tmpBuf);
|
||||||
|
|
||||||
if (fd >= 0) {
|
if (fd >= 0) {
|
||||||
close(fd);
|
close(fd);
|
||||||
fd = -1;
|
fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(result);
|
|
||||||
|
|
||||||
*totalNumOfThread = numOfThread;
|
*totalNumOfThread = numOfThread;
|
||||||
|
|
||||||
free(tmpCommand);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -946,7 +985,7 @@ int taosDumpOut(struct arguments *arguments) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start multi threads to dumpout
|
// start multi threads to dumpout
|
||||||
taosStartDumpOutWorkThreads(arguments, totalNumOfThread, dbInfos[0]->name);
|
taosStartDumpOutWorkThreads(taos, arguments, totalNumOfThread, dbInfos[0]->name);
|
||||||
|
|
||||||
char tmpFileName[TSDB_FILENAME_LEN + 1];
|
char tmpFileName[TSDB_FILENAME_LEN + 1];
|
||||||
_clean_tmp_file:
|
_clean_tmp_file:
|
||||||
|
@ -1181,34 +1220,34 @@ void* taosDumpOutWorkThreadFp(void *arg)
|
||||||
STableRecord tableRecord;
|
STableRecord tableRecord;
|
||||||
int fd;
|
int fd;
|
||||||
|
|
||||||
char tmpFileName[TSDB_FILENAME_LEN*4] = {0};
|
char tmpBuf[TSDB_FILENAME_LEN*4] = {0};
|
||||||
sprintf(tmpFileName, ".tables.tmp.%d", pThread->threadIndex);
|
sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex);
|
||||||
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||||
if (fd == -1) {
|
if (fd == -1) {
|
||||||
fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpFileName);
|
fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
FILE *fp = NULL;
|
FILE *fp = NULL;
|
||||||
memset(tmpFileName, 0, TSDB_FILENAME_LEN + 128);
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
|
||||||
|
|
||||||
if (tsArguments.outpath[0] != 0) {
|
if (tsArguments.outpath[0] != 0) {
|
||||||
sprintf(tmpFileName, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex);
|
sprintf(tmpBuf, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex);
|
||||||
} else {
|
} else {
|
||||||
sprintf(tmpFileName, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex);
|
sprintf(tmpBuf, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
fp = fopen(tmpFileName, "w");
|
fp = fopen(tmpBuf, "w");
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
fprintf(stderr, "failed to open file %s\n", tmpFileName);
|
fprintf(stderr, "failed to open file %s\n", tmpBuf);
|
||||||
close(fd);
|
close(fd);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||||
sprintf(tmpFileName, "use %s", pThread->dbName);
|
sprintf(tmpBuf, "use %s", pThread->dbName);
|
||||||
|
|
||||||
TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpFileName);
|
TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpBuf);
|
||||||
int32_t code = taos_errno(tmpResult);
|
int32_t code = taos_errno(tmpResult);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "invalid database %s\n", pThread->dbName);
|
fprintf(stderr, "invalid database %s\n", pThread->dbName);
|
||||||
|
@ -1218,6 +1257,9 @@ void* taosDumpOutWorkThreadFp(void *arg)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int fileNameIndex = 1;
|
||||||
|
int tablesInOneFile = 0;
|
||||||
|
int64_t lastRowsPrint = 5000000;
|
||||||
fprintf(fp, "USE %s;\n\n", pThread->dbName);
|
fprintf(fp, "USE %s;\n\n", pThread->dbName);
|
||||||
while (1) {
|
while (1) {
|
||||||
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
|
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
|
||||||
|
@ -1228,6 +1270,33 @@ void* taosDumpOutWorkThreadFp(void *arg)
|
||||||
// TODO: sum table count and table rows by self
|
// TODO: sum table count and table rows by self
|
||||||
pThread->tablesOfDumpOut++;
|
pThread->tablesOfDumpOut++;
|
||||||
pThread->rowsOfDumpOut += ret;
|
pThread->rowsOfDumpOut += ret;
|
||||||
|
|
||||||
|
if (pThread->rowsOfDumpOut >= lastRowsPrint) {
|
||||||
|
printf(" %"PRId64 " rows already be dumpout from database %s\n", pThread->rowsOfDumpOut, pThread->dbName);
|
||||||
|
lastRowsPrint += 5000000;
|
||||||
|
}
|
||||||
|
|
||||||
|
tablesInOneFile++;
|
||||||
|
if (tablesInOneFile >= tsArguments.table_batch) {
|
||||||
|
fclose(fp);
|
||||||
|
tablesInOneFile = 0;
|
||||||
|
|
||||||
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
|
||||||
|
if (tsArguments.outpath[0] != 0) {
|
||||||
|
sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex, fileNameIndex);
|
||||||
|
} else {
|
||||||
|
sprintf(tmpBuf, "%s.tables.%d-%d.sql", pThread->dbName, pThread->threadIndex, fileNameIndex);
|
||||||
|
}
|
||||||
|
fileNameIndex++;
|
||||||
|
|
||||||
|
fp = fopen(tmpBuf, "w");
|
||||||
|
if (fp == NULL) {
|
||||||
|
fprintf(stderr, "failed to open file %s\n", tmpBuf);
|
||||||
|
close(fd);
|
||||||
|
taos_free_result(tmpResult);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1238,7 +1307,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName)
|
static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName)
|
||||||
{
|
{
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj));
|
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj));
|
||||||
|
@ -1249,12 +1318,7 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh
|
||||||
pThread->threadIndex = t;
|
pThread->threadIndex = t;
|
||||||
pThread->totalThreads = numOfThread;
|
pThread->totalThreads = numOfThread;
|
||||||
tstrncpy(pThread->dbName, dbName, TSDB_TABLE_NAME_LEN);
|
tstrncpy(pThread->dbName, dbName, TSDB_TABLE_NAME_LEN);
|
||||||
pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port);
|
pThread->taosCon = taosCon;
|
||||||
|
|
||||||
if (pThread->taosCon == NULL) {
|
|
||||||
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, reason:%s\n", pThread->threadIndex, taos_errstr(NULL));
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
@ -1273,7 +1337,6 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh
|
||||||
int64_t totalRowsOfDumpOut = 0;
|
int64_t totalRowsOfDumpOut = 0;
|
||||||
int64_t totalChildTblsOfDumpOut = 0;
|
int64_t totalChildTblsOfDumpOut = 0;
|
||||||
for (int32_t t = 0; t < numOfThread; ++t) {
|
for (int32_t t = 0; t < numOfThread; ++t) {
|
||||||
taos_close(threadObj[t].taosCon);
|
|
||||||
totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut;
|
totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut;
|
||||||
totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut;
|
totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut;
|
||||||
}
|
}
|
||||||
|
@ -1398,43 +1461,80 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char tmpBuf[TSDB_FILENAME_LEN + 1];
|
||||||
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||||
|
sprintf(tmpBuf, ".show-tables.tmp");
|
||||||
|
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||||
|
if (fd == -1) {
|
||||||
|
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||||
|
taos_free_result(res);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||||
|
|
||||||
int32_t numOfTable = 0;
|
|
||||||
int32_t numOfThread = 0;
|
|
||||||
char tmpFileName[TSDB_FILENAME_LEN + 1];
|
|
||||||
while ((row = taos_fetch_row(res)) != NULL) {
|
|
||||||
if (0 == numOfTable) {
|
|
||||||
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
|
|
||||||
sprintf(tmpFileName, ".tables.tmp.%d", numOfThread);
|
|
||||||
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
|
||||||
if (fd == -1) {
|
|
||||||
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
|
|
||||||
taos_free_result(res);
|
|
||||||
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
|
||||||
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
|
|
||||||
(void)remove(tmpFileName);
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfThread++;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
int32_t numOfTable = 0;
|
||||||
|
while ((row = taos_fetch_row(res)) != NULL) {
|
||||||
memset(&tableRecord, 0, sizeof(STableRecord));
|
memset(&tableRecord, 0, sizeof(STableRecord));
|
||||||
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
||||||
tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
||||||
|
|
||||||
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
||||||
|
|
||||||
numOfTable++;
|
numOfTable++;
|
||||||
|
|
||||||
if (numOfTable >= arguments->table_batch) {
|
|
||||||
numOfTable = 0;
|
|
||||||
close(fd);
|
|
||||||
fd = -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
lseek(fd, 0, SEEK_SET);
|
||||||
|
|
||||||
|
int maxThreads = tsArguments.thread_num;
|
||||||
|
int tableOfPerFile ;
|
||||||
|
if (numOfTable <= tsArguments.thread_num) {
|
||||||
|
tableOfPerFile = 1;
|
||||||
|
maxThreads = numOfTable;
|
||||||
|
} else {
|
||||||
|
tableOfPerFile = numOfTable / tsArguments.thread_num;
|
||||||
|
if (0 != numOfTable % tsArguments.thread_num) {
|
||||||
|
tableOfPerFile += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
|
||||||
|
if (NULL == tblBuf){
|
||||||
|
fprintf(stderr, "failed to calloc %" PRIzu "\n", tableOfPerFile * sizeof(STableRecord));
|
||||||
|
close(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfThread = 0;
|
||||||
|
int subFd = -1;
|
||||||
|
for (numOfThread = 0; numOfThread < maxThreads; numOfThread++) {
|
||||||
|
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||||
|
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
|
||||||
|
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||||
|
if (subFd == -1) {
|
||||||
|
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||||
|
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||||
|
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
|
||||||
|
(void)remove(tmpBuf);
|
||||||
|
}
|
||||||
|
sprintf(tmpBuf, ".show-tables.tmp");
|
||||||
|
(void)remove(tmpBuf);
|
||||||
|
close(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// read tableOfPerFile for fd, write to subFd
|
||||||
|
ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord));
|
||||||
|
if (readLen <= 0) {
|
||||||
|
close(subFd);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosWrite(subFd, tblBuf, readLen);
|
||||||
|
close(subFd);
|
||||||
|
}
|
||||||
|
|
||||||
|
sprintf(tmpBuf, ".show-tables.tmp");
|
||||||
|
(void)remove(tmpBuf);
|
||||||
|
|
||||||
if (fd >= 0) {
|
if (fd >= 0) {
|
||||||
close(fd);
|
close(fd);
|
||||||
|
@ -1444,10 +1544,10 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
|
|
||||||
// start multi threads to dumpout
|
// start multi threads to dumpout
|
||||||
taosStartDumpOutWorkThreads(arguments, numOfThread, dbInfo->name);
|
taosStartDumpOutWorkThreads(taosCon, arguments, numOfThread, dbInfo->name);
|
||||||
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||||
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
|
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
|
||||||
(void)remove(tmpFileName);
|
(void)remove(tmpBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1552,8 +1652,8 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName) {
|
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName) {
|
||||||
/* char temp[MAX_COMMAND_SIZE] = "\0"; */
|
int64_t lastRowsPrint = 5000000;
|
||||||
int64_t totalRows = 0;
|
int64_t totalRows = 0;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
char *pstr = NULL;
|
char *pstr = NULL;
|
||||||
TAOS_ROW row = NULL;
|
TAOS_ROW row = NULL;
|
||||||
|
@ -1680,9 +1780,14 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
|
||||||
|
|
||||||
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ") ");
|
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ") ");
|
||||||
|
|
||||||
totalRows++;
|
totalRows++;
|
||||||
count++;
|
count++;
|
||||||
fprintf(fp, "%s", tmpBuffer);
|
fprintf(fp, "%s", tmpBuffer);
|
||||||
|
|
||||||
|
if (totalRows >= lastRowsPrint) {
|
||||||
|
printf(" %"PRId64 " rows already be dumpout from %s.%s\n", totalRows, dbName, tbname);
|
||||||
|
lastRowsPrint += 5000000;
|
||||||
|
}
|
||||||
|
|
||||||
total_sqlstr_len += curr_sqlstr_len;
|
total_sqlstr_len += curr_sqlstr_len;
|
||||||
|
|
||||||
|
@ -2048,6 +2153,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int lastRowsPrint = 5000000;
|
||||||
int lineNo = 0;
|
int lineNo = 0;
|
||||||
while ((read_len = getline(&line, &line_len, fp)) != -1) {
|
while ((read_len = getline(&line, &line_len, fp)) != -1) {
|
||||||
++lineNo;
|
++lineNo;
|
||||||
|
@ -2074,7 +2180,12 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(cmd, 0, TSDB_MAX_ALLOWED_SQL_LEN);
|
memset(cmd, 0, TSDB_MAX_ALLOWED_SQL_LEN);
|
||||||
cmd_len = 0;
|
cmd_len = 0;
|
||||||
|
|
||||||
|
if (lineNo >= lastRowsPrint) {
|
||||||
|
printf(" %d lines already be executed from file %s\n", lineNo, fileName);
|
||||||
|
lastRowsPrint += 5000000;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(cmd);
|
tfree(cmd);
|
||||||
|
@ -2101,7 +2212,7 @@ void* taosDumpInWorkThreadFp(void *arg)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosStartDumpInWorkThreads(struct arguments *args)
|
static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args)
|
||||||
{
|
{
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
SThreadParaObj *pThread;
|
SThreadParaObj *pThread;
|
||||||
|
@ -2116,11 +2227,7 @@ static void taosStartDumpInWorkThreads(struct arguments *args)
|
||||||
pThread = threadObj + t;
|
pThread = threadObj + t;
|
||||||
pThread->threadIndex = t;
|
pThread->threadIndex = t;
|
||||||
pThread->totalThreads = totalThreads;
|
pThread->totalThreads = totalThreads;
|
||||||
pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port);
|
pThread->taosCon = taosCon;
|
||||||
if (pThread->taosCon == NULL) {
|
|
||||||
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, reason:%s\n", pThread->threadIndex, taos_errstr(NULL));
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
@ -2169,7 +2276,7 @@ int taosDumpIn(struct arguments *arguments) {
|
||||||
taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile);
|
taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosStartDumpInWorkThreads(arguments);
|
taosStartDumpInWorkThreads(taos, arguments);
|
||||||
|
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
taosFreeSQLFiles();
|
taosFreeSQLFiles();
|
||||||
|
|
Loading…
Reference in New Issue