feat:support stmt/tmq json in varbinary type

This commit is contained in:
wangmm0220 2023-08-29 15:24:12 +08:00
parent 201a8f0918
commit 34c7832850
7 changed files with 409 additions and 26 deletions

View File

@ -43,7 +43,7 @@ int main(int argc, char *argv[])
taos_free_result(result);
// create table
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))";
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10), varbin varbinary(16))";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
@ -68,6 +68,7 @@ int main(int argc, char *argv[])
double f8;
char bin[40];
char blob[80];
int8_t varbin[16];
} v = {0};
int32_t boolLen = sizeof(int8_t);
@ -80,7 +81,7 @@ int main(int argc, char *argv[])
int32_t ncharLen = 30;
stmt = taos_stmt_init(taos);
TAOS_MULTI_BIND params[10];
TAOS_MULTI_BIND params[11];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
@ -152,9 +153,19 @@ int main(int argc, char *argv[])
params[9].is_null = NULL;
params[9].num = 1;
int8_t tmp[16] = {'a', 0, 1, 13, '1'};
int32_t vbinLen = 5;
memcpy(v.varbin, tmp, sizeof(v.varbin));
params[10].buffer_type = TSDB_DATA_TYPE_VARBINARY;
params[10].buffer_length = sizeof(v.varbin);
params[10].buffer = v.varbin;
params[10].length = &vbinLen;
params[10].is_null = NULL;
params[10].num = 1;
char is_null = 1;
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)";
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
@ -162,7 +173,7 @@ int main(int argc, char *argv[])
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 10; ++j) {
for (int j = 1; j < 11; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
@ -187,7 +198,7 @@ int main(int argc, char *argv[])
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0);
taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE varbin > ? AND v2 < ?", 0);
v.v1 = 5;
v.v2 = 15;
taos_stmt_bind_param(stmt, params + 2);

View File

@ -280,7 +280,7 @@ void consume_repeatly(tmq_t* tmq) {
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code));
fprintf(stderr, "failed to seek to %lld, reason:%s", p->begin, tmq_err2str(code));
}
}

View File

@ -1260,7 +1260,7 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
if(isHexChar) taosMemoryFree(data);
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
varDataSetLen(pVal->datum.p, size + VARSTR_HEADER_SIZE);
varDataSetLen(pVal->datum.p, size);
memcpy(varDataVal(pVal->datum.p), data, size);
if(isHexChar) taosMemoryFree(data);
break;

View File

@ -97,8 +97,16 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
pVal->node.resType.bytes = inputSize;
switch (pParam->buffer_type) {
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {
return TSDB_CODE_OUT_OF_MEMORY;
}
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
memcpy(varDataVal(pVal->datum.p), pParam->buffer, pVal->node.resType.bytes);
pVal->node.resType.bytes += VARSTR_HEADER_SIZE;
break;
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_GEOMETRY:
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {

View File

@ -973,6 +973,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
code = TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
goto _end;
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;

View File

@ -23,16 +23,16 @@ class TDTestCase:
if ret != 0:
tdLog.exit("varbinary_test ret != 0")
tdSql.execute(f" create database test")
tdSql.execute(f" use test ")
tdSql.execute(f" create stable stb (ts timestamp, c1 nchar(32), c2 varbinary(16), c3 float) tags (t1 int, t2 binary(8), t3 varbinary(8))")
tdSql.query(f"desc stb")
tdSql.checkRows(7)
tdSql.checkData(2, 1, 'VARBINARY')
tdSql.checkData(2, 2, 16)
tdSql.checkData(6, 1, 'VARBINARY')
tdSql.checkData(6, 2, 8)
# tdSql.execute(f" create database test")
# tdSql.execute(f" use test ")
# tdSql.execute(f" create stable stb (ts timestamp, c1 nchar(32), c2 varbinary(16), c3 float) tags (t1 int, t2 binary(8), t3 varbinary(8))")
#
# tdSql.query(f"desc stb")
# tdSql.checkRows(7)
# tdSql.checkData(2, 1, 'VARBINARY')
# tdSql.checkData(2, 2, 16)
# tdSql.checkData(6, 1, 'VARBINARY')
# tdSql.checkData(6, 2, 8)
# tdSql.execute(f" insert into tb1 using stb tags (1, 'tb1_bin1', 'vart1') values (now, 'nchar1', 'varc1', 0.3)")
# tdSql.execute(f" insert into tb1 values (now + 1s, 'nchar2', null, 0.4)")

View File

@ -32,7 +32,7 @@
break;\
}\
}
int varbinary_test() {
void varbinary_sql_test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "drop database if exists varbinary_db");
@ -97,7 +97,6 @@ int varbinary_test() {
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// test error
pRes = taos_query(taos, "select * from tb1 where c2 >= 0x8de6");
ASSERT(taos_errno(pRes) != 0);
@ -208,8 +207,19 @@ int varbinary_test() {
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb");
ASSERT(taos_errno(pRes) == 0);
pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb order by ts");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
void* data = NULL;
uint32_t size = 0;
if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, "\\x7462315F62696E31", size) == 0);
taosMemoryFree(data);
break;
}
taos_free_result(pRes);
int numRows = 0;
@ -254,6 +264,20 @@ int varbinary_test() {
ASSERT(numRows == 3);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast('1' as varbinary(8))");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
void* data = NULL;
uint32_t size = 0;
if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, "\\x31", size) == 0);
taosMemoryFree(data);
}
taos_free_result(pRes);
pRes = taos_query(taos, "select ts,c2 from stb order by c2");
rowIndex = 0;
while ((row = taos_fetch_row(pRes)) != NULL) {
@ -294,17 +318,356 @@ int varbinary_test() {
rowIndex++;
}
printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes));
printf("%s result %s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
taos_close(taos);
}
return code;
void varbinary_stmt_test(){
TAOS *taos;
TAOS_RES *result;
int code;
TAOS_STMT *stmt;
taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
ASSERT(0);
}
result = taos_query(taos, "drop database demo");
taos_free_result(result);
result = taos_query(taos, "create database demo");
code = taos_errno(result);
if (code != 0) {
printf("failed to create database, reason:%s\n", taos_errstr(result));
taos_free_result(result);
ASSERT(0);
}
taos_free_result(result);
result = taos_query(taos, "use demo");
taos_free_result(result);
// create table
const char* sql = "create table m1 (ts timestamp, b bool, varbin varbinary(16))";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(result));
taos_free_result(result);
ASSERT(0);
}
taos_free_result(result);
struct {
int64_t ts;
int8_t b;
int8_t varbin[16];
} v = {0};
int32_t boolLen = sizeof(int8_t);
int32_t bintLen = sizeof(int64_t);
int32_t vbinLen = 5;
stmt = taos_stmt_init(taos);
TAOS_MULTI_BIND params[3];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = &bintLen;
params[0].is_null = NULL;
params[0].num = 1;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b);
params[1].buffer = &v.b;
params[1].length = &boolLen;
params[1].is_null = NULL;
params[1].num = 1;
params[2].buffer_type = TSDB_DATA_TYPE_VARBINARY;
params[2].buffer_length = sizeof(v.varbin);
params[2].buffer = v.varbin;
params[2].length = &vbinLen;
params[2].is_null = NULL;
params[2].num = 1;
char is_null = 1;
sql = "insert into m1 values(?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
}
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 11; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
for (int j = 0; j < vbinLen; ++j) {
v.varbin[j] = i + j;
}
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute insert statement.\n");
ASSERT(0);
}
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT varbin FROM m1 WHERE varbin = ?", 0);
for (int j = 0; j < vbinLen; ++j) {
v.varbin[j] = j;
}
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute select statement.\n");
ASSERT(0);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
ASSERT(strcmp(temp, "\\x0001020304") == 0);
}
ASSERT (rows == 1);
taos_free_result(result);
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT varbin FROM m1 WHERE varbin = ?", 0);
char tmp[16] = "\\x090a0b0c0d";
vbinLen = strlen(tmp);
params[2].buffer_type = TSDB_DATA_TYPE_VARCHAR;
params[2].buffer_length = sizeof(v.varbin);
params[2].buffer = tmp;
params[2].length = &vbinLen;
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute select statement.\n");
ASSERT(0);
}
result = taos_stmt_use_result(stmt);
rows = 0;
num_fields = taos_num_fields(result);
fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
ASSERT(strcmp(temp, "\\x090A0B0C0D") == 0);
}
ASSERT (rows == 1);
taos_free_result(result);
taos_stmt_close(stmt);
printf("%s result success\n", __FUNCTION__);
}
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
void varbinary_tmq_test(){
// build database
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT(pConn != NULL);
TAOS_RES *pRes = taos_query(pConn, "drop database if exists abc");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc vgroups 1 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c2 varbinary(16)) tags(t1 int, t2 varbinary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, '\\x3f89')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 'hello')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c2 varbinary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t3 varbinary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct0 set tag t2='894'");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 varbinary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table tb1 add column c2 varbinary(8)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
// create topic
pRes = taos_query(pConn, "create topic topic_db with meta as database abc");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
// build consumer
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
int32_t code = tmq_subscribe(tmq, topic_list);
ASSERT(code == 0);
int32_t cnt = 0;
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
if (tmq_get_res_type(tmqmessage) == TMQ_RES_TABLE_META || tmq_get_res_type(tmqmessage) == TMQ_RES_METADATA) {
char* result = tmq_get_json_meta(tmqmessage);
// if (result) {
// printf("meta result: %s\n", result);
// }
switch (cnt) {
case 0:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":16,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t2\",\"type\":16,\"length\":8}]}") == 0);
break;
case 1:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":2,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t2\",\"type\":16,\"value\":\"\\\"\\\\x3F89\\\"\"}],\"createList\":[]}") == 0);
break;
case 2:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c2\",\"colType\":16,\"colLength\":64}") == 0);
break;
case 3:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t3\",\"colType\":16,\"colLength\":64}") == 0);
break;
case 4:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"alterType\":4,\"colName\":\"t2\",\"colValue\":\"\\\"\\\\x383934\\\"\",\"colValueNull\":false}") == 0);
break;
case 5:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":16,\"length\":8}],\"tags\":[]}") == 0);
break;
case 6:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"alterType\":5,\"colName\":\"c2\",\"colType\":16,\"colLength\":8}") == 0);
break;
default:
break;
}
cnt++;
tmq_free_json_meta(result);
}
taos_free_result(tmqmessage);
} else {
break;
}
}
code = tmq_consumer_close(tmq);
ASSERT(code == 0);
tmq_list_destroy(topic_list);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
printf("%s result success\n", __FUNCTION__);
}
int main(int argc, char *argv[]) {
int ret = 0;
ret = varbinary_test();
ASSERT(!ret);
// varbinary_tmq_test();
// varbinary_stmt_test();
varbinary_sql_test();
return ret;
}