diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f4bf5fafd4..4bdc2c4740 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -158,11 +158,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch int32_t qStmtBindParams2(SQuery* pQuery, TAOS_STMT2_BIND* pParams, int32_t colIdx, void* charsetCxt); int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo2* pBindInfos, void *charsetCxt); -int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt); +int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, + void* charsetCxt); int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, - int32_t colIdx, int32_t rowNum, void *charsetCxt); + int32_t colIdx, int32_t rowNum, void* charsetCxt); +int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, + STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt); int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName, - TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt); + TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt); void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index 283573803e..eb96eadbd4 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -101,6 +101,7 @@ typedef struct { bool autoCreateTbl; SHashObj *pVgHash; SBindInfo2 *pBindInfo; + bool bindRowFormat; SStbInterlaceInfo siInfo; } SStmtSQLInfo2; diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index c084c169e9..48f9e751cb 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1421,7 +1421,12 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { pStmt->exec.pCurrBlock = *pDataBlock; if (pStmt->sql.stbInterlaceMode) { taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol); - pStmt->exec.pCurrBlock->pData->aCol = NULL; + (*pDataBlock)->pData->aCol = NULL; + } + if (colIdx < -1) { + pStmt->sql.bindRowFormat = true; + taosArrayDestroy((*pDataBlock)->pData->aCol); + (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES); } } @@ -1449,10 +1454,21 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { if (pStmt->sql.stbInterlaceMode) { (*pDataBlock)->pData->flags = 0; code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, - pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt); + pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, + pStmt->taos->optionInfo.charsetCxt); } else { - code = - qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt); + if (colIdx == -1) { + if (pStmt->sql.bindRowFormat) { + tscError("can't mix bind row format and bind column format"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + } + code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, + pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt); + } else { + code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf, + pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, + pStmt->taos->optionInfo.charsetCxt); + } } if (code) { @@ -1465,6 +1481,11 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) { STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } + if (pStmt->sql.bindRowFormat) { + tscError("can't mix bind row format and bind column format"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + } + if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) { tscError("bind column index not in sequence"); STMT_ERR_RET(TSDB_CODE_APP_ERROR); @@ -1695,11 +1716,11 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { return pStmt->errCode; } - (void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex)); while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); } - (void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); + TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex)); if (pStmt->sql.stbInterlaceMode) { STMT_ERR_RET(stmtAddBatch2(pStmt)); @@ -1802,11 +1823,11 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->bindThreadInUse = false; } - (void)taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex)); while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); } - (void)taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); + TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex)); (void)taosThreadCondDestroy(&pStmt->queue.waitCond); (void)taosThreadMutexDestroy(&pStmt->queue.mutex); diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 638b964d10..6bae063124 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -958,8 +958,49 @@ TEST(stmt2Case, stmt2_insert_non_statndard) { "double,bool_col bool,binary_col binary(20),nchar_col nchar(20),varbinary_col varbinary(20),geometry_col " "geometry(200)) tags(int_tag int,long_tag bigint,double_tag double,bool_tag bool,binary_tag " "binary(20),nchar_tag nchar(20),varbinary_tag varbinary(20),geometry_tag geometry(200));"); + do_query(taos, "use stmt2_testdb_6"); - TAOS_STMT2_OPTION option = {0, false, false, NULL, NULL}; + TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL}; + + // less cols and tags using stb + { + TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + const char* sql = "INSERT INTO stmt2_testdb_6.? using stmt2_testdb_6.stb1 (int_tag)tags(1) (ts) VALUES (?)"; + int code = taos_stmt2_prepare(stmt, sql, 0); + checkError(stmt, code); + int total_affect_rows = 0; + + int t64_len[2] = {sizeof(int64_t), sizeof(int64_t)}; + int tag_i = 0; + int tag_l = sizeof(int); + int64_t ts[2] = {1591060628000, 1591060628100}; + for (int i = 0; i < 3; i++) { + ts[0] += 1000; + ts[1] += 1000; + + TAOS_STMT2_BIND tags1 = {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}; + TAOS_STMT2_BIND tags2 = {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}; + TAOS_STMT2_BIND params1 = {TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2}; + TAOS_STMT2_BIND params2 = {TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2}; + + TAOS_STMT2_BIND* tagv[2] = {&tags1, &tags2}; + TAOS_STMT2_BIND* paramv[2] = {¶ms1, ¶ms2}; + char* tbname[2] = {"tb1", "tb2"}; + TAOS_STMT2_BINDV bindv = {2, &tbname[0], NULL, ¶mv[0]}; + code = taos_stmt2_bind_param(stmt, &bindv, -1); + checkError(stmt, code); + + int affected_rows; + taos_stmt2_exec(stmt, &affected_rows); + total_affect_rows += affected_rows; + + checkError(stmt, code); + } + + ASSERT_EQ(total_affect_rows, 12); + taos_stmt2_close(stmt); + } // less cols and tags { @@ -985,7 +1026,7 @@ TEST(stmt2Case, stmt2_insert_non_statndard) { TAOS_STMT2_BIND* tagv[2] = {&tags1, &tags2}; TAOS_STMT2_BIND* paramv[2] = {¶ms1, ¶ms2}; - char* tbname[2] = {"tb1", "tb2"}; + char* tbname[2] = {"tb3", "tb4"}; TAOS_STMT2_BINDV bindv = {2, &tbname[0], &tagv[0], ¶mv[0]}; code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); @@ -1013,26 +1054,29 @@ TEST(stmt2Case, stmt2_insert_non_statndard) { int tag_l = sizeof(int); int tag_bl = 3; int64_t ts[2] = {1591060628000, 1591060628100}; + int64_t ts_2[2] = {1591060628800, 1591060628900}; int t64_len[2] = {sizeof(int64_t), sizeof(int64_t)}; int coli[2] = {1, 2}; + int coli_2[2] = {3, 4}; int ilen[2] = {sizeof(int), sizeof(int)}; int total_affect_rows = 0; for (int i = 0; i < 3; i++) { ts[0] += 1000; ts[1] += 1000; + ts_2[0] += 1000; + ts_2[1] += 1000; - TAOS_STMT2_BIND tags1[2] = {{TSDB_DATA_TYPE_BINARY, (void*)"abc", &tag_bl, NULL, 1}, - {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}}; - TAOS_STMT2_BIND tags2[2] = {{TSDB_DATA_TYPE_BINARY, (void*)"abc", &tag_bl, NULL, 1}, - {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}}; - TAOS_STMT2_BIND params1[2] = {{TSDB_DATA_TYPE_INT, &coli, &ilen[0], NULL, 2}, - {TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2}}; - TAOS_STMT2_BIND params2[2] = {{TSDB_DATA_TYPE_INT, &coli, &ilen[0], NULL, 2}, - {TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 2}}; + TAOS_STMT2_BIND tags[2][2] = { + {{TSDB_DATA_TYPE_BINARY, (void*)"abc", &tag_bl, NULL, 1}, {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}}, + {{TSDB_DATA_TYPE_BINARY, (void*)"def", &tag_bl, NULL, 1}, {TSDB_DATA_TYPE_INT, &tag_i, &tag_l, NULL, 1}}}; + TAOS_STMT2_BIND params[2][2] = { + {{TSDB_DATA_TYPE_INT, &coli[0], &ilen[0], NULL, 2}, {TSDB_DATA_TYPE_TIMESTAMP, &ts[0], &t64_len[0], NULL, 2}}, + {{TSDB_DATA_TYPE_INT, &coli_2[0], &ilen[0], NULL, 2}, + {TSDB_DATA_TYPE_TIMESTAMP, &ts_2[0], &t64_len[0], NULL, 2}}}; - TAOS_STMT2_BIND* tagv[2] = {&tags1[0], &tags2[0]}; - TAOS_STMT2_BIND* paramv[2] = {¶ms1[0], ¶ms2[0]}; - char* tbname[2] = {"tb3", "tb4"}; + TAOS_STMT2_BIND* tagv[2] = {&tags[0][0], &tags[1][0]}; + TAOS_STMT2_BIND* paramv[2] = {¶ms[0][0], ¶ms[1][0]}; + char* tbname[2] = {"tb5", "tb6"}; TAOS_STMT2_BINDV bindv = {2, &tbname[0], &tagv[0], ¶mv[0]}; code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); @@ -1894,4 +1938,158 @@ TEST(stmt2Case, async_order) { } } +TEST(stmt2Case, rowformat_bind) { + TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0); + ASSERT_NE(taos, nullptr); + + do_query(taos, "drop database if exists stmt2_testdb_16"); + do_query(taos, "create database IF NOT EXISTS stmt2_testdb_16"); + do_query( + taos, + "create stable stmt2_testdb_16.stb(ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(8), c6 " + "smallint, c7 " + "tinyint, c8 bool, c9 nchar(8), c10 geometry(256))TAGS(tts timestamp, t1 int, t2 bigint, t3 float, t4 double, t5 " + "binary(8), t6 smallint, t7 tinyint, t8 bool, t9 nchar(8), t10 geometry(256))"); + + TAOS_STMT2_OPTION option = {0}; + TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); + ASSERT_NE(stmt, nullptr); + int code = 0; + uintptr_t c10len = 0; + struct { + int64_t c1; + int32_t c2; + int64_t c3; + float c4; + double c5; + unsigned char c6[8]; + int16_t c7; + int8_t c8; + int8_t c9; + char c10[32]; + } v = {1591060628000, 1, 2, 3.0, 4.0, "abcdef", 5, 6, 7, "ijnop"}; + + struct { + int32_t c1; + int32_t c2; + int32_t c3; + int32_t c4; + int32_t c5; + int32_t c6; + int32_t c7; + int32_t c8; + int32_t c9; + int32_t c10; + } v_len = {sizeof(int64_t), sizeof(int32_t), + sizeof(int64_t), sizeof(float), + sizeof(double), 8, + sizeof(int16_t), sizeof(int8_t), + sizeof(int8_t), 8}; + TAOS_STMT2_BIND params[11]; + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].length = (int32_t*)&v_len.c1; + params[0].buffer = &v.c1; + params[0].is_null = NULL; + params[0].num = 1; + + params[1].buffer_type = TSDB_DATA_TYPE_INT; + params[1].buffer = &v.c2; + params[1].length = (int32_t*)&v_len.c2; + params[1].is_null = NULL; + params[1].num = 1; + + params[2].buffer_type = TSDB_DATA_TYPE_BIGINT; + params[2].buffer = &v.c3; + params[2].length = (int32_t*)&v_len.c3; + params[2].is_null = NULL; + params[2].num = 1; + + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer = &v.c4; + params[3].length = (int32_t*)&v_len.c4; + params[3].is_null = NULL; + params[3].num = 1; + + params[4].buffer_type = TSDB_DATA_TYPE_DOUBLE; + params[4].buffer = &v.c5; + params[4].length = (int32_t*)&v_len.c5; + params[4].is_null = NULL; + params[4].num = 1; + + params[5].buffer_type = TSDB_DATA_TYPE_BINARY; + params[5].buffer = &v.c6; + params[5].length = (int32_t*)&v_len.c6; + params[5].is_null = NULL; + params[5].num = 1; + + params[6].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[6].buffer = &v.c7; + params[6].length = (int32_t*)&v_len.c7; + params[6].is_null = NULL; + params[6].num = 1; + + params[7].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[7].buffer = &v.c8; + params[7].length = (int32_t*)&v_len.c8; + params[7].is_null = NULL; + params[7].num = 1; + + params[8].buffer_type = TSDB_DATA_TYPE_BOOL; + params[8].buffer = &v.c9; + params[8].length = (int32_t*)&v_len.c9; + params[8].is_null = NULL; + params[8].num = 1; + + params[9].buffer_type = TSDB_DATA_TYPE_NCHAR; + params[9].buffer = &v.c10; + params[9].length = (int32_t*)&v_len.c10; + params[9].is_null = NULL; + params[9].num = 1; + + unsigned char* outputGeom1; + size_t size1; + initCtxMakePoint(); + code = doMakePoint(1.000, 2.000, &outputGeom1, &size1); + checkError(stmt, code); + params[10].buffer_type = TSDB_DATA_TYPE_GEOMETRY; + params[10].buffer = outputGeom1; + params[10].length = (int32_t*)&size1; + params[10].is_null = NULL; + params[10].num = 1; + + char* stmt_sql = "insert into stmt2_testdb_16.? using stb tags(?,?,?,?,?,?,?,?,?,?,?)values (?,?,?,?,?,?,?,?,?,?,?)"; + code = taos_stmt2_prepare(stmt, stmt_sql, 0); + checkError(stmt, code); + + char* tbname[1] = {"tb1"}; + TAOS_STMT2_BIND* tags = ¶ms[0]; + TAOS_STMT2_BIND* cols = ¶ms[0]; + TAOS_STMT2_BINDV bindv = {1, &tbname[0], &tags, &cols}; + code = taos_stmt2_bind_param(stmt, &bindv, -2); + checkError(stmt, code); + + int affected_rows; + code = taos_stmt2_exec(stmt, &affected_rows); + checkError(stmt, code); + ASSERT_EQ(affected_rows, 1); + + int64_t ts2 = 1591060628000; + params[0].buffer = &ts2; + code = taos_stmt2_bind_param(stmt, &bindv, -2); + checkError(stmt, code); + + code = taos_stmt2_exec(stmt, &affected_rows); + checkError(stmt, code); + ASSERT_EQ(affected_rows, 1); + + params[0].buffer = &ts2; + code = taos_stmt2_bind_param(stmt, &bindv, -1); + ASSERT_EQ(code, TSDB_CODE_TSC_STMT_API_ERROR); + + geosFreeBuffer(outputGeom1); + taos_stmt2_close(stmt); + do_query(taos, "drop database if exists stmt2_testdb_16"); + taos_close(taos); +} + #pragma GCC diagnostic pop diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index a02ca85a1f..c3ca29c607 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -3269,7 +3269,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte } if (!infoSorted) { - taosqsort_r(infos, numOfInfos, sizeof(SBindInfo), NULL, tBindInfoCompare); + taosqsort_r(infos, numOfInfos, sizeof(SBindInfo2), NULL, tBindInfoCompare); } int32_t code = 0; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 9f9077d1b6..29bb165de9 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -54,8 +54,12 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) { int32_t colNum = taosArrayGetSize(pNew->aCol); for (int32_t i = 0; i < colNum; ++i) { - SColData* pCol = (SColData*)taosArrayGet(pNew->aCol, i); - tColDataDeepClear(pCol); + if (pDataBlock->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SColData* pCol = (SColData*)taosArrayGet(pNew->aCol, i); + tColDataDeepClear(pCol); + } else { + pNew->aCol = taosArrayInit(20, POINTER_BYTES); + } } return TSDB_CODE_SUCCESS; @@ -324,7 +328,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind int16_t lastColId = -1; bool colInOrder = true; - if (NULL == *pTSchema) { + if (NULL == pTSchema || NULL == *pTSchema) { *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); } @@ -693,7 +697,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin bool colInOrder = true; int ncharColNums = 0; - if (NULL == *pTSchema) { + if (NULL == pTSchema || NULL == *pTSchema) { *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); } @@ -739,6 +743,22 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin goto _return; } pBindInfos[c].bind = taosArrayGetLast(ncharBinds); + } else if (TSDB_DATA_TYPE_GEOMETRY == pColSchema->type) { + code = initCtxAsText(); + if (code) { + qError("geometry init failed:%s", tstrerror(code)); + goto _return; + } + uint8_t* buf = bind[c].buffer; + for (int j = 0; j < bind[c].num; j++) { + code = checkWKB(buf, bind[c].length[j]); + if (code) { + qError("geometry data must be in WKB format"); + goto _return; + } + buf += bind[c].length[j]; + } + pBindInfos[c].bind = bind + c; } else { pBindInfos[c].bind = bind + c; } @@ -816,7 +836,8 @@ static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STM return TSDB_CODE_SUCCESS; } -int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void *charsetCxt) { +int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, + void* charsetCxt) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; @@ -834,7 +855,7 @@ int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, goto _return; } - if(boundInfo->pColIndex[c]==0){ + if (boundInfo->pColIndex[c] == 0) { pCol->cflag |= COL_IS_KEY; } @@ -926,6 +947,94 @@ _return: return code; } +int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, + STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt) { + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); + SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + int32_t rowNum = bind->num; + TAOS_STMT2_BIND ncharBind = {0}; + TAOS_STMT2_BIND* pBind = NULL; + int32_t code = 0; + int16_t lastColId = -1; + bool colInOrder = true; + + if (NULL == pTSchema || NULL == *pTSchema) { + *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); + } + + for (int c = 0; c < boundInfo->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + if (pColSchema->colId <= lastColId) { + colInOrder = false; + } else { + lastColId = pColSchema->colId; + } + + if (bind[c].num != rowNum) { + code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + goto _return; + } + + if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && + bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type + code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + goto _return; + } + + if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) { + code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind, charsetCxt); + if (code) { + goto _return; + } + pBindInfos[c].bind = &ncharBind; + } else if (TSDB_DATA_TYPE_GEOMETRY == pColSchema->type) { + code = initCtxAsText(); + if (code) { + qError("geometry init failed:%s", tstrerror(code)); + goto _return; + } + uint8_t *buf = bind[c].buffer; + for (int j = 0; j < bind[c].num; j++) { + code = checkWKB(buf, bind[c].length[j]); + if (code) { + qError("geometry data must be in WKB format"); + goto _return; + } + buf += bind[c].length[j]; + } + pBindInfos[c].bind = bind + c; + } else { + pBindInfos[c].bind = bind + c; + } + + pBindInfos[c].columnId = pColSchema->colId; + pBindInfos[c].type = pColSchema->type; + pBindInfos[c].bytes = pColSchema->bytes; + + if (code) { + goto _return; + } + } + + pDataBlock->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT; + if (pDataBlock->pData->pCreateTbReq != NULL) { + pDataBlock->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; + } + + code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, + &pDataBlock->duplicateTs); + qDebug("stmt2 all %d columns bind %d rows data as row format", boundInfo->numOfBound, rowNum); + +_return: + + taosMemoryFree(ncharBind.buffer); + taosMemoryFree(ncharBind.length); + + return code; +} + int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, uint8_t timePrec) { if (fields != NULL) { @@ -1114,15 +1223,19 @@ int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) { int32_t colNum = taosArrayGetSize(pBlock->pData->aCol); for (int32_t i = 0; i < colNum; ++i) { - SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i); - if (pCol == NULL) { - qError("qResetStmtDataBlock column is NULL"); - return terrno; - } - if (deepClear) { - tColDataDeepClear(pCol); + if (pBlock->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i); + if (pCol == NULL) { + qError("qResetStmtDataBlock column is NULL"); + return terrno; + } + if (deepClear) { + tColDataDeepClear(pCol); + } else { + tColDataClear(pCol); + } } else { - tColDataClear(pCol); + pBlock->pData->aRowP = taosArrayInit(20, POINTER_BYTES); } }