Hotfix/sangshuduo/td 3197 fix taosdemo coverity scan (#5521)
* [TD-3197] <fix>: fix taosdemo coverity scan issues. * [TD-3197] <fix>: fix taosdemo coverity scan issue. fix subscribeTest pids uninitialized. * [TD-3197] <fix>: fix taosdemo coverity scan issues. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
3b4a2ecce9
commit
de97080d85
|
@ -4212,7 +4212,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%d",
|
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%d",
|
||||||
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
|
g_args.tb_prefix, tableSeq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4355,7 +4355,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
|
||||||
tableName);
|
tableName);
|
||||||
} else {
|
} else {
|
||||||
len = snprintf(buffer,
|
len = snprintf(buffer,
|
||||||
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
|
superTblInfo->maxSqlLen,
|
||||||
"insert into %s.%s values",
|
"insert into %s.%s values",
|
||||||
pThreadInfo->db_name,
|
pThreadInfo->db_name,
|
||||||
tableName);
|
tableName);
|
||||||
|
@ -5471,17 +5471,21 @@ static int queryTestProcess() {
|
||||||
&& g_queryInfo.superQueryInfo.concurrent > 0) {
|
&& g_queryInfo.superQueryInfo.concurrent > 0) {
|
||||||
|
|
||||||
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
|
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
|
||||||
|
if (NULL == pids) {
|
||||||
|
taos_close(taos);
|
||||||
|
ERROR_EXIT("memory allocation failed\n");
|
||||||
|
}
|
||||||
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
|
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
|
||||||
if ((NULL == pids) || (NULL == infos)) {
|
if (NULL == infos) {
|
||||||
printf("malloc failed for create threads\n");
|
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
exit(-1);
|
free(pids);
|
||||||
|
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
|
||||||
threadInfo *t_info = infos + i;
|
threadInfo *t_info = infos + i;
|
||||||
t_info->threadID = i;
|
t_info->threadID = i;
|
||||||
|
|
||||||
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
|
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
|
||||||
t_info->taos = taos;
|
t_info->taos = taos;
|
||||||
|
|
||||||
|
@ -5489,6 +5493,8 @@ static int queryTestProcess() {
|
||||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||||
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
||||||
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE)) {
|
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE)) {
|
||||||
|
free(infos);
|
||||||
|
free(pids);
|
||||||
errorPrint( "use database %s failed!\n\n",
|
errorPrint( "use database %s failed!\n\n",
|
||||||
g_queryInfo.dbName);
|
g_queryInfo.dbName);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -5496,7 +5502,7 @@ static int queryTestProcess() {
|
||||||
} else {
|
} else {
|
||||||
t_info->taos = NULL;
|
t_info->taos = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_create(pids + i, NULL, superQueryProcess, t_info);
|
pthread_create(pids + i, NULL, superQueryProcess, t_info);
|
||||||
}
|
}
|
||||||
}else {
|
}else {
|
||||||
|
@ -5509,11 +5515,21 @@ static int queryTestProcess() {
|
||||||
if ((g_queryInfo.subQueryInfo.sqlCount > 0)
|
if ((g_queryInfo.subQueryInfo.sqlCount > 0)
|
||||||
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) {
|
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) {
|
||||||
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t));
|
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t));
|
||||||
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo));
|
if (NULL == pidsOfSub) {
|
||||||
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
|
|
||||||
printf("malloc failed for create threads\n");
|
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
exit(-1);
|
free(infos);
|
||||||
|
free(pids);
|
||||||
|
|
||||||
|
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo));
|
||||||
|
if (NULL == infosOfSub) {
|
||||||
|
taos_close(taos);
|
||||||
|
free(pidsOfSub);
|
||||||
|
free(infos);
|
||||||
|
free(pids);
|
||||||
|
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
int ntables = g_queryInfo.subQueryInfo.childTblCount;
|
int ntables = g_queryInfo.subQueryInfo.childTblCount;
|
||||||
|
@ -5544,62 +5560,63 @@ static int queryTestProcess() {
|
||||||
}
|
}
|
||||||
|
|
||||||
g_queryInfo.subQueryInfo.threadCnt = threads;
|
g_queryInfo.subQueryInfo.threadCnt = threads;
|
||||||
}else {
|
} else {
|
||||||
g_queryInfo.subQueryInfo.threadCnt = 0;
|
g_queryInfo.subQueryInfo.threadCnt = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
|
||||||
pthread_join(pids[i], NULL);
|
pthread_join(pids[i], NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmfree((char*)pids);
|
tmfree((char*)pids);
|
||||||
tmfree((char*)infos);
|
tmfree((char*)infos);
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) {
|
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) {
|
||||||
pthread_join(pidsOfSub[i], NULL);
|
pthread_join(pidsOfSub[i], NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmfree((char*)pidsOfSub);
|
tmfree((char*)pidsOfSub);
|
||||||
tmfree((char*)infosOfSub);
|
tmfree((char*)infosOfSub);
|
||||||
|
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
|
static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
|
||||||
if (res == NULL || taos_errno(res) != 0) {
|
if (res == NULL || taos_errno(res) != 0) {
|
||||||
printf("failed to subscribe result, code:%d, reason:%s\n", code, taos_errstr(res));
|
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
|
||||||
|
__func__, __LINE__, code, taos_errstr(res));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
getResult(res, (char*)param);
|
getResult(res, (char*)param);
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
|
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
|
||||||
TAOS_SUB* tsub = NULL;
|
TAOS_SUB* tsub = NULL;
|
||||||
|
|
||||||
if (g_queryInfo.superQueryInfo.subscribeMode) {
|
if (g_queryInfo.superQueryInfo.subscribeMode) {
|
||||||
tsub = taos_subscribe(taos,
|
tsub = taos_subscribe(taos,
|
||||||
g_queryInfo.superQueryInfo.subscribeRestart,
|
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||||
topic, sql, subscribe_callback, (void*)resultFileName,
|
topic, sql, subscribe_callback, (void*)resultFileName,
|
||||||
g_queryInfo.superQueryInfo.subscribeInterval);
|
g_queryInfo.superQueryInfo.subscribeInterval);
|
||||||
} else {
|
} else {
|
||||||
tsub = taos_subscribe(taos,
|
tsub = taos_subscribe(taos,
|
||||||
g_queryInfo.superQueryInfo.subscribeRestart,
|
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||||
topic, sql, NULL, NULL, 0);
|
topic, sql, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsub == NULL) {
|
if (tsub == NULL) {
|
||||||
printf("failed to create subscription. topic:%s, sql:%s\n", topic, sql);
|
printf("failed to create subscription. topic:%s, sql:%s\n", topic, sql);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return tsub;
|
return tsub;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *subSubscribeProcess(void *sarg) {
|
static void *subSubscribeProcess(void *sarg) {
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
char subSqlstr[1024];
|
char subSqlstr[1024];
|
||||||
|
|
||||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||||
|
@ -5608,7 +5625,7 @@ static void *subSubscribeProcess(void *sarg) {
|
||||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
|
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
//int64_t st = 0;
|
//int64_t st = 0;
|
||||||
//int64_t et = 0;
|
//int64_t et = 0;
|
||||||
do {
|
do {
|
||||||
|
@ -5643,13 +5660,13 @@ static void *subSubscribeProcess(void *sarg) {
|
||||||
if (1 == g_queryInfo.subQueryInfo.subscribeMode) {
|
if (1 == g_queryInfo.subQueryInfo.subscribeMode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]);
|
res = taos_consume(g_queryInfo.subQueryInfo.tsub[i]);
|
||||||
if (res) {
|
if (res) {
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d",
|
sprintf(tmpFile, "%s-%d",
|
||||||
g_queryInfo.subQueryInfo.result[i],
|
g_queryInfo.subQueryInfo.result[i],
|
||||||
winfo->threadID);
|
winfo->threadID);
|
||||||
}
|
}
|
||||||
getResult(res, tmpFile);
|
getResult(res, tmpFile);
|
||||||
|
@ -5657,16 +5674,16 @@ static void *subSubscribeProcess(void *sarg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.subQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i],
|
taos_unsubscribe(g_queryInfo.subQueryInfo.tsub[i],
|
||||||
g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
g_queryInfo.subQueryInfo.subscribeKeepProgress);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *superSubscribeProcess(void *sarg) {
|
static void *superSubscribeProcess(void *sarg) {
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
|
|
||||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||||
|
@ -5674,7 +5691,7 @@ static void *superSubscribeProcess(void *sarg) {
|
||||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
|
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
//int64_t st = 0;
|
//int64_t st = 0;
|
||||||
//int64_t et = 0;
|
//int64_t et = 0;
|
||||||
do {
|
do {
|
||||||
|
@ -5689,13 +5706,13 @@ static void *superSubscribeProcess(void *sarg) {
|
||||||
sprintf(topic, "taosdemo-subscribe-%d", i);
|
sprintf(topic, "taosdemo-subscribe-%d", i);
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d",
|
sprintf(tmpFile, "%s-%d",
|
||||||
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
||||||
}
|
}
|
||||||
g_queryInfo.superQueryInfo.tsub[i] =
|
g_queryInfo.superQueryInfo.tsub[i] =
|
||||||
subscribeImpl(winfo->taos,
|
subscribeImpl(winfo->taos,
|
||||||
g_queryInfo.superQueryInfo.sql[i],
|
g_queryInfo.superQueryInfo.sql[i],
|
||||||
topic, tmpFile);
|
topic, tmpFile);
|
||||||
if (NULL == g_queryInfo.superQueryInfo.tsub[i]) {
|
if (NULL == g_queryInfo.superQueryInfo.tsub[i]) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -5711,12 +5728,12 @@ static void *superSubscribeProcess(void *sarg) {
|
||||||
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
res = taos_consume(g_queryInfo.superQueryInfo.tsub[i]);
|
||||||
if (res) {
|
if (res) {
|
||||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||||
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
||||||
sprintf(tmpFile, "%s-%d",
|
sprintf(tmpFile, "%s-%d",
|
||||||
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
||||||
}
|
}
|
||||||
getResult(res, tmpFile);
|
getResult(res, tmpFile);
|
||||||
|
@ -5726,7 +5743,7 @@ static void *superSubscribeProcess(void *sarg) {
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
|
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
|
||||||
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue