From 83187703a8d4ee3f8d9f0a44eaebc358b2e894bd Mon Sep 17 00:00:00 2001 From: root Date: Fri, 21 May 2021 11:51:33 +0000 Subject: [PATCH 1/5] [TD-4182] specify column for single batch bind --- tests/script/api/stmtBatchTest.c | 975 ++++++++++++++++++++++++++++++- 1 file changed, 950 insertions(+), 25 deletions(-) diff --git a/tests/script/api/stmtBatchTest.c b/tests/script/api/stmtBatchTest.c index 3ce2fefb0d..8bd296db61 100644 --- a/tests/script/api/stmtBatchTest.c +++ b/tests/script/api/stmtBatchTest.c @@ -37,7 +37,7 @@ typedef struct { //void taosMsleep(int mseconds); -int g_rows = 0; +int g_runTimes = 5; unsigned long long getCurrentTime(){ @@ -1336,7 +1336,7 @@ static void checkResult(TAOS *taos, char *tname, int printr, int expected) { result = taos_query(taos, sql); int code = taos_errno(result); if (code != 0) { - printf("failed to query table, reason:%s\n", taos_errstr(result)); + printf("failed to query table: %s, reason:%s\n", tname, taos_errstr(result)); taos_free_result(result); return; } @@ -1358,9 +1358,9 @@ static void checkResult(TAOS *taos, char *tname, int printr, int expected) { } if (rows == expected) { - printf("%d rows are fetched as expectation\n", rows); + printf("%d rows are fetched as expectation from %s\n", rows, tname); } else { - printf("!!!expect %d rows, but %d rows are fetched\n", expected, rows); + printf("!!!expect %d rows, but %d rows are fetched from %s\n", expected, rows, tname); return; } @@ -1451,15 +1451,52 @@ static void prepareV_long(TAOS *taos, int schemaCase, int tableNum, int lenO -//void runcase(TAOS *taos, int idx) { -static void* runCase(void *para) { - ThreadInfo* tInfo = (ThreadInfo *)para; - TAOS *taos = tInfo->taos; - int idx = tInfo->idx; - - TAOS_STMT *stmt = NULL; +static void prepareVcolumn(TAOS *taos, int schemaCase, int tableNum, int lenOfBinaryDef, char* dbName) { + TAOS_RES *result; + int code; + char sqlstr[1024] = {0}; + sprintf(sqlstr, "drop database if exists %s;", dbName); + result = taos_query(taos, sqlstr); + taos_free_result(result); - (void)idx; + sprintf(sqlstr, "create database %s;", dbName); + result = taos_query(taos, sqlstr); + code = taos_errno(result); + if (code != 0) { + printf("failed to create database, reason:%s\n", taos_errstr(result)); + taos_free_result(result); + return; + } + taos_free_result(result); + + sprintf(sqlstr, "use %s;", dbName); + result = taos_query(taos, sqlstr); + taos_free_result(result); + + // create table + for (int i = 0 ; i < tableNum; i++) { + char buf[1024]; + if (schemaCase) { + sprintf(buf, "create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, br binary(%d), nr nchar(%d), ts2 timestamp)", i, lenOfBinaryDef, lenOfBinaryDef) ; + } else { + sprintf(buf, "create table m%d (ts timestamp, b int)", i) ; + } + + result = taos_query(taos, buf); + code = taos_errno(result); + if (code != 0) { + printf("failed to create table, reason:%s\n", taos_errstr(result)); + taos_free_result(result); + return; + } + taos_free_result(result); + } + +} + +//void runcase(TAOS *taos, int idx) { +static void runCase(TAOS *taos) { + TAOS_STMT *stmt = NULL; int tableNum; int lenOfBinaryDef; @@ -1975,7 +2012,7 @@ static void* runCase(void *para) { } #endif - return NULL; + return ; } @@ -2166,14 +2203,8 @@ static int stmt_bind_case_001_long(TAOS_STMT *stmt, int tableNum, int rowsOfPerC return 0; } -static void* runCase_long(void *para) { - ThreadInfo* tInfo = (ThreadInfo *)para; - TAOS *taos = tInfo->taos; - int idx = tInfo->idx; - +static void runCase_long(TAOS *taos) { TAOS_STMT *stmt = NULL; - - (void)idx; int tableNum; int lenOfBinaryDef; @@ -2228,7 +2259,7 @@ static void* runCase_long(void *para) { prepareV_long(taos, 1, tableNum, lenOfBinaryDef); totalRowsPerTbl = 0; - for (int i = 0; i < 30000; i++) { + for (int i = 0; i < g_runTimes; i++) { stmt = taos_stmt_init(taos); stmt_bind_case_001_long(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum, &startTs); @@ -2245,17 +2276,895 @@ static void* runCase_long(void *para) { } #endif - return NULL; + return; } +/*=======================*/ +/* +test scene: insert into tb1 (ts,f1) values (?,?) +*/ +static int stmt_specifyCol_bind_case_001(TAOS_STMT *stmt, int tableNum, int rowsOfPerColum, int bingNum, int lenOfBinaryDef, int lenOfBinaryAct, int columnNum) { + sampleValue* v = (sampleValue *)calloc(1, sizeof(sampleValue)); + + int totalRowsPerTbl = rowsOfPerColum * bingNum; + + v->ts = (int64_t *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * tableNum)); + v->br = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + v->nr = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + + int *lb = (int *)malloc(MAX_ROWS_OF_PER_COLUMN * sizeof(int)); + + TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * (size_t)(bingNum * columnNum * (tableNum+1) * rowsOfPerColum)); + char* is_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + char* no_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + + int64_t tts = 1591060628000; + + for (int i = 0; i < rowsOfPerColum; ++i) { + lb[i] = lenOfBinaryAct; + no_null[i] = 0; + is_null[i] = (i % 10 == 2) ? 1 : 0; + v->b[i] = (int8_t)(i % 2); + v->v1[i] = (int8_t)((i+1) % 2); + v->v2[i] = (int16_t)i; + v->v4[i] = (int32_t)(i+1); + v->v8[i] = (int64_t)(i+2); + v->f4[i] = (float)(i+3); + v->f8[i] = (double)(i+4); + char tbuf[MAX_BINARY_DEF_LEN]; + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "binary-%d-0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789", i%10); + memcpy(v->br + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "nchar-%d-0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789", i%10); + memcpy(v->nr + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + v->ts2[i] = tts + i; + } + + int i = 0; + for (int j = 0; j < bingNum * tableNum; j++) { + params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+0].buffer_length = sizeof(int64_t); + params[i+0].buffer = &v->ts[j*rowsOfPerColum]; + params[i+0].length = NULL; + params[i+0].is_null = no_null; + params[i+0].num = rowsOfPerColum; + + params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[i+1].buffer_length = sizeof(int8_t); + params[i+1].buffer = v->b; + params[i+1].length = NULL; + params[i+1].is_null = is_null; + params[i+1].num = rowsOfPerColum; + + params[i+2].buffer_type = TSDB_DATA_TYPE_INT; + params[i+2].buffer_length = sizeof(int32_t); + params[i+2].buffer = v->v4; + params[i+2].length = NULL; + params[i+2].is_null = is_null; + params[i+2].num = rowsOfPerColum; + + params[i+3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[i+3].buffer_length = sizeof(float); + params[i+3].buffer = v->f4; + params[i+3].length = NULL; + params[i+3].is_null = is_null; + params[i+3].num = rowsOfPerColum; + + params[i+4].buffer_type = TSDB_DATA_TYPE_BINARY; + params[i+4].buffer_length = (uintptr_t)lenOfBinaryDef; + params[i+4].buffer = v->br; + params[i+4].length = lb; + params[i+4].is_null = is_null; + params[i+4].num = rowsOfPerColum; + + i+=columnNum; + } + + //int64_t tts = 1591060628000; + for (int i = 0; i < totalRowsPerTbl * tableNum; ++i) { + v->ts[i] = tts + i; + } + + unsigned long long starttime = getCurrentTime(); + +// create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, br binary(%d), nr nchar(%d), ts2 timestamp) + char *sql = "insert into m0 (ts,b,v4,f4,br) values(?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + + int id = 0; + for (int l = 0; l < bingNum; l++) { + for (int zz = 0; zz < tableNum; zz++) { + //char buf[32]; + //sprintf(buf, "m%d", zz); + //code = taos_stmt_set_tbname(stmt, buf); + //if (code != 0){ + // printf("failed to execute taos_stmt_set_tbname. code:0x%x[%s]\n", code, tstrerror(code)); + // return -1; + //} + + for (int col=0; col < columnNum; ++col) { + code = taos_stmt_bind_single_param_batch(stmt, params + id, col); + if (code != 0){ + printf("failed to execute taos_stmt_bind_single_param_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + id++; + } + + code = taos_stmt_add_batch(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_add_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_execute. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + unsigned long long endtime = getCurrentTime(); + unsigned long long totalRows = (uint32_t)(totalRowsPerTbl * tableNum); + printf("insert total %d records, used %u seconds, avg:%u useconds per record\n", totalRows, (endtime-starttime)/1000000UL, (endtime-starttime)/totalRows); + + free(v->ts); + free(v->br); + free(v->nr); + free(v); + free(lb); + free(params); + free(is_null); + free(no_null); + + return 0; +} + +/*=======================*/ +/* +test scene: insert into ? (ts,f1) values (?,?) +*/ +static int stmt_specifyCol_bind_case_002(TAOS_STMT *stmt, int tableNum, int rowsOfPerColum, int bingNum, int lenOfBinaryDef, int lenOfBinaryAct, int columnNum) { + sampleValue* v = (sampleValue *)calloc(1, sizeof(sampleValue)); + + int totalRowsPerTbl = rowsOfPerColum * bingNum; + + v->ts = (int64_t *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * tableNum)); + v->br = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + v->nr = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + + int *lb = (int *)malloc(MAX_ROWS_OF_PER_COLUMN * sizeof(int)); + + TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * (size_t)(bingNum * columnNum * (tableNum+1) * rowsOfPerColum)); + char* is_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + char* no_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + + int64_t tts = 1591060628000; + + for (int i = 0; i < rowsOfPerColum; ++i) { + lb[i] = lenOfBinaryAct; + no_null[i] = 0; + is_null[i] = (i % 10 == 2) ? 1 : 0; + v->b[i] = (int8_t)(i % 2); + v->v1[i] = (int8_t)((i+1) % 2); + v->v2[i] = (int16_t)i; + v->v4[i] = (int32_t)(i+1); + v->v8[i] = (int64_t)(i+2); + v->f4[i] = (float)(i+3); + v->f8[i] = (double)(i+4); + char tbuf[MAX_BINARY_DEF_LEN]; + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "binary-%d", i%10); + memcpy(v->br + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "nchar-%d", i%10); + memcpy(v->nr + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + v->ts2[i] = tts + i; + } + + int i = 0; + for (int j = 0; j < bingNum * tableNum; j++) { + params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+0].buffer_length = sizeof(int64_t); + params[i+0].buffer = &v->ts[j*rowsOfPerColum]; + params[i+0].length = NULL; + params[i+0].is_null = no_null; + params[i+0].num = rowsOfPerColum; + + params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[i+1].buffer_length = sizeof(int8_t); + params[i+1].buffer = v->b; + params[i+1].length = NULL; + params[i+1].is_null = is_null; + params[i+1].num = rowsOfPerColum; + + params[i+2].buffer_type = TSDB_DATA_TYPE_INT; + params[i+2].buffer_length = sizeof(int32_t); + params[i+2].buffer = v->v4; + params[i+2].length = NULL; + params[i+2].is_null = is_null; + params[i+2].num = rowsOfPerColum; + + params[i+3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[i+3].buffer_length = sizeof(float); + params[i+3].buffer = v->f4; + params[i+3].length = NULL; + params[i+3].is_null = is_null; + params[i+3].num = rowsOfPerColum; + + params[i+4].buffer_type = TSDB_DATA_TYPE_BINARY; + params[i+4].buffer_length = (uintptr_t)lenOfBinaryDef; + params[i+4].buffer = v->br; + params[i+4].length = lb; + params[i+4].is_null = is_null; + params[i+4].num = rowsOfPerColum; + + i+=columnNum; + } + + //int64_t tts = 1591060628000; + for (int i = 0; i < totalRowsPerTbl * tableNum; ++i) { + v->ts[i] = tts + i; + } + + unsigned long long starttime = getCurrentTime(); + +// create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, br binary(%d), nr nchar(%d), ts2 timestamp) + char *sql = "insert into ? (ts,b,v4,f4,br) values(?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + + int id = 0; + for (int l = 0; l < bingNum; l++) { + for (int zz = 0; zz < tableNum; zz++) { + char buf[32]; + sprintf(buf, "m%d", zz); + code = taos_stmt_set_tbname(stmt, buf); + if (code != 0){ + printf("failed to execute taos_stmt_set_tbname. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + + for (int col=0; col < columnNum; ++col) { + code = taos_stmt_bind_single_param_batch(stmt, params + id, col); + if (code != 0){ + printf("failed to execute taos_stmt_bind_single_param_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + id++; + } + + code = taos_stmt_add_batch(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_add_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_execute. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + unsigned long long endtime = getCurrentTime(); + unsigned long long totalRows = (uint32_t)(totalRowsPerTbl * tableNum); + printf("insert total %d records, used %u seconds, avg:%u useconds per record\n", totalRows, (endtime-starttime)/1000000UL, (endtime-starttime)/totalRows); + + free(v->ts); + free(v->br); + free(v->nr); + free(v); + free(lb); + free(params); + free(is_null); + free(no_null); + + return 0; +} + +/*=======================*/ +/* +test scene: insert into tb1 (ts,f1) values (?,?) +*/ +static int stmt_specifyCol_bind_case_001_maxRows(TAOS_STMT *stmt, int tableNum, int rowsOfPerColum, int bingNum, int lenOfBinaryDef, int lenOfBinaryAct, int columnNum) { + sampleValue* v = (sampleValue *)calloc(1, sizeof(sampleValue)); + + int totalRowsPerTbl = rowsOfPerColum * bingNum; + + v->ts = (int64_t *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * tableNum)); + v->br = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + v->nr = (char *)malloc(sizeof(int64_t) * (size_t)(totalRowsPerTbl * lenOfBinaryDef)); + + int *lb = (int *)malloc(MAX_ROWS_OF_PER_COLUMN * sizeof(int)); + + TAOS_MULTI_BIND *params = calloc(1, sizeof(TAOS_MULTI_BIND) * (size_t)(bingNum * columnNum * (tableNum+1) * rowsOfPerColum)); + char* is_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + char* no_null = malloc(sizeof(char) * MAX_ROWS_OF_PER_COLUMN); + + int64_t tts = 1591060628000; + + for (int i = 0; i < rowsOfPerColum; ++i) { + lb[i] = lenOfBinaryAct; + no_null[i] = 0; + is_null[i] = (i % 10 == 2) ? 1 : 0; + v->b[i] = (int8_t)(i % 2); + v->v1[i] = (int8_t)((i+1) % 2); + v->v2[i] = (int16_t)i; + v->v4[i] = (int32_t)(i+1); + v->v8[i] = (int64_t)(i+2); + v->f4[i] = (float)(i+3); + v->f8[i] = (double)(i+4); + char tbuf[MAX_BINARY_DEF_LEN]; + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "binary-%d-0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789", i%10); + memcpy(v->br + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + memset(tbuf, 0, MAX_BINARY_DEF_LEN); + sprintf(tbuf, "nchar-%d-0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789", i%10); + memcpy(v->nr + i*lenOfBinaryDef, tbuf, (size_t)lenOfBinaryAct); + v->ts2[i] = tts + i; + } + + int i = 0; + for (int j = 0; j < bingNum * tableNum; j++) { + params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[i+0].buffer_length = sizeof(int64_t); + params[i+0].buffer = &v->ts[j*rowsOfPerColum]; + params[i+0].length = NULL; + params[i+0].is_null = no_null; + params[i+0].num = rowsOfPerColum; + + params[i+1].buffer_type = TSDB_DATA_TYPE_INT; + params[i+1].buffer_length = sizeof(int32_t); + params[i+1].buffer = v->v4; + params[i+1].length = NULL; + params[i+1].is_null = is_null; + params[i+1].num = rowsOfPerColum; + + i+=columnNum; + } + + //int64_t tts = 1591060628000; + for (int i = 0; i < totalRowsPerTbl * tableNum; ++i) { + v->ts[i] = tts + i; + } + + unsigned long long starttime = getCurrentTime(); + +// create table m%d (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, br binary(%d), nr nchar(%d), ts2 timestamp) + char *sql = "insert into m0 (ts,b,v4,f4,br) values(?,?,?,?,?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + + int id = 0; + for (int l = 0; l < bingNum; l++) { + for (int zz = 0; zz < tableNum; zz++) { + //char buf[32]; + //sprintf(buf, "m%d", zz); + //code = taos_stmt_set_tbname(stmt, buf); + //if (code != 0){ + // printf("failed to execute taos_stmt_set_tbname. code:0x%x[%s]\n", code, tstrerror(code)); + // return -1; + //} + + for (int col=0; col < columnNum; ++col) { + code = taos_stmt_bind_single_param_batch(stmt, params + id, col); + if (code != 0){ + printf("failed to execute taos_stmt_bind_single_param_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + id++; + } + + code = taos_stmt_add_batch(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_add_batch. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + code = taos_stmt_execute(stmt); + if (code != 0) { + printf("failed to execute taos_stmt_execute. code:0x%x[%s]\n", code, tstrerror(code)); + return -1; + } + } + + unsigned long long endtime = getCurrentTime(); + unsigned long long totalRows = (uint32_t)(totalRowsPerTbl * tableNum); + printf("insert total %d records, used %u seconds, avg:%u useconds per record\n", totalRows, (endtime-starttime)/1000000UL, (endtime-starttime)/totalRows); + + free(v->ts); + free(v->br); + free(v->nr); + free(v); + free(lb); + free(params); + free(is_null); + free(no_null); + + return 0; +} + +static void SpecifyColumnBatchCase(TAOS *taos) { + TAOS_STMT *stmt = NULL; + + int tableNum; + int lenOfBinaryDef; + int rowsOfPerColum; + int bingNum; + int lenOfBinaryAct; + int columnNum; + + int totalRowsPerTbl; + +//=======================================================================// +//=============================== single table ==========================// +//========== case 1: ======================// +#if 1 +{ + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 1; + bingNum = 1; + lenOfBinaryDef = 40; + lenOfBinaryAct = 8; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db1"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 1 check result end\n\n"); +} +#endif + + //========== case 2: ======================// +#if 1 +{ + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 5; + bingNum = 1; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 15; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db2"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + //checkResult(taos, "m1", 0, totalRowsPerTbl); + //checkResult(taos, "m2", 0, totalRowsPerTbl); + //checkResult(taos, "m3", 0, totalRowsPerTbl); + //checkResult(taos, "m4", 0, totalRowsPerTbl); + //checkResult(taos, "m5", 0, totalRowsPerTbl); + //checkResult(taos, "m6", 0, totalRowsPerTbl); + //checkResult(taos, "m7", 0, totalRowsPerTbl); + //checkResult(taos, "m8", 0, totalRowsPerTbl); + //checkResult(taos, "m9", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 2 check result end\n\n"); +} +#endif + + //========== case 2-1: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 32767; + bingNum = 1; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 15; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db2_1"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + //checkResult(taos, "m1", 0, totalRowsPerTbl); + //checkResult(taos, "m2", 0, totalRowsPerTbl); + //checkResult(taos, "m3", 0, totalRowsPerTbl); + //checkResult(taos, "m4", 0, totalRowsPerTbl); + //checkResult(taos, "m5", 0, totalRowsPerTbl); + //checkResult(taos, "m6", 0, totalRowsPerTbl); + //checkResult(taos, "m7", 0, totalRowsPerTbl); + //checkResult(taos, "m8", 0, totalRowsPerTbl); + //checkResult(taos, "m9", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 2-1 check result end\n\n"); + } +#endif + //========== case 2-2: ======================// +#if 1 + { + printf("====case 2-2 error test start\n"); + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 32768; + bingNum = 1; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 15; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db2_2"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + //checkResult(taos, "m1", 0, totalRowsPerTbl); + //checkResult(taos, "m2", 0, totalRowsPerTbl); + //checkResult(taos, "m3", 0, totalRowsPerTbl); + //checkResult(taos, "m4", 0, totalRowsPerTbl); + //checkResult(taos, "m5", 0, totalRowsPerTbl); + //checkResult(taos, "m6", 0, totalRowsPerTbl); + //checkResult(taos, "m7", 0, totalRowsPerTbl); + //checkResult(taos, "m8", 0, totalRowsPerTbl); + //checkResult(taos, "m9", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("====case 2-2 check result end\n\n"); + } +#endif + + + //========== case 3: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 1; + bingNum = 5; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 20; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db3"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + //checkResult(taos, "m1", 0, totalRowsPerTbl); + //checkResult(taos, "m2", 0, totalRowsPerTbl); + //checkResult(taos, "m3", 0, totalRowsPerTbl); + //checkResult(taos, "m4", 0, totalRowsPerTbl); + //checkResult(taos, "m5", 0, totalRowsPerTbl); + //checkResult(taos, "m6", 0, totalRowsPerTbl); + //checkResult(taos, "m7", 0, totalRowsPerTbl); + //checkResult(taos, "m8", 0, totalRowsPerTbl); + //checkResult(taos, "m9", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 3 check result end\n\n"); + } +#endif + + //========== case 4: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 5; + bingNum = 5; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 33; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db4"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + //checkResult(taos, "m1", 0, totalRowsPerTbl); + //checkResult(taos, "m2", 0, totalRowsPerTbl); + //checkResult(taos, "m3", 0, totalRowsPerTbl); + //checkResult(taos, "m4", 0, totalRowsPerTbl); + //checkResult(taos, "m5", 0, totalRowsPerTbl); + //checkResult(taos, "m6", 0, totalRowsPerTbl); + //checkResult(taos, "m7", 0, totalRowsPerTbl); + //checkResult(taos, "m8", 0, totalRowsPerTbl); + //checkResult(taos, "m9", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 4 check result end\n\n"); + } +#endif + +//=======================================================================// +//=============================== multi tables ==========================// + //========== case 5: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 5; + rowsOfPerColum = 1; + bingNum = 1; + lenOfBinaryDef = 40; + lenOfBinaryAct = 16; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db5"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 5 check result end\n\n"); + } +#endif + + //========== case 6: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 5; + rowsOfPerColum = 5; + bingNum = 1; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 20; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db6"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 6 check result end\n\n"); + } +#endif + + //========== case 7: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 5; + rowsOfPerColum = 1; + bingNum = 5; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 33; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db7"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 7 check result end\n\n"); + } +#endif + + //========== case 8: ======================// +#if 1 +{ + stmt = taos_stmt_init(taos); + + tableNum = 5; + rowsOfPerColum = 5; + bingNum = 5; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 40; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db8"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 8 check result end\n\n"); +} +#endif + + //=======================================================================// + //=============================== multi-rows to single table ==========================// + //========== case 9: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 23740; + bingNum = 1; + lenOfBinaryDef = 40; + lenOfBinaryAct = 8; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db9"); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 9 check result end\n\n"); + } +#endif + + //========== case 10: ======================// +#if 0 + { + printf("====case 10 error test start\n"); + stmt = taos_stmt_init(taos); + + tableNum = 1; + rowsOfPerColum = 23741; // WAL size exceeds limit + bingNum = 1; + lenOfBinaryDef = 40; + lenOfBinaryAct = 8; + columnNum = 5; + + prepareV(taos, 1, tableNum, lenOfBinaryDef); + stmt_specifyCol_bind_case_001(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("====case 10 check result end\n\n"); + } +#endif + + + //=======================================================================// + //=============================== multi tables, multi bind one same table ==========================// + //========== case 13: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 5; + rowsOfPerColum = 1; + bingNum = 5; + lenOfBinaryDef = 40; + lenOfBinaryAct = 28; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db13"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 13 check result end\n\n"); + } +#endif + + //========== case 14: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 5; + rowsOfPerColum = 5; + bingNum = 5; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 33; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db14"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m1", 0, totalRowsPerTbl); + checkResult(taos, "m2", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m4", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 14 check result end\n\n"); + } +#endif + + + //========== case 15: ======================// +#if 1 + { + stmt = taos_stmt_init(taos); + + tableNum = 1000; + rowsOfPerColum = 10; + bingNum = 5; + lenOfBinaryDef = 1000; + lenOfBinaryAct = 8; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db15"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m111", 0, totalRowsPerTbl); + checkResult(taos, "m222", 0, totalRowsPerTbl); + checkResult(taos, "m333", 0, totalRowsPerTbl); + checkResult(taos, "m500", 0, totalRowsPerTbl); + checkResult(taos, "m999", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 15 check result end\n\n"); + } +#endif + + //========== case 17: ======================// +#if 1 + { + //printf("case 17 test start\n"); + stmt = taos_stmt_init(taos); + + tableNum = 10; + rowsOfPerColum = 100; + bingNum = 1; + lenOfBinaryDef = 100; + lenOfBinaryAct = 8; + columnNum = 5; + + prepareVcolumn(taos, 1, tableNum, lenOfBinaryDef, "db17"); + stmt_specifyCol_bind_case_002(stmt, tableNum, rowsOfPerColum, bingNum, lenOfBinaryDef, lenOfBinaryAct, columnNum); + + totalRowsPerTbl = rowsOfPerColum * bingNum; + checkResult(taos, "m0", 0, totalRowsPerTbl); + checkResult(taos, "m3", 0, totalRowsPerTbl); + checkResult(taos, "m5", 0, totalRowsPerTbl); + checkResult(taos, "m8", 0, totalRowsPerTbl); + checkResult(taos, "m9", 0, totalRowsPerTbl); + taos_stmt_close(stmt); + printf("case 17 check result end\n\n"); + } +#endif + + return ; + +} int main(int argc, char *argv[]) { TAOS *taos; char host[32] = "127.0.0.1"; char* serverIp = NULL; - int threadNum = 2; + int threadNum = 1; // connect to server if (argc == 1) { @@ -2268,9 +3177,12 @@ int main(int argc, char *argv[]) } else if (argc == 4) { serverIp = argv[1]; threadNum = atoi(argv[2]); - g_rows = atoi(argv[3]); + g_runTimes = atoi(argv[3]); } + printf("server:%s, runTimes:%d\n\n", serverIp, g_runTimes); + +#if 0 printf("server:%s, threadNum:%d, rows:%d\n\n", serverIp, threadNum, g_rows); pthread_t *pThreadList = (pthread_t *) calloc(sizeof(pthread_t), (size_t)threadNum); @@ -2287,7 +3199,8 @@ int main(int argc, char *argv[]) tInfo->taos = taos; tInfo->idx = i; if (0 == i) { - pthread_create(&(pThreadList[0]), NULL, runCase, (void *)tInfo); + //pthread_create(&(pThreadList[0]), NULL, runCase, (void *)tInfo); + pthread_create(&(pThreadList[0]), NULL, SpecifyColumnBatchCase, (void *)tInfo); } else if (1 == i){ pthread_create(&(pThreadList[0]), NULL, runCase_long, (void *)tInfo); } @@ -2300,6 +3213,18 @@ int main(int argc, char *argv[]) free(pThreadList); free(threadInfo); +#endif + + taos = taos_connect(serverIp, "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to TDengine, reason:%s\n", taos_errstr(taos)); + return -1; + } + + runCase(taos); + runCase_long(taos); + SpecifyColumnBatchCase(taos); + return 0; } From c74651a6cb906f71368357fe7acd61a260bae8d0 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 21 May 2021 21:35:41 +0800 Subject: [PATCH 2/5] Hotfix/sangshuduo/td 4136 taosdemo records morethan 32767 for develop (#6189) * [TD-4136]: taosdemo records per req more than 32767. for develop branch. * change taosRandom() to rand_bool() for bool data. * [TD-4136]: taosdemo check insert rows not more than 32767. check insert rows for progressive. * fix with answer_yes. * fix interlace rows checking position. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index e8075c69ba..6c1f56e44a 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3492,16 +3492,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } g_args.interlace_rows = interlaceRows->valueint; - - // rows per table need be less than insert batch - if (g_args.interlace_rows > g_args.num_of_RPR) { - printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", - g_args.interlace_rows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", - g_args.num_of_RPR); - prompt(); - g_args.interlace_rows = g_args.num_of_RPR; - } } else if (!interlaceRows) { g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { @@ -3567,6 +3557,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + // rows per table need be less than insert batch + if (g_args.interlace_rows > g_args.num_of_RPR) { + printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", + g_args.interlace_rows, g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + g_args.num_of_RPR); + prompt(); + g_args.interlace_rows = g_args.num_of_RPR; + } + cJSON* dbs = cJSON_GetObjectItem(root, "databases"); if (!dbs || dbs->type != cJSON_Array) { printf("ERROR: failed to read json, databases not found\n"); From fc394200e4a7bb01f36810da14cb93bf12ba2d01 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 21 May 2021 23:20:42 +0800 Subject: [PATCH 3/5] Hotfix/sangshuduo/td 4240 taosdemo subscribe toomuch for develop (#6193) * [TD-4240]: taosdemo subscribe more than max query sql count. * [TD-4240]: taosdemo subscribe more than 100. fix tsub sequence bug. * [TD-4240]: taosdemo subscribe more than 100. fix auto create table. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 102 +++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 6c1f56e44a..feab1aab90 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -4890,10 +4890,12 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k) return affectedRows; } -static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) +static void getTableName(char *pTblName, + threadInfo* pThreadInfo, uint64_t tableSeq) { SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - if (superTblInfo) { + if ((superTblInfo) + && (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) { if (superTblInfo->childTblLimit > 0) { snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", superTblInfo->childTblName + @@ -6768,6 +6770,7 @@ static void *superSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; char subSqlstr[MAX_QUERY_SQL_LENGTH]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; + uint64_t tsubSeq; if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", @@ -6776,6 +6779,15 @@ static void *superSubscribe(void *sarg) { exit(-1); } + if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { + errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n", + g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables, + g_queryInfo.superQueryInfo.sqlCount, + pThreadInfo->ntables, + MAX_QUERY_SQL_COUNT); + exit(-1); + } + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -6804,6 +6816,8 @@ static void *superSubscribe(void *sarg) { char topic[32] = {0}; for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { + + tsubSeq = i - pThreadInfo->start_table_from; verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", __func__, __LINE__, pThreadInfo->threadID, @@ -6823,12 +6837,12 @@ static void *superSubscribe(void *sarg) { debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", __func__, __LINE__, pThreadInfo->threadID, subSqlstr); - tsub[i] = subscribeImpl( + tsub[tsubSeq] = subscribeImpl( STABLE_CLASS, pThreadInfo, subSqlstr, topic, g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeInterval); - if (NULL == tsub[i]) { + if (NULL == tsub[tsubSeq]) { taos_close(pThreadInfo->taos); return NULL; } @@ -6844,54 +6858,56 @@ static void *superSubscribe(void *sarg) { while(1) { for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { - continue; - } + tsubSeq = i - pThreadInfo->start_table_from; + if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + continue; + } - res = taos_consume(tsub[i]); - if (res) { - if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", - g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], - pThreadInfo->threadID); - appendResultToFile(res, pThreadInfo->fp); - } - if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(pThreadInfo->fp, "%s-%d", - g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], - pThreadInfo->threadID); - appendResultToFile(res, pThreadInfo->fp); - } - consumed[i] ++; + res = taos_consume(tsub[tsubSeq]); + if (res) { + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + appendResultToFile(res, pThreadInfo->fp); + } + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + appendResultToFile(res, pThreadInfo->fp); + } + consumed[tsubSeq] ++; - if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) - && (consumed[i] >= - g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { - printf("keepProgress:%d, resub super table query: %"PRIu64"\n", - g_queryInfo.superQueryInfo.subscribeKeepProgress, - pThreadInfo->querySeq); - taos_unsubscribe(tsub, + if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) + && (consumed[tsubSeq] >= + g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { + printf("keepProgress:%d, resub super table query: %"PRIu64"\n", + g_queryInfo.superQueryInfo.subscribeKeepProgress, + pThreadInfo->querySeq); + taos_unsubscribe(tsub[tsubSeq], g_queryInfo.superQueryInfo.subscribeKeepProgress); - consumed[i]= 0; - tsub[i] = subscribeImpl( - STABLE_CLASS, - pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - g_queryInfo.superQueryInfo.subscribeInterval - ); - if (NULL == tsub[i]) { - taos_close(pThreadInfo->taos); - return NULL; - } - } - } + consumed[tsubSeq]= 0; + tsub[tsubSeq] = subscribeImpl( + STABLE_CLASS, + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval + ); + if (NULL == tsub[tsubSeq]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } + } } } taos_free_result(res); for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - taos_unsubscribe(tsub[i], 0); + tsubSeq = i - pThreadInfo->start_table_from; + taos_unsubscribe(tsub[tsubSeq], 0); } taos_close(pThreadInfo->taos); From fbb160572efa96495a91a1a60abe5a11425b4f19 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 22 May 2021 10:09:29 +0800 Subject: [PATCH 4/5] [TD-4034]debug log --- src/tsdb/src/tsdbMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 1e6f9eac12..880400dd9f 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -735,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea pTable->maxColumnNum += 1; - tsdbInfo("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); + tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; } } From 1062357eeee0cb3eb04064c20c98a3f356f51ad0 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 26 May 2021 17:36:51 +0800 Subject: [PATCH 5/5] [TD-4034]when cacheLast option changed,restore or free cache last data --- src/tsdb/inc/tsdbMeta.h | 6 +- src/tsdb/inc/tsdbint.h | 4 + src/tsdb/src/tsdbCommitQueue.c | 22 +++-- src/tsdb/src/tsdbMain.c | 172 ++++++++++++++++++++++++++++++++- src/tsdb/src/tsdbMeta.c | 23 ++--- 5 files changed, 203 insertions(+), 24 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 45868c002d..14d5a41768 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -38,8 +38,9 @@ typedef struct STable { SRWLatch latch; // TODO: implementa latch functions SDataCol *lastCols; - int16_t lastColNum; - int16_t maxColumnNum; + int16_t maxColNum; + int16_t restoreColumnNum; + bool hasRestoreLastColumn; int lastColSVersion; T_REF_DECLARE() } STable; @@ -90,6 +91,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); STSchema* tsdbGetTableLatestSchema(STable *pTable); +void tsdbFreeLastColumns(STable* pTable); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 4d62164df9..e74c3238e2 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -75,6 +75,9 @@ struct STsdbRepo { STsdbCfg save_config; // save apply config bool config_changed; // config changed flag pthread_mutex_t save_mutex; // protect save config + + uint8_t hasCachedLastRow; + uint8_t hasCachedLastColumn; STsdbAppH appH; STsdbStat stat; @@ -100,6 +103,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo); +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]); diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index e753a3211e..abea79bc4f 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -113,11 +113,15 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { } static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { + pthread_mutex_lock(&pRepo->save_mutex); + pRepo->config_changed = false; STsdbCfg * pSaveCfg = &pRepo->save_config; - + STsdbCfg oldCfg; int32_t oldTotalBlocks = pRepo->config.totalBlocks; + memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg)); + pRepo->config.compression = pRepo->save_config.compression; pRepo->config.keep = pRepo->save_config.keep; pRepo->config.keep1 = pRepo->save_config.keep1; @@ -125,10 +129,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)", + pthread_mutex_unlock(&pRepo->save_mutex); + + tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)", REPO_ID(pRepo), pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks); + pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks); int err = tsdbExpendPool(pRepo, oldTotalBlocks); if (!TAOS_SUCCEEDED(err)) { @@ -136,6 +142,12 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); } + if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { + if (tsdbLockRepo(pRepo) < 0) return; + tsdbCacheLastData(pRepo, &oldCfg); + tsdbUnlockRepo(pRepo); + } + } static void *tsdbLoopCommit(void *arg) { @@ -165,10 +177,8 @@ static void *tsdbLoopCommit(void *arg) { pRepo = ((SCommitReq *)pNode->data)->pRepo; // check if need to apply new config - if (pRepo->config_changed) { - pthread_mutex_lock(&pRepo->save_mutex); + if (pRepo->config_changed) { tsdbApplyRepoConfig(pRepo); - pthread_mutex_unlock(&pRepo->save_mutex); } tsdbCommitData(pRepo); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 880400dd9f..a8bbd0d69e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -548,6 +548,8 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } pRepo->config_changed = false; + atomic_store_8(&pRepo->hasCachedLastRow, 0); + atomic_store_8(&pRepo->hasCachedLastColumn, 0); code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { @@ -636,7 +638,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea int err = 0; numColumns = schemaNCols(pSchema); - if (numColumns <= pTable->maxColumnNum) { + if (numColumns <= pTable->restoreColumnNum) { return 0; } if (pTable->lastColSVersion != schemaVersion(pSchema)) { @@ -675,7 +677,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea SBlockIdx *pIdx = pReadh->pBlkIdx; blockIdx = (int32_t)(pIdx->numOfBlocks - 1); - while (numColumns > pTable->maxColumnNum && blockIdx >= 0) { + while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { bool loadStatisData = false; pBlock = pReadh->pBlkInfo->blocks + blockIdx; blockIdx -= 1; @@ -693,7 +695,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea loadStatisData = true; } - for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) { + for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { STColumn *pCol = schemaColAt(pSchema, i); // ignore loaded columns if (pTable->lastCols[i].bytes != 0) { @@ -733,7 +735,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); pLastCol->ts = dataRowKey(row); - pTable->maxColumnNum += 1; + pTable->restoreColumnNum += 1; tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; @@ -766,7 +768,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - pTable->maxColumnNum = 0; + pTable->restoreColumnNum = 0; } } @@ -841,5 +843,165 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } tsdbDestroyReadH(&readh); + if (CACHE_LAST_ROW(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + return 0; } + +int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { + bool cacheLastRow = false, cacheLastCol = false; + SFSIter fsiter; + SReadH readh; + SDFileSet *pSet; + STsdbMeta *pMeta = pRepo->tsdbMeta; + //STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlock * pBlock; + int tableNum = 0; + int maxTableIdx = 0; + int cacheLastRowTableNum = 0; + int cacheLastColTableNum = 0; + + bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config)); + bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + + if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) { + tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed"); + cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config)); + cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config)); + } + + // calc max table idx and table num + for (int i = 1; i < pMeta->maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + tableNum += 1; + maxTableIdx = i; + if (cacheLastCol) { + pTable->restoreColumnNum = 0; + } + } + + // if close last option,need to free data + if (need_free_last_row || need_free_last_col) { + if (need_free_last_row) { + atomic_store_8(&pRepo->hasCachedLastRow, 0); + } + if (need_free_last_col) { + atomic_store_8(&pRepo->hasCachedLastColumn, 0); + } + tsdbInfo("free cache last data since cacheLast option changed"); + for (int i = 1; i < maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + if (need_free_last_row) { + taosTZfree(pTable->lastRow); + pTable->lastRow = NULL; + pTable->lastKey = TSKEY_INITIAL_VAL; + } + if (need_free_last_col) { + tsdbFreeLastColumns(pTable); + } + } + } + + if (!cacheLastRow && !cacheLastCol) { + return 0; + } + + cacheLastRowTableNum = cacheLastRow ? tableNum : 0; + cacheLastColTableNum = cacheLastCol ? tableNum : 0; + + if (tsdbInitReadH(&readh, pRepo) < 0) { + return -1; + } + + tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD); + + while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) { + if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + if (tsdbLoadBlockIdx(&readh) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + for (int i = 1; i <= maxTableIdx; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable == NULL) continue; + + //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + + if (tsdbSetReadTable(&readh, pTable) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + SBlockIdx *pIdx = readh.pBlkIdx; + + if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { + pTable->lastKey = pIdx->maxKey; + + if (tsdbLoadBlockInfo(&readh, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1; + + if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) { + tsdbDestroyReadH(&readh); + return -1; + } + + // Get the data in row + ASSERT(pTable->lastRow == NULL); + STSchema *pSchema = tsdbGetTableSchema(pTable); + pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + if (pTable->lastRow == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(&readh); + return -1; + } + + tdInitDataRow(pTable->lastRow, pSchema); + for (int icol = 0; icol < schemaNCols(pSchema); icol++) { + STColumn *pCol = schemaColAt(pSchema, icol); + SDataCol *pDataCol = readh.pDCols[0]->cols + icol; + tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, + pCol->offset); + } + cacheLastRowTableNum -= 1; + } + + // restore NULL columns + if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) { + if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { + tsdbDestroyReadH(&readh); + return -1; + } + if (pTable->hasRestoreLastColumn) { + cacheLastColTableNum -= 1; + } + } + } + } + + tsdbDestroyReadH(&readh); + + if (cacheLastRow) { + atomic_store_8(&pRepo->hasCachedLastRow, 1); + } + if (cacheLastCol) { + atomic_store_8(&pRepo->hasCachedLastColumn, 1); + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 5717d52eea..324a7c79c5 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -44,7 +44,6 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); -static void tsdbFreeLastColumns(STable* pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { @@ -590,12 +589,12 @@ void tsdbUnRefTable(STable *pTable) { } } -static void tsdbFreeLastColumns(STable* pTable) { +void tsdbFreeLastColumns(STable* pTable) { if (pTable->lastCols == NULL) { return; } - for (int i = 0; i < pTable->lastColNum; ++i) { + for (int i = 0; i < pTable->maxColNum; ++i) { if (pTable->lastCols[i].bytes == 0) { continue; } @@ -605,14 +604,16 @@ static void tsdbFreeLastColumns(STable* pTable) { } tfree(pTable->lastCols); pTable->lastCols = NULL; - pTable->lastColNum = 0; + pTable->maxColNum = 0; + pTable->lastColSVersion = -1; + pTable->restoreColumnNum = 0; } int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { if (pTable->lastCols == NULL) { return -1; } - for (int16_t i = 0; i < pTable->lastColNum; ++i) { + for (int16_t i = 0; i < pTable->maxColNum; ++i) { if (pTable->lastCols[i].colId == colId) { return i; } @@ -640,8 +641,8 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { } pTable->lastColSVersion = schemaVersion(pSchema); - pTable->lastColNum = numOfColumn; - pTable->maxColumnNum = 0; + pTable->maxColNum = numOfColumn; + pTable->restoreColumnNum = 0; return 0; } @@ -682,11 +683,11 @@ int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { } SDataCol *oldLastCols = pTable->lastCols; - int16_t oldLastColNum = pTable->lastColNum; + int16_t oldLastColNum = pTable->maxColNum; pTable->lastColSVersion = schemaVersion(pNewSchema); pTable->lastCols = lastCols; - pTable->lastColNum = numOfCols; + pTable->maxColNum = numOfCols; if (oldLastCols == NULL) { TSDB_WUNLOCK_TABLE(pTable); @@ -797,8 +798,8 @@ static STable *tsdbNewTable() { pTable->lastKey = TSKEY_INITIAL_VAL; pTable->lastCols = NULL; - pTable->maxColumnNum = 0; - pTable->lastColNum = 0; + pTable->restoreColumnNum = 0; + pTable->maxColNum = 0; pTable->lastColSVersion = -1; return pTable; }