[TD-5445]<fix>: taosdemo bug stmt interface with sample data. (#6971)

sync from develop branch.
This commit is contained in:
Shuduo Sang 2021-07-22 16:52:00 +08:00 committed by GitHub
parent c41d23daaa
commit 199bf6e0d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 1461 additions and 1453 deletions

View File

@ -16,7 +16,7 @@
/* /*
when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread. when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread.
*/ */
#include <stdint.h> #include <stdint.h>
#include <taos.h> #include <taos.h>
@ -24,24 +24,24 @@
#define CURL_STATICLIB #define CURL_STATICLIB
#ifdef LINUX #ifdef LINUX
#include <argp.h> #include <argp.h>
#include <inttypes.h> #include <inttypes.h>
#ifndef _ALPINE #ifndef _ALPINE
#include <error.h> #include <error.h>
#endif #endif
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <sys/time.h> #include <sys/time.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include <wordexp.h> #include <wordexp.h>
#include <regex.h> #include <regex.h>
#else #else
#include <regex.h> #include <regex.h>
#include <stdio.h> #include <stdio.h>
#endif #endif
#include <assert.h> #include <assert.h>
@ -485,7 +485,7 @@ typedef unsigned __int32 uint32_t;
#pragma comment ( lib, "ws2_32.lib" ) #pragma comment ( lib, "ws2_32.lib" )
// Some old MinGW/CYGWIN distributions don't define this: // Some old MinGW/CYGWIN distributions don't define this:
#ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING #ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004 #define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004
#endif // ENABLE_VIRTUAL_TERMINAL_PROCESSING #endif // ENABLE_VIRTUAL_TERMINAL_PROCESSING
static HANDLE g_stdoutHandle; static HANDLE g_stdoutHandle;
@ -2697,7 +2697,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
calcRowLen(superTbls); calcRowLen(superTbls);
/* /*
if (TBL_ALREADY_EXISTS == superTbls->childTblExists) { if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
//get all child table name use cmd: select tbname from superTblName; //get all child table name use cmd: select tbname from superTblName;
int childTblCount = 10000; int childTblCount = 10000;
@ -3270,7 +3270,7 @@ static void createChildTables() {
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
size_t n = 0; size_t n = 0;
ssize_t readLen = 0; ssize_t readLen = 0;
@ -3338,7 +3338,7 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
/* /*
Read 10000 lines at most. If more than 10000 lines, continue to read after using Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/ */
static int readSampleFromCsvFileToMem( static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) { SSuperTable* superTblInfo) {
size_t n = 0; size_t n = 0;
@ -5118,7 +5118,8 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
case STMT_IFACE: case STMT_IFACE:
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt); debugPrint("%s() LN%d, stmt=%p",
__func__, __LINE__, pThreadInfo->stmt);
if (0 != taos_stmt_execute(pThreadInfo->stmt)) { if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
errorPrint("%s() LN%d, failied to execute insert statement\n", errorPrint("%s() LN%d, failied to execute insert statement\n",
__func__, __LINE__); __func__, __LINE__);
@ -5771,6 +5772,8 @@ static int32_t prepareStbStmtBind(
TAOS_BIND *bind; TAOS_BIND *bind;
if (isColumn) { if (isColumn) {
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) { for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
@ -5794,7 +5797,6 @@ static int32_t prepareStbStmtBind(
ptr += bind->buffer_length; ptr += bind->buffer_length;
} else { } else {
int cursor = 0;
if (sourceRand) { if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType( if ( -1 == prepareStmtBindArrayByType(
@ -5851,6 +5853,7 @@ static int32_t prepareStbStmtBind(
} }
free(bindBuffer);
return 0; return 0;
} }
@ -6570,7 +6573,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values", pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pThreadInfo->db_name, pThreadInfo->tb_prefix, pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from); pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { // if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) { if (pThreadInfo->counter >= g_args.num_of_RPR) {
pThreadInfo->start_table_from++; pThreadInfo->start_table_from++;
pThreadInfo->counter = 0; pThreadInfo->counter = 0;
@ -6724,14 +6727,17 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t limit; int64_t limit;
uint64_t offset; uint64_t offset;
if ((NULL != g_args.sqlFile) && (superTblInfo->childTblExists == TBL_NO_EXISTS) && if ((NULL != g_args.sqlFile)
((superTblInfo->childTblOffset != 0) || (superTblInfo->childTblLimit >= 0))) { && (superTblInfo->childTblExists == TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset != 0)
|| (superTblInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n"); printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
} }
if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) { if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((superTblInfo->childTblLimit < 0) if ((superTblInfo->childTblLimit < 0)
|| ((superTblInfo->childTblOffset + superTblInfo->childTblLimit) || ((superTblInfo->childTblOffset
+ superTblInfo->childTblLimit)
> (superTblInfo->childTblCount))) { > (superTblInfo->childTblCount))) {
superTblInfo->childTblLimit = superTblInfo->childTblLimit =
superTblInfo->childTblCount - superTblInfo->childTblOffset; superTblInfo->childTblCount - superTblInfo->childTblOffset;
@ -6837,7 +6843,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
if ((g_args.iface == STMT_IFACE) if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) { || ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
int columnCount; int columnCount;
if (superTblInfo) { if (superTblInfo) {
@ -6865,7 +6872,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
== superTblInfo->autoCreateTable)) { == superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName); superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1); tag ++ ) { for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?"); pstr += sprintf(pstr, ",?");
} }
pstr += sprintf(pstr, ") VALUES(?"); pstr += sprintf(pstr, ") VALUES(?");
@ -7027,12 +7035,12 @@ static void *readTable(void *sarg) {
} }
int64_t num_of_DPT; int64_t num_of_DPT;
/* if (pThreadInfo->superTblInfo) { /* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table; num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
} else { } else {
*/ */
num_of_DPT = g_args.num_of_DPT; num_of_DPT = g_args.num_of_DPT;
// } // }
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables; int64_t totalData = num_of_DPT * num_of_tables;
@ -7591,7 +7599,7 @@ static int queryTestProcess() {
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub); tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection; // taos_close(taos);// TODO: workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs(); uint64_t endTs = taosGetTimestampMs();
uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried + uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
@ -7817,7 +7825,7 @@ static void *superSubscribe(void *sarg) {
static void *specifiedSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
// TAOS_SUB* tsub = NULL; // TAOS_SUB* tsub = NULL;
setThreadName("specSub"); setThreadName("specSub");
@ -8076,7 +8084,7 @@ static int subscribeTestProcess() {
tmfree((char*)pidsOfStable); tmfree((char*)pidsOfStable);
tmfree((char*)infosOfStable); tmfree((char*)infosOfStable);
// taos_close(taos); // taos_close(taos);
return 0; return 0;
} }