add row format bind control argument

This commit is contained in:
pengrongkun94@qq.com 2025-02-26 20:04:33 +08:00
parent 8cf363fbf3
commit 10f3a388f2
5 changed files with 295 additions and 63 deletions

View File

@ -159,11 +159,13 @@ int32_t qStmtBindParams2(SQuery* pQuery, TAOS_STMT2_BIND* pParams, int32_t colId
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt);
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt);
void* charsetCxt);
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
int32_t colIdx, int32_t rowNum, void *charsetCxt);
int32_t colIdx, int32_t rowNum, void* charsetCxt);
int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt);
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt);
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,

View File

@ -101,6 +101,7 @@ typedef struct {
bool autoCreateTbl;
SHashObj *pVgHash;
SBindInfo2 *pBindInfo;
bool bindRowFormat;
SStbInterlaceInfo siInfo;
} SStmtSQLInfo2;

View File

@ -1416,7 +1416,12 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
pStmt->exec.pCurrBlock = *pDataBlock;
if (pStmt->sql.stbInterlaceMode) {
taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
pStmt->exec.pCurrBlock->pData->aCol = NULL;
(*pDataBlock)->pData->aCol = NULL;
}
if (colIdx < -1) {
pStmt->sql.bindRowFormat = true;
taosArrayDestroy((*pDataBlock)->pData->aCol);
(*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
}
}
@ -1447,11 +1452,18 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
pStmt->taos->optionInfo.charsetCxt);
} else {
taosArrayDestroy(pCols);
(*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
code =
qBindStmtColsValue2(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen,
&pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt);
if (colIdx == -1) {
if (pStmt->sql.bindRowFormat) {
tscError("can't mix bind row format and bind column format");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
} else {
code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
pStmt->taos->optionInfo.charsetCxt);
}
}
if (code) {
@ -1464,6 +1476,11 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
if (pStmt->sql.bindRowFormat) {
tscError("can't mix bind row format and bind column format");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
tscError("bind column index not in sequence");
STMT_ERR_RET(TSDB_CODE_APP_ERROR);

View File

@ -1913,4 +1913,158 @@ TEST(stmt2Case, async_order) {
}
taosMemoryFree(tbs);
}
TEST(stmt2Case, rowformat_bind) {
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0);
ASSERT_NE(taos, nullptr);
do_query(taos, "drop database if exists stmt2_testdb_16");
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_16");
do_query(
taos,
"create stable stmt2_testdb_16.stb(ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(8), c6 "
"smallint, c7 "
"tinyint, c8 bool, c9 nchar(8), c10 geometry(256))TAGS(tts timestamp, t1 int, t2 bigint, t3 float, t4 double, t5 "
"binary(8), t6 smallint, t7 tinyint, t8 bool, t9 nchar(8), t10 geometry(256))");
TAOS_STMT2_OPTION option = {0};
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
ASSERT_NE(stmt, nullptr);
int code = 0;
uintptr_t c10len = 0;
struct {
int64_t c1;
int32_t c2;
int64_t c3;
float c4;
double c5;
unsigned char c6[8];
int16_t c7;
int8_t c8;
int8_t c9;
char c10[32];
} v = {1591060628000, 1, 2, 3.0, 4.0, "abcdef", 5, 6, 7, "ijnop"};
struct {
int32_t c1;
int32_t c2;
int32_t c3;
int32_t c4;
int32_t c5;
int32_t c6;
int32_t c7;
int32_t c8;
int32_t c9;
int32_t c10;
} v_len = {sizeof(int64_t), sizeof(int32_t),
sizeof(int64_t), sizeof(float),
sizeof(double), 8,
sizeof(int16_t), sizeof(int8_t),
sizeof(int8_t), 8};
TAOS_STMT2_BIND params[11];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].length = (int32_t*)&v_len.c1;
params[0].buffer = &v.c1;
params[0].is_null = NULL;
params[0].num = 1;
params[1].buffer_type = TSDB_DATA_TYPE_INT;
params[1].buffer = &v.c2;
params[1].length = (int32_t*)&v_len.c2;
params[1].is_null = NULL;
params[1].num = 1;
params[2].buffer_type = TSDB_DATA_TYPE_BIGINT;
params[2].buffer = &v.c3;
params[2].length = (int32_t*)&v_len.c3;
params[2].is_null = NULL;
params[2].num = 1;
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer = &v.c4;
params[3].length = (int32_t*)&v_len.c4;
params[3].is_null = NULL;
params[3].num = 1;
params[4].buffer_type = TSDB_DATA_TYPE_DOUBLE;
params[4].buffer = &v.c5;
params[4].length = (int32_t*)&v_len.c5;
params[4].is_null = NULL;
params[4].num = 1;
params[5].buffer_type = TSDB_DATA_TYPE_BINARY;
params[5].buffer = &v.c6;
params[5].length = (int32_t*)&v_len.c6;
params[5].is_null = NULL;
params[5].num = 1;
params[6].buffer_type = TSDB_DATA_TYPE_SMALLINT;
params[6].buffer = &v.c7;
params[6].length = (int32_t*)&v_len.c7;
params[6].is_null = NULL;
params[6].num = 1;
params[7].buffer_type = TSDB_DATA_TYPE_TINYINT;
params[7].buffer = &v.c8;
params[7].length = (int32_t*)&v_len.c8;
params[7].is_null = NULL;
params[7].num = 1;
params[8].buffer_type = TSDB_DATA_TYPE_BOOL;
params[8].buffer = &v.c9;
params[8].length = (int32_t*)&v_len.c9;
params[8].is_null = NULL;
params[8].num = 1;
params[9].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[9].buffer = &v.c10;
params[9].length = (int32_t*)&v_len.c10;
params[9].is_null = NULL;
params[9].num = 1;
unsigned char* outputGeom1;
size_t size1;
initCtxMakePoint();
code = doMakePoint(1.000, 2.000, &outputGeom1, &size1);
checkError(stmt, code);
params[10].buffer_type = TSDB_DATA_TYPE_GEOMETRY;
params[10].buffer = outputGeom1;
params[10].length = (int32_t*)&size1;
params[10].is_null = NULL;
params[10].num = 1;
char* stmt_sql = "insert into stmt2_testdb_16.? using stb tags(?,?,?,?,?,?,?,?,?,?,?)values (?,?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt2_prepare(stmt, stmt_sql, 0);
checkError(stmt, code);
char* tbname[1] = {"tb1"};
TAOS_STMT2_BIND* tags = &params[0];
TAOS_STMT2_BIND* cols = &params[0];
TAOS_STMT2_BINDV bindv = {1, &tbname[0], &tags, &cols};
code = taos_stmt2_bind_param(stmt, &bindv, -2);
checkError(stmt, code);
int affected_rows;
code = taos_stmt2_exec(stmt, &affected_rows);
checkError(stmt, code);
ASSERT_EQ(affected_rows, 1);
int64_t ts2 = 1591060628000;
params[0].buffer = &ts2;
code = taos_stmt2_bind_param(stmt, &bindv, -2);
checkError(stmt, code);
code = taos_stmt2_exec(stmt, &affected_rows);
checkError(stmt, code);
ASSERT_EQ(affected_rows, 1);
params[0].buffer = &ts2;
code = taos_stmt2_bind_param(stmt, &bindv, -1);
ASSERT_EQ(code, TSDB_CODE_TSC_STMT_API_ERROR);
geosFreeBuffer(outputGeom1);
taos_stmt2_close(stmt);
do_query(taos, "drop database if exists stmt2_testdb_16");
taos_close(taos);
}
#pragma GCC diagnostic pop

View File

@ -837,6 +837,117 @@ static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STM
}
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
void* charsetCxt) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = bind->num;
TAOS_STMT2_BIND ncharBind = {0};
TAOS_STMT2_BIND* pBind = NULL;
int32_t code = 0;
for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
SColData* pCol = taosArrayGet(pCols, c);
if (pCol == NULL || pColSchema == NULL) {
code = buildInvalidOperationMsg(&pBuf, "get column schema or column data failed");
goto _return;
}
if (boundInfo->pColIndex[c] == 0) {
pCol->cflag |= COL_IS_KEY;
}
if (bind[c].num != rowNum) {
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
goto _return;
}
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind, charsetCxt);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind + c;
}
code = tColDataAddValueByBind2(pCol, pBind,
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1,
initCtxAsText, checkWKB);
if (code) {
goto _return;
}
}
qDebug("stmt2 all %d columns bind %d rows data as col format", boundInfo->numOfBound, rowNum);
_return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
}
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
int32_t colIdx, int32_t rowNum, void *charsetCxt) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
SColData* pCol = taosArrayGet(pCols, colIdx);
TAOS_STMT2_BIND ncharBind = {0};
TAOS_STMT2_BIND* pBind = NULL;
int32_t code = 0;
if (bind->num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
}
// Column index exceeds the number of columns
if (colIdx >= pCols->size && pCol == NULL) {
return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
}
if (bind->buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
}
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol2(&pBuf, pColSchema, bind, &ncharBind, charsetCxt);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind;
}
code = tColDataAddValueByBind2(pCol, pBind,
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1,
initCtxAsText, checkWKB);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
_return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
}
int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
@ -861,10 +972,6 @@ int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind,
lastColId = pColSchema->colId;
}
if(boundInfo->pColIndex[c]==0){
pCol->cflag |= COL_IS_KEY;
}
if (bind[c].num != rowNum) {
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
goto _return;
@ -918,56 +1025,7 @@ int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind,
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered,
&pDataBlock->duplicateTs);
qDebug("stmt2 all %d columns bind %d rows data as col format", boundInfo->numOfBound, rowNum);
_return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
}
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
int32_t colIdx, int32_t rowNum, void *charsetCxt) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
SColData* pCol = taosArrayGet(pCols, colIdx);
TAOS_STMT2_BIND ncharBind = {0};
TAOS_STMT2_BIND* pBind = NULL;
int32_t code = 0;
if (bind->num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
}
// Column index exceeds the number of columns
if (colIdx >= pCols->size && pCol == NULL) {
return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
}
if (bind->buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
}
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol2(&pBuf, pColSchema, bind, &ncharBind, charsetCxt);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind;
}
code = tColDataAddValueByBind2(pCol, pBind,
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1,
initCtxAsText, checkWKB);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
qDebug("stmt2 all %d columns bind %d rows data as row format", boundInfo->numOfBound, rowNum);
_return: