commit
7729fa5334
|
@ -5,7 +5,8 @@
|
|||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"thread_count": 2,
|
||||
"thread_count": 4,
|
||||
"thread_count_create_tbl": 1,
|
||||
"result_file": "./insert_res.txt",
|
||||
"databases": [{
|
||||
"dbinfo": {
|
||||
|
|
|
@ -93,9 +93,6 @@ extern char configDir[];
|
|||
#define MAX_QUERY_SQL_COUNT 10
|
||||
#define MAX_QUERY_SQL_LENGTH 256
|
||||
|
||||
|
||||
#define MAX_LINE_COUNT_IN_MEM 10000
|
||||
|
||||
typedef enum CREATE_SUB_TALBE_MOD_EN {
|
||||
PRE_CREATE_SUBTBL,
|
||||
AUTO_CREATE_SUBTBL,
|
||||
|
@ -259,6 +256,7 @@ typedef struct SDbs_S {
|
|||
bool queryMode;
|
||||
|
||||
int threadCount;
|
||||
int threadCountByCreateTbl;
|
||||
int dbCount;
|
||||
SDataBase db[MAX_DB_COUNT];
|
||||
|
||||
|
@ -1418,7 +1416,6 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName
|
|||
char command[BUFFER_SIZE] = "\0";
|
||||
TAOS_RES * res;
|
||||
TAOS_ROW row = NULL;
|
||||
int count = 0;
|
||||
|
||||
char* childTblName = *childTblNameOfSuperTbl;
|
||||
|
||||
|
@ -1433,12 +1430,13 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName
|
|||
exit(-1);
|
||||
}
|
||||
|
||||
int childTblCount = 10000;
|
||||
count = 0;
|
||||
int childTblCount = 10000;
|
||||
int count = 0;
|
||||
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
|
||||
char* pTblName = childTblName;
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
strncpy(pTblName, (char *)row[0], TSDB_TABLE_NAME_LEN);
|
||||
int32_t* len = taos_fetch_lengths(res);
|
||||
strncpy(pTblName, (char *)row[0], len[0]);
|
||||
//printf("==== sub table name: %s\n", pTblName);
|
||||
count++;
|
||||
if (count >= childTblCount - 1) {
|
||||
|
@ -1829,38 +1827,64 @@ static void createChildTables() {
|
|||
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
continue;
|
||||
}
|
||||
startMultiThreadCreateChildTable(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.threadCount, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
|
||||
startMultiThreadCreateChildTable(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.threadCountByCreateTbl, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
|
||||
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
static int taosGetLineNum(const char *fileName)
|
||||
{
|
||||
int lineNum = 0;
|
||||
char cmd[1024] = { 0 };
|
||||
char buf[1024] = { 0 };
|
||||
sprintf(cmd, "wc -l %s", fileName);
|
||||
|
||||
FILE *fp = popen(cmd, "r");
|
||||
if (fp == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
|
||||
return lineNum;
|
||||
}
|
||||
|
||||
if (fgets(buf, sizeof(buf), fp)) {
|
||||
int index = strchr((const char*)buf, ' ') - buf;
|
||||
buf[index] = '\0';
|
||||
lineNum = atoi(buf);
|
||||
}
|
||||
pclose(fp);
|
||||
return lineNum;
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
Read 10000 lines at most. If more than 10000 lines, continue to read after using
|
||||
*/
|
||||
int readTagFromCsvFileToMem(SSuperTable * supterTblInfo) {
|
||||
int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
|
||||
size_t n = 0;
|
||||
ssize_t readLen = 0;
|
||||
char * line = NULL;
|
||||
|
||||
FILE *fp = fopen(supterTblInfo->tagsFile, "r");
|
||||
FILE *fp = fopen(superTblInfo->tagsFile, "r");
|
||||
if (fp == NULL) {
|
||||
printf("Failed to open tags file: %s, reason:%s\n", supterTblInfo->tagsFile, strerror(errno));
|
||||
printf("Failed to open tags file: %s, reason:%s\n", superTblInfo->tagsFile, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (supterTblInfo->tagDataBuf) {
|
||||
free(supterTblInfo->tagDataBuf);
|
||||
supterTblInfo->tagDataBuf = NULL;
|
||||
if (superTblInfo->tagDataBuf) {
|
||||
free(superTblInfo->tagDataBuf);
|
||||
superTblInfo->tagDataBuf = NULL;
|
||||
}
|
||||
|
||||
supterTblInfo->tagDataBuf = calloc(supterTblInfo->lenOfTagOfOneRow * MAX_LINE_COUNT_IN_MEM, 1);
|
||||
if (supterTblInfo->tagDataBuf == NULL) {
|
||||
|
||||
int tagCount = 10000;
|
||||
int count = 0;
|
||||
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount);
|
||||
if (tagDataBuf == NULL) {
|
||||
printf("Failed to calloc, reason:%s\n", strerror(errno));
|
||||
fclose(fp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
while ((readLen = getline(&line, &n, fp)) != -1) {
|
||||
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
|
||||
line[--readLen] = 0;
|
||||
|
@ -1870,20 +1894,35 @@ int readTagFromCsvFileToMem(SSuperTable * supterTblInfo) {
|
|||
continue;
|
||||
}
|
||||
|
||||
memcpy(supterTblInfo->tagDataBuf + supterTblInfo->tagSampleCount * supterTblInfo->lenOfTagOfOneRow, line, readLen);
|
||||
supterTblInfo->tagSampleCount++;
|
||||
memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen);
|
||||
count++;
|
||||
|
||||
if (supterTblInfo->tagSampleCount >= MAX_LINE_COUNT_IN_MEM) {
|
||||
break;
|
||||
if (count >= tagCount - 1) {
|
||||
char *tmp = realloc(tagDataBuf, (size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
|
||||
if (tmp != NULL) {
|
||||
tagDataBuf = tmp;
|
||||
tagCount = (int)(tagCount*1.5);
|
||||
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, 0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
|
||||
} else {
|
||||
// exit, if allocate more memory failed
|
||||
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile);
|
||||
tmfree(tagDataBuf);
|
||||
free(line);
|
||||
fclose(fp);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
superTblInfo->tagDataBuf = tagDataBuf;
|
||||
superTblInfo->tagSampleCount = count;
|
||||
|
||||
free(line);
|
||||
fclose(fp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int readSampleFromJsonFileToMem(SSuperTable * supterTblInfo) {
|
||||
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
@ -2138,6 +2177,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
printf("failed to read json, threads not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl");
|
||||
if (threads2 && threads2->type == cJSON_Number) {
|
||||
g_Dbs.threadCountByCreateTbl = threads2->valueint;
|
||||
} else if (!threads2) {
|
||||
g_Dbs.threadCountByCreateTbl = 1;
|
||||
} else {
|
||||
printf("failed to read json, threads2 not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
|
||||
if (!dbs || dbs->type != cJSON_Array) {
|
||||
|
@ -3008,6 +3057,10 @@ void postFreeResource() {
|
|||
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
|
||||
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
|
||||
}
|
||||
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
|
||||
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
|
||||
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
|
||||
}
|
||||
if (0 != g_Dbs.db[i].superTbls[j].childTblName) {
|
||||
free(g_Dbs.db[i].superTbls[j].childTblName);
|
||||
g_Dbs.db[i].superTbls[j].childTblName = NULL;
|
||||
|
|
Loading…
Reference in New Issue