Merge pull request #22605 from taosdata/feat/TD-18789

feat:support varbinary type
This commit is contained in:
Haojun Liao 2023-08-30 11:29:26 +08:00 committed by GitHub
commit b1aa86a290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 529 additions and 45 deletions

View File

@ -43,7 +43,7 @@ int main(int argc, char *argv[])
taos_free_result(result);
// create table
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))";
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10), varbin varbinary(16))";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
@ -68,6 +68,7 @@ int main(int argc, char *argv[])
double f8;
char bin[40];
char blob[80];
int8_t varbin[16];
} v = {0};
int32_t boolLen = sizeof(int8_t);
@ -80,7 +81,7 @@ int main(int argc, char *argv[])
int32_t ncharLen = 30;
stmt = taos_stmt_init(taos);
TAOS_MULTI_BIND params[10];
TAOS_MULTI_BIND params[11];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
@ -152,9 +153,19 @@ int main(int argc, char *argv[])
params[9].is_null = NULL;
params[9].num = 1;
int8_t tmp[16] = {'a', 0, 1, 13, '1'};
int32_t vbinLen = 5;
memcpy(v.varbin, tmp, sizeof(v.varbin));
params[10].buffer_type = TSDB_DATA_TYPE_VARBINARY;
params[10].buffer_length = sizeof(v.varbin);
params[10].buffer = v.varbin;
params[10].length = &vbinLen;
params[10].is_null = NULL;
params[10].num = 1;
char is_null = 1;
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)";
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
@ -162,7 +173,7 @@ int main(int argc, char *argv[])
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 10; ++j) {
for (int j = 1; j < 11; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
@ -216,7 +227,7 @@ int main(int argc, char *argv[])
printf("expect two rows, but %d rows are fetched\n", rows);
}
taos_free_result(result);
// taos_free_result(result);
taos_stmt_close(stmt);
return 0;

View File

@ -280,7 +280,7 @@ void consume_repeatly(tmq_t* tmq) {
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code));
fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code));
}
}

View File

@ -387,8 +387,18 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
len += sprintf(str + len, "%lf", dv);
} break;
case TSDB_DATA_TYPE_VARBINARY:{
void* data = NULL;
uint32_t size = 0;
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
if(taosAscii2Hex(row[i], charLen, &data, &size) < 0){
break;
}
memcpy(str + len, data, size);
len += size;
taosMemoryFree(data);
}break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);

View File

@ -522,9 +522,9 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
} else {
if(vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY){
buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 1, 1);
buf = taosMemoryCalloc(vAlterTbReq.nTagVal*2 + 2 + 3, 1);
}else{
buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 1, 1);
buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
}
dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
}

View File

@ -314,7 +314,7 @@ static int32_t translateInOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t le
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_STR_DATA_TYPE(pPara1->resType.type)) {
if (TSDB_DATA_TYPE_VARBINARY == pPara1->resType.type || !IS_STR_DATA_TYPE(pPara1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -328,7 +328,7 @@ static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len
}
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!IS_STR_DATA_TYPE(pPara1->resType.type)) {
if (TSDB_DATA_TYPE_VARBINARY == pPara1->resType.type || !IS_STR_DATA_TYPE(pPara1->resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -1839,6 +1839,10 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (TSDB_DATA_TYPE_VARBINARY == ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}
@ -1867,6 +1871,10 @@ static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
for (int32_t i = 0; i < numOfParams; ++i) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, i);
uint8_t paraType = ((SExprNode*)pPara)->resType.type;
if (TSDB_DATA_TYPE_VARBINARY == paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_STR_DATA_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -1923,7 +1931,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
uint8_t para0Type = pPara0->resType.type;
uint8_t para1Type = pPara1->resType.type;
if (!IS_STR_DATA_TYPE(para0Type) || !IS_INTEGER_TYPE(para1Type)) {
if (TSDB_DATA_TYPE_VARBINARY == para0Type || !IS_STR_DATA_TYPE(para0Type) || !IS_INTEGER_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -1951,6 +1959,12 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// The number of parameters has been limited by the syntax definition
SExprNode* pPara0 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
uint8_t para0Type = pPara0->resType.type;
if (TSDB_DATA_TYPE_VARBINARY == para0Type) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// The function return type has been set during syntax parsing
uint8_t para2Type = pFunc->node.resType.type;
@ -2022,7 +2036,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_STR_DATA_TYPE(para1Type)) {
if (para1Type == TSDB_DATA_TYPE_VARBINARY || !IS_STR_DATA_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -2156,7 +2170,7 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
SExprNode* pPara = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_VALUE != nodeType(pPara) || (!IS_VAR_DATA_TYPE(pPara->resType.type))) {
if (QUERY_NODE_VALUE != nodeType(pPara) || TSDB_DATA_TYPE_VARBINARY == pPara->resType.type || (!IS_VAR_DATA_TYPE(pPara->resType.type))) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}

View File

@ -635,7 +635,7 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d
CHECK_OUT_OF_MEM(func);
strcpy(func->functionName, "cast");
func->node.resType = dt;
if (TSDB_DATA_TYPE_VARCHAR == dt.type || TSDB_DATA_TYPE_GEOMETRY == dt.type) {
if (TSDB_DATA_TYPE_VARCHAR == dt.type || TSDB_DATA_TYPE_GEOMETRY == dt.type || TSDB_DATA_TYPE_VARBINARY == dt.type) {
func->node.resType.bytes = func->node.resType.bytes + VARSTR_HEADER_SIZE;
} else if (TSDB_DATA_TYPE_NCHAR == dt.type) {
func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;

View File

@ -1258,7 +1258,7 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
if(isHexChar) taosMemoryFree(data);
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
varDataSetLen(pVal->datum.p, size + VARSTR_HEADER_SIZE);
varDataSetLen(pVal->datum.p, size);
memcpy(varDataVal(pVal->datum.p), data, size);
if(isHexChar) taosMemoryFree(data);
break;

View File

@ -97,8 +97,16 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
pVal->node.resType.bytes = inputSize;
switch (pParam->buffer_type) {
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {
return TSDB_CODE_OUT_OF_MEMORY;
}
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
memcpy(varDataVal(pVal->datum.p), pParam->buffer, pVal->node.resType.bytes);
pVal->node.resType.bytes += VARSTR_HEADER_SIZE;
break;
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_GEOMETRY:
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {

View File

@ -1990,6 +1990,8 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
// todo refactor the convert
int32_t code = sclConvertValueToSclParam(var, &out, NULL);
if (code != TSDB_CODE_SUCCESS) {
colDataDestroy(out.columnData);
taosMemoryFree(out.columnData);
qError("convert value to type[%d] failed", type);
return code;
}
@ -4678,10 +4680,10 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p,
code = scalarCalculate(info->sclCtx.node, pList, &output);
taosArrayDestroy(pList);
FLT_ERR_RET(code);
*p = output.columnData;
FLT_ERR_RET(code);
if (output.numOfQualified == output.numOfRows) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
} else if (output.numOfQualified == 0) {

View File

@ -1193,6 +1193,7 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
ctx->code = sclExecFunction(node, ctx, &output);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
@ -1241,10 +1242,12 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
SScalarParam output = {0};
ctx->code = sclExecLogic(node, ctx, &output);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
if (0 == output.numOfRows) {
sclFreeParam(&output);
return DEAL_RES_CONTINUE;
}
@ -1341,6 +1344,7 @@ EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) {
SScalarParam output = {0};
ctx->code = sclExecCaseWhen(node, ctx, &output);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
@ -1403,11 +1407,13 @@ EDealRes sclWalkFunction(SNode *pNode, SScalarCtx *ctx) {
ctx->code = sclExecFunction(node, ctx, &output);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
@ -1420,11 +1426,13 @@ EDealRes sclWalkLogic(SNode *pNode, SScalarCtx *ctx) {
ctx->code = sclExecLogic(node, ctx, &output);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
@ -1443,6 +1451,7 @@ EDealRes sclWalkOperator(SNode *pNode, SScalarCtx *ctx) {
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
sclFreeParam(&output);
return DEAL_RES_ERROR;
}

View File

@ -964,6 +964,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
}
break;
}
case TSDB_DATA_TYPE_VARBINARY:{
if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = TMIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE);
memcpy(varDataVal(output), varDataVal(input), len);
varDataSetLen(output, len);
}else{
code = TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
goto _end;
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
int32_t len;

View File

@ -360,7 +360,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sma_index.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml_TS-3724.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varbinary.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varbinary.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py

View File

@ -23,16 +23,16 @@ class TDTestCase:
if ret != 0:
tdLog.exit("varbinary_test ret != 0")
tdSql.execute(f" create database test")
tdSql.execute(f" use test ")
tdSql.execute(f" create stable stb (ts timestamp, c1 nchar(32), c2 varbinary(16), c3 float) tags (t1 int, t2 binary(8), t3 varbinary(8))")
tdSql.query(f"desc stb")
tdSql.checkRows(7)
tdSql.checkData(2, 1, 'VARBINARY')
tdSql.checkData(2, 2, 16)
tdSql.checkData(6, 1, 'VARBINARY')
tdSql.checkData(6, 2, 8)
# tdSql.execute(f" create database test")
# tdSql.execute(f" use test ")
# tdSql.execute(f" create stable stb (ts timestamp, c1 nchar(32), c2 varbinary(16), c3 float) tags (t1 int, t2 binary(8), t3 varbinary(8))")
#
# tdSql.query(f"desc stb")
# tdSql.checkRows(7)
# tdSql.checkData(2, 1, 'VARBINARY')
# tdSql.checkData(2, 2, 16)
# tdSql.checkData(6, 1, 'VARBINARY')
# tdSql.checkData(6, 2, 8)
# tdSql.execute(f" insert into tb1 using stb tags (1, 'tb1_bin1', 'vart1') values (now, 'nchar1', 'varc1', 0.3)")
# tdSql.execute(f" insert into tb1 values (now + 1s, 'nchar2', null, 0.4)")

View File

@ -409,7 +409,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
if(taosAscii2Hex(val, length, &tmp, &size) < 0){
break;
}
taosFprintfFile(pFile, "%s", tmp);
taosFprintfFile(pFile, "%s%s%s", quotationStr, tmp, quotationStr);
taosMemoryFree(tmp);
break;
}

View File

@ -32,7 +32,7 @@
break;\
}\
}
int varbinary_test() {
void varbinary_sql_test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "drop database if exists varbinary_db");
@ -68,6 +68,7 @@ int varbinary_test() {
}
rowIndex++;
}
taos_free_result(pRes);
pRes = taos_query(taos, "insert into tb1 using stb tags (1, 'tb1_bin1', 'vart1') values (now, 'nchar1', 'varc1', 0.3)");
taos_free_result(pRes);
@ -96,7 +97,6 @@ int varbinary_test() {
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// test error
pRes = taos_query(taos, "select * from tb1 where c2 >= 0x8de6");
ASSERT(taos_errno(pRes) != 0);
@ -146,13 +146,81 @@ int varbinary_test() {
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// pRes = taos_query(taos, "select * from stb where c2 contains 'ssd'");
// ASSERT(taos_errno(pRes) != 0);
// taos_free_result(pRes);
//
// pRes = taos_query(taos, "select * from stb where c2 contains 'ssd'");
// ASSERT(taos_errno(pRes) != 0);
// taos_free_result(pRes);
// math function test, not support
pRes = taos_query(taos, "select abs(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select floor(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// string function test, not support
pRes = taos_query(taos, "select length(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select ltrim(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select upper(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select to_json(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select TO_UNIXTIMESTAMP(c2) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(c2 as varchar(16)) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(c3 as varbinary(16)) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(c1 as varbinary(16)) from stb");
ASSERT(taos_errno(pRes) != 0);
taos_free_result(pRes);
// support first/last/last_row/count/hyperloglog/sample/tail/mode
pRes = taos_query(taos, "select first(c2) from stb");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select count(c2) from stb");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select sample(c2,2) from stb");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select mode(c2) from stb");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb order by ts");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
void* data = NULL;
uint32_t size = 0;
if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, "\\x7462315F62696E31", size) == 0);
taosMemoryFree(data);
break;
}
taos_free_result(pRes);
int numRows = 0;
@ -191,12 +259,25 @@ int varbinary_test() {
ASSERT(numRows == 2);
taos_free_result(pRes);
pRes = taos_query(taos, "select * from stb where c2 not between '\\x3e' and '\\x7F8290'");
GET_ROW_NUM
ASSERT(numRows == 3);
taos_free_result(pRes);
pRes = taos_query(taos, "select cast('1' as varbinary(8))");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
void* data = NULL;
uint32_t size = 0;
if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, "\\x31", size) == 0);
taosMemoryFree(data);
}
taos_free_result(pRes);
pRes = taos_query(taos, "select ts,c2 from stb order by c2");
rowIndex = 0;
while ((row = taos_fetch_row(pRes)) != NULL) {
@ -237,17 +318,355 @@ int varbinary_test() {
rowIndex++;
}
printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes));
printf("%s result %s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
taos_close(taos);
}
return code;
void varbinary_stmt_test(){
TAOS *taos;
TAOS_RES *result;
int code;
TAOS_STMT *stmt;
taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
ASSERT(0);
}
result = taos_query(taos, "drop database demo");
taos_free_result(result);
result = taos_query(taos, "create database demo");
code = taos_errno(result);
if (code != 0) {
printf("failed to create database, reason:%s\n", taos_errstr(result));
taos_free_result(result);
ASSERT(0);
}
taos_free_result(result);
result = taos_query(taos, "use demo");
taos_free_result(result);
// create table
const char* sql = "create table m1 (ts timestamp, b bool, varbin varbinary(16))";
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(result));
taos_free_result(result);
ASSERT(0);
}
taos_free_result(result);
struct {
int64_t ts;
int8_t b;
int8_t varbin[16];
} v = {0};
int32_t boolLen = sizeof(int8_t);
int32_t bintLen = sizeof(int64_t);
int32_t vbinLen = 5;
stmt = taos_stmt_init(taos);
TAOS_MULTI_BIND params[3];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = &bintLen;
params[0].is_null = NULL;
params[0].num = 1;
params[1].buffer_type = TSDB_DATA_TYPE_BOOL;
params[1].buffer_length = sizeof(v.b);
params[1].buffer = &v.b;
params[1].length = &boolLen;
params[1].is_null = NULL;
params[1].num = 1;
params[2].buffer_type = TSDB_DATA_TYPE_VARBINARY;
params[2].buffer_length = sizeof(v.varbin);
params[2].buffer = v.varbin;
params[2].length = &vbinLen;
params[2].is_null = NULL;
params[2].num = 1;
char is_null = 1;
sql = "insert into m1 values(?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
}
v.ts = 1591060628000;
for (int i = 0; i < 10; ++i) {
v.ts += 1;
for (int j = 1; j < 3; ++j) {
params[j].is_null = ((i == j) ? &is_null : 0);
}
v.b = (int8_t)i % 2;
for (int j = 0; j < vbinLen; ++j) {
v.varbin[j] = i + j;
}
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
}
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute insert statement.\n");
ASSERT(0);
}
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT varbin FROM m1 WHERE varbin = ?", 0);
for (int j = 0; j < vbinLen; ++j) {
v.varbin[j] = j;
}
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute select statement.\n");
ASSERT(0);
}
result = taos_stmt_use_result(stmt);
TAOS_ROW row;
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
ASSERT(strcmp(temp, "\\x0001020304") == 0);
}
ASSERT (rows == 1);
// taos_free_result(result);
taos_stmt_close(stmt);
// query the records
stmt = taos_stmt_init(taos);
taos_stmt_prepare(stmt, "SELECT varbin FROM m1 WHERE varbin = ?", 0);
char tmp[16] = "\\x090a0b0c0d";
vbinLen = strlen(tmp);
params[2].buffer_type = TSDB_DATA_TYPE_VARCHAR;
params[2].buffer_length = sizeof(v.varbin);
params[2].buffer = tmp;
params[2].length = &vbinLen;
taos_stmt_bind_param(stmt, params + 2);
if (taos_stmt_execute(stmt) != 0) {
printf("failed to execute select statement.\n");
ASSERT(0);
}
result = taos_stmt_use_result(stmt);
rows = 0;
num_fields = taos_num_fields(result);
fields = taos_fetch_fields(result);
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
ASSERT(strcmp(temp, "\\x090A0B0C0D") == 0);
}
ASSERT (rows == 1);
// taos_free_result(result);
taos_stmt_close(stmt);
taos_close(taos);
printf("%s result success\n", __FUNCTION__);
}
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
void varbinary_tmq_test(){
// build database
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT(pConn != NULL);
TAOS_RES *pRes = taos_query(pConn, "drop database if exists abc");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc vgroups 1 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c2 varbinary(16)) tags(t1 int, t2 varbinary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, '\\x3f89')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 'hello')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c2 varbinary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t3 varbinary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct0 set tag t2='894'");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 varbinary(8))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table tb1 add column c2 varbinary(8)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
// create topic
pRes = taos_query(pConn, "create topic topic_db with meta as database abc");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
// build consumer
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
int32_t code = tmq_subscribe(tmq, topic_list);
ASSERT(code == 0);
int32_t cnt = 0;
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
if (tmq_get_res_type(tmqmessage) == TMQ_RES_TABLE_META || tmq_get_res_type(tmqmessage) == TMQ_RES_METADATA) {
char* result = tmq_get_json_meta(tmqmessage);
// if (result) {
// printf("meta result: %s\n", result);
// }
switch (cnt) {
case 0:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":16,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t2\",\"type\":16,\"length\":8}]}") == 0);
break;
case 1:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":2,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t2\",\"type\":16,\"value\":\"\\\"\\\\x3F89\\\"\"}],\"createList\":[]}") == 0);
break;
case 2:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c2\",\"colType\":16,\"colLength\":64}") == 0);
break;
case 3:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t3\",\"colType\":16,\"colLength\":64}") == 0);
break;
case 4:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"alterType\":4,\"colName\":\"t2\",\"colValue\":\"\\\"\\\\x383934\\\"\",\"colValueNull\":false}") == 0);
break;
case 5:
ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":16,\"length\":8}],\"tags\":[]}") == 0);
break;
case 6:
ASSERT(strcmp(result, "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"tb1\",\"alterType\":5,\"colName\":\"c2\",\"colType\":16,\"colLength\":8}") == 0);
break;
default:
break;
}
cnt++;
tmq_free_json_meta(result);
}
taos_free_result(tmqmessage);
} else {
break;
}
}
code = tmq_consumer_close(tmq);
ASSERT(code == 0);
tmq_list_destroy(topic_list);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
ASSERT(0);
}
taos_free_result(pRes);
taos_close(pConn);
printf("%s result success\n", __FUNCTION__);
}
int main(int argc, char *argv[]) {
int ret = 0;
ret = varbinary_test();
ASSERT(!ret);
varbinary_tmq_test();
varbinary_stmt_test();
varbinary_sql_test();
return ret;
}