fix:[TD-32526] consumer null if add column because of pDataBlock not clear

This commit is contained in:
wangmm0220 2024-10-12 16:09:23 +08:00
parent c44c8ec106
commit 0c11795cde
4 changed files with 262 additions and 0 deletions

View File

@ -2877,6 +2877,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
return 0;
}
blockDataFreeRes((SSDataBlock*)pBlock);
code = calBlockTbName(pInfo, pInfo->pRes, 0);
QUERY_CHECK_CODE(code, lino, _end);

View File

@ -0,0 +1,53 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
tdSql.execute(f'create database if not exists db_5466')
tdSql.execute(f'use db_5466')
tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)')
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_td32526'%(buildPath)
# tdLog.info(cmdStr)
# os.system(cmdStr)
#
# tdSql.execute("drop topic db_5466_topic")
tdSql.execute(f'alter stable meters add column item_tags nchar(500)')
tdSql.execute(f'alter stable meters add column new_col nchar(100)')
tdSql.execute("create topic db_5466_topic as select * from db_5466.meters")
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-06 14:38:05.000',10.30000,219,0.31000, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', '1')")
tdLog.info(cmdStr)
os.system(cmdStr)
return
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -4,6 +4,7 @@ add_executable(tmq_sim tmqSim.c)
add_executable(create_table createTable.c)
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(tmq_ts5466 tmq_ts5466.c)
add_executable(tmq_td32526 tmq_td32526.c)
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
@ -62,6 +63,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_td32526
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_taosx_ci
PUBLIC taos

200
utils/test/c/tmq_td32526.c Normal file
View File

@ -0,0 +1,200 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "cJSON.h"
#include "taos.h"
#include "tmsg.h"
#include "types.h"
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "g1");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "db_5466_topic");
return topic_list;
}
int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) {
int len = 0;
char split = ' ';
for (int i = 0; i < numFields; ++i) {
if (i > 0) {
str[len++] = split;
}
if (row[i] == NULL) {
len += sprintf(str + len, "%s", "NULL");
continue;
}
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf", dv);
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default:
break;
}
}
return len;
}
/**
* @brief print column name and values of each row
*
* @param res
* @return int
*/
static int printResult(TAOS_RES *res) {
TAOS_ROW row = NULL;
int32_t cnt = 0;
while ((row = taos_fetch_row(res))) {
int numFields = taos_num_fields(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
char header[256] = {0};
int len = 0;
for (int i = 0; i < numFields; ++i) {
len += sprintf(header + len, "%s ", fields[i].name);
}
puts(header);
char temp[1024] = {0};
printRow(temp, row, fields, numFields);
if (cnt == 0){
ASSERT(strcmp(temp, "1538721485000 10.300000 219 0.310000 NULL NULL California.SanFrancisco 2") == 0);
}else if(cnt == 1){
ASSERT(strcmp(temp, "1538807885000 10.300000 219 0.310000 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 1 California.SanFrancisco 2") == 0);
}
cnt++;
}
return 0;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
if (tmqmessage) {
cnt++;
printResult(tmqmessage);
taos_free_result(tmqmessage);
} else {
break;
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list);
}