Merge remote-tracking branch 'origin/3.0' into enh/TD-30880

This commit is contained in:
Yihao Deng 2024-07-08 07:34:39 +00:00
commit ef46c0c545
7 changed files with 143 additions and 20 deletions

View File

@ -89,6 +89,7 @@ typedef struct SParseContext {
bool isView;
bool isAudit;
bool nodeOffline;
bool isStmtBind;
const char* svrVer;
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos

View File

@ -283,6 +283,7 @@ typedef struct SRequestObj {
bool inRetry;
bool isSubReq;
bool inCallback;
bool isStmtBind; // is statement bind parameter
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
int64_t allocatorRefId;

View File

@ -206,6 +206,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
(*pRequest)->isStmtBind = false;
((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
@ -266,7 +267,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo,
.svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)};
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.isStmtBind = pRequest->isStmtBind};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);

View File

@ -72,6 +72,7 @@ static int32_t stmtCreateRequest(STscStmt* pStmt) {
}
if (TSDB_CODE_SUCCESS == code) {
pStmt->exec.pRequest->syncQuery = true;
pStmt->exec.pRequest->isStmtBind = true;
}
}
@ -830,6 +831,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->bInfo.needParse = true;
pStmt->sql.status = STMT_INIT;
pStmt->reqid = reqid;
pStmt->errCode = TSDB_CODE_SUCCESS;
if (NULL != pOptions) {
memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
@ -882,6 +884,10 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_DLOG_E("start to prepare");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtResetStmt(pStmt));
}
@ -953,6 +959,10 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_DLOG("start to set tbName: %s", tbName);
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
int32_t insert = 0;
@ -999,8 +1009,18 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_DLOG_E("start to set tbTags");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
SBoundColInfo *tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
tscWarn("no tags bound in sql, will not bound tags");
return TSDB_CODE_SUCCESS;
}
if (pStmt->bInfo.inExecCache) {
return TSDB_CODE_SUCCESS;
}
@ -1021,6 +1041,10 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
}
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -1039,6 +1063,10 @@ int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
}
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -1150,8 +1178,13 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
@ -1307,6 +1340,10 @@ int stmtAddBatch(TAOS_STMT* stmt) {
STMT_DLOG_E("start to add batch");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
if (pStmt->sql.stbInterlaceMode) {
@ -1471,6 +1508,10 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_DLOG_E("start to exec");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
@ -1599,6 +1640,10 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STMT_DLOG_E("start to get tag fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
@ -1637,6 +1682,10 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STMT_DLOG_E("start to get col fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
@ -1674,6 +1723,10 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STMT_DLOG_E("start to get param num");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
@ -1706,6 +1759,10 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
STMT_DLOG_E("start to get param");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}

View File

@ -30,6 +30,7 @@ typedef struct SInsertParseContext {
bool forceUpdate;
bool needTableTagVal;
bool needRequest; // whether or not request server
bool isStmtBind; // whether is stmt bind
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@ -1978,7 +1979,6 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) {
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
int32_t code = TSDB_CODE_SUCCESS;
@ -1996,7 +1996,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
if (pToken->type == TK_NK_QUESTION) {
isParseBindParam = true;
pCxt->isStmtBind = true;
if (NULL == pCxt->pComCxt->pStmtCb) {
code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
break;
@ -2007,8 +2007,8 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
break;
}
if (isParseBindParam) {
code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values");
if (pCxt->isStmtBind) {
code = buildInvalidOperationMsg(&pCxt->msg, "stmt bind param does not support normal value in sql");
break;
}
@ -2025,7 +2025,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
}
}
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) {
@ -2035,7 +2035,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
}
}
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
*pGotRow = true;
}
@ -2410,6 +2410,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
}
if (TK_NK_QUESTION == pTbName->type) {
pCxt->isStmtBind = true;
if (NULL == pCxt->pComCxt->pStmtCb) {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
}
@ -2443,6 +2444,13 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
pTbName->n = strlen(tbName);
}
if (pCxt->isStmtBind) {
if (TK_NK_ID == pTbName->type || (tbNameAfterDbName != NULL && *(tbNameAfterDbName + 1) != '?')) {
// In SQL statements, the table name has already been specified.
parserWarn("0x%" PRIx64 " table name is specified in sql, ignore the table name in bind param", pCxt->pComCxt->requestId);
}
}
*pHasData = true;
return TSDB_CODE_SUCCESS;
}
@ -2936,7 +2944,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
.missCache = false,
.usingDuplicateTable = false,
.needRequest = true,
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false),
.isStmtBind = pCxt->isStmtBind};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) {

View File

@ -196,6 +196,51 @@ class TDTestCase:
conn.close()
raise err
def test_stmt_nornmal_value_error(self, conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stmt_error"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )",
)
conn.load_table_info("log")
stmt = conn.statement("insert into log values(NOW(),?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
params[1].bool(True)
params[2].tinyint(None)
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
stmt.bind_param(params)
stmt.execute()
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def run(self):
self.test_stmt_insert(self.conn())
@ -203,7 +248,16 @@ class TDTestCase:
self.test_stmt_insert_error(self.conn())
except Exception as error :
if str(error)=='[0x0200]: no mix usage for ? and values':
if str(error)=='[0x0200]: stmt bind param does not support normal value in sql':
tdLog.info('=========stmt error occured for bind part column ==============')
else:
tdLog.exit("expect error(%s) not occured" % str(error))
try:
self.test_stmt_nornmal_value_error(self.conn())
except Exception as error :
if str(error)=='[0x0200]: stmt bind param does not support normal value in sql':
tdLog.info('=========stmt error occured for bind part column ==============')
else:
tdLog.exit("expect error(%s) not occured" % str(error))

View File

@ -264,4 +264,3 @@ TEST_F(taoscTest, taos_query_a_fetch_row) {
printf("taos_query_a_fetch_row test finished.\n");
}