commit
297ec49aa3
|
@ -698,79 +698,97 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
|
|||
|
||||
sprintf(tmpCommand, "select tbname from %s", metric);
|
||||
|
||||
TAOS_RES *result = taos_query(taosCon, tmpCommand);
|
||||
int32_t code = taos_errno(result);
|
||||
TAOS_RES *res = taos_query(taosCon, tmpCommand);
|
||||
int32_t code = taos_errno(res);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to run command %s\n", tmpCommand);
|
||||
free(tmpCommand);
|
||||
taos_free_result(result);
|
||||
taos_free_result(res);
|
||||
return -1;
|
||||
}
|
||||
free(tmpCommand);
|
||||
|
||||
char tmpBuf[TSDB_FILENAME_LEN + 1];
|
||||
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||
sprintf(tmpBuf, ".select-tbname.tmp");
|
||||
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||
if (fd == -1) {
|
||||
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||
taos_free_result(res);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int table_batch = arguments->table_batch;
|
||||
int affectdRows = taos_affected_rows(result);
|
||||
if (affectdRows <= 0) {
|
||||
free(tmpCommand);
|
||||
taos_free_result(result);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int maxNumOfThread = affectdRows / table_batch + 1;
|
||||
if (maxNumOfThread > 2 * g_numOfCores) {
|
||||
maxNumOfThread = 2 * g_numOfCores;
|
||||
}
|
||||
|
||||
table_batch = affectdRows / maxNumOfThread + 1;
|
||||
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
||||
int32_t numOfTable = 0;
|
||||
int32_t numOfThread = *totalNumOfThread;
|
||||
char tmpFileName[TSDB_FILENAME_LEN + 1];
|
||||
while ((row = taos_fetch_row(result)) != NULL) {
|
||||
if (0 == numOfTable) {
|
||||
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
|
||||
sprintf(tmpFileName, ".tables.tmp.%d", numOfThread);
|
||||
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||
if (fd == -1) {
|
||||
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
|
||||
taos_free_result(result);
|
||||
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
|
||||
(void)remove(tmpFileName);
|
||||
}
|
||||
free(tmpCommand);
|
||||
return -1;
|
||||
}
|
||||
|
||||
numOfThread++;
|
||||
}
|
||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||
|
||||
int32_t numOfTable = 0;
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
|
||||
memset(&tableRecord, 0, sizeof(STableRecord));
|
||||
tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes);
|
||||
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
||||
|
||||
|
||||
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
||||
numOfTable++;
|
||||
|
||||
if (numOfTable >= table_batch) {
|
||||
numOfTable = 0;
|
||||
close(fd);
|
||||
fd = -1;
|
||||
}
|
||||
}
|
||||
taos_free_result(res);
|
||||
lseek(fd, 0, SEEK_SET);
|
||||
|
||||
int maxThreads = arguments->thread_num;
|
||||
int tableOfPerFile ;
|
||||
if (numOfTable <= arguments->thread_num) {
|
||||
tableOfPerFile = 1;
|
||||
maxThreads = numOfTable;
|
||||
} else {
|
||||
tableOfPerFile = numOfTable / arguments->thread_num;
|
||||
if (0 != numOfTable % arguments->thread_num) {
|
||||
tableOfPerFile += 1;
|
||||
}
|
||||
}
|
||||
|
||||
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
|
||||
if (NULL == tblBuf){
|
||||
fprintf(stderr, "failed to calloc %" PRIzu "\n", tableOfPerFile * sizeof(STableRecord));
|
||||
close(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfThread = *totalNumOfThread;
|
||||
int subFd = -1;
|
||||
for (; numOfThread < maxThreads; numOfThread++) {
|
||||
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
|
||||
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||
if (subFd == -1) {
|
||||
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
|
||||
(void)remove(tmpBuf);
|
||||
}
|
||||
sprintf(tmpBuf, ".select-tbname.tmp");
|
||||
(void)remove(tmpBuf);
|
||||
close(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// read tableOfPerFile for fd, write to subFd
|
||||
ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord));
|
||||
if (readLen <= 0) {
|
||||
close(subFd);
|
||||
break;
|
||||
}
|
||||
taosWrite(subFd, tblBuf, readLen);
|
||||
close(subFd);
|
||||
}
|
||||
|
||||
sprintf(tmpBuf, ".select-tbname.tmp");
|
||||
(void)remove(tmpBuf);
|
||||
|
||||
if (fd >= 0) {
|
||||
close(fd);
|
||||
fd = -1;
|
||||
}
|
||||
|
||||
taos_free_result(result);
|
||||
}
|
||||
|
||||
*totalNumOfThread = numOfThread;
|
||||
|
||||
free(tmpCommand);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1443,57 +1461,80 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
|
|||
return -1;
|
||||
}
|
||||
|
||||
int table_batch = arguments->table_batch;
|
||||
int affectdRows = taos_affected_rows(res);
|
||||
if (affectdRows <= 0) {
|
||||
char tmpBuf[TSDB_FILENAME_LEN + 1];
|
||||
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||
sprintf(tmpBuf, ".show-tables.tmp");
|
||||
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||
if (fd == -1) {
|
||||
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||
taos_free_result(res);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int maxNumOfThread = affectdRows / table_batch + 1;
|
||||
if (maxNumOfThread > 2 * g_numOfCores) {
|
||||
maxNumOfThread = 2 * g_numOfCores;
|
||||
}
|
||||
|
||||
table_batch = affectdRows / maxNumOfThread + 1;
|
||||
|
||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||
|
||||
int32_t numOfTable = 0;
|
||||
int32_t numOfThread = 0;
|
||||
char tmpBuf[TSDB_FILENAME_LEN + 1];
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
if (0 == numOfTable) {
|
||||
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
|
||||
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||
if (fd == -1) {
|
||||
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||
taos_free_result(res);
|
||||
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
|
||||
(void)remove(tmpBuf);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
numOfThread++;
|
||||
}
|
||||
|
||||
int32_t numOfTable = 0;
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
memset(&tableRecord, 0, sizeof(STableRecord));
|
||||
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
|
||||
tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
|
||||
|
||||
|
||||
taosWrite(fd, &tableRecord, sizeof(STableRecord));
|
||||
|
||||
|
||||
numOfTable++;
|
||||
|
||||
if (numOfTable >= table_batch) {
|
||||
numOfTable = 0;
|
||||
close(fd);
|
||||
fd = -1;
|
||||
}
|
||||
}
|
||||
taos_free_result(res);
|
||||
lseek(fd, 0, SEEK_SET);
|
||||
|
||||
int maxThreads = tsArguments.thread_num;
|
||||
int tableOfPerFile ;
|
||||
if (numOfTable <= tsArguments.thread_num) {
|
||||
tableOfPerFile = 1;
|
||||
maxThreads = numOfTable;
|
||||
} else {
|
||||
tableOfPerFile = numOfTable / tsArguments.thread_num;
|
||||
if (0 != numOfTable % tsArguments.thread_num) {
|
||||
tableOfPerFile += 1;
|
||||
}
|
||||
}
|
||||
|
||||
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
|
||||
if (NULL == tblBuf){
|
||||
fprintf(stderr, "failed to calloc %" PRIzu "\n", tableOfPerFile * sizeof(STableRecord));
|
||||
close(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfThread = 0;
|
||||
int subFd = -1;
|
||||
for (numOfThread = 0; numOfThread < maxThreads; numOfThread++) {
|
||||
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
|
||||
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
|
||||
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
|
||||
if (subFd == -1) {
|
||||
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
|
||||
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
|
||||
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
|
||||
(void)remove(tmpBuf);
|
||||
}
|
||||
sprintf(tmpBuf, ".show-tables.tmp");
|
||||
(void)remove(tmpBuf);
|
||||
close(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// read tableOfPerFile for fd, write to subFd
|
||||
ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord));
|
||||
if (readLen <= 0) {
|
||||
close(subFd);
|
||||
break;
|
||||
}
|
||||
taosWrite(subFd, tblBuf, readLen);
|
||||
close(subFd);
|
||||
}
|
||||
|
||||
sprintf(tmpBuf, ".show-tables.tmp");
|
||||
(void)remove(tmpBuf);
|
||||
|
||||
if (fd >= 0) {
|
||||
close(fd);
|
||||
|
|
Loading…
Reference in New Issue