Merge pull request #26434 from xinjiempolde/enh/TD-21826-3.0

enh/td-21826-3.0 error handling for stmt
This commit is contained in:
Hongze Cheng 2024-07-08 10:21:14 +08:00 committed by GitHub
commit c8aa963417
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 143 additions and 20 deletions

View File

@ -89,6 +89,7 @@ typedef struct SParseContext {
bool isView; bool isView;
bool isAudit; bool isAudit;
bool nodeOffline; bool nodeOffline;
bool isStmtBind;
const char* svrVer; const char* svrVer;
SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos

View File

@ -283,6 +283,7 @@ typedef struct SRequestObj {
bool inRetry; bool inRetry;
bool isSubReq; bool isSubReq;
bool inCallback; bool inCallback;
bool isStmtBind; // is statement bind parameter
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
int64_t allocatorRefId; int64_t allocatorRefId;

View File

@ -206,6 +206,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql; (*pRequest)->validateOnly = validateSql;
(*pRequest)->isStmtBind = false;
((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param; ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
@ -266,7 +267,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.svrVer = pTscObj->sVer, .svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.isStmtBind = pRequest->isStmtBind};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);

View File

@ -72,6 +72,7 @@ static int32_t stmtCreateRequest(STscStmt* pStmt) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pStmt->exec.pRequest->syncQuery = true; pStmt->exec.pRequest->syncQuery = true;
pStmt->exec.pRequest->isStmtBind = true;
} }
} }
@ -830,6 +831,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
pStmt->bInfo.needParse = true; pStmt->bInfo.needParse = true;
pStmt->sql.status = STMT_INIT; pStmt->sql.status = STMT_INIT;
pStmt->reqid = reqid; pStmt->reqid = reqid;
pStmt->errCode = TSDB_CODE_SUCCESS;
if (NULL != pOptions) { if (NULL != pOptions) {
memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
@ -882,6 +884,10 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
STMT_DLOG_E("start to prepare"); STMT_DLOG_E("start to prepare");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (pStmt->sql.status >= STMT_PREPARE) { if (pStmt->sql.status >= STMT_PREPARE) {
STMT_ERR_RET(stmtResetStmt(pStmt)); STMT_ERR_RET(stmtResetStmt(pStmt));
} }
@ -953,6 +959,10 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_DLOG("start to set tbName: %s", tbName); STMT_DLOG("start to set tbName: %s", tbName);
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
int32_t insert = 0; int32_t insert = 0;
@ -999,8 +1009,18 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_DLOG_E("start to set tbTags"); STMT_DLOG_E("start to set tbTags");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
SBoundColInfo *tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
tscWarn("no tags bound in sql, will not bound tags");
return TSDB_CODE_SUCCESS;
}
if (pStmt->bInfo.inExecCache) { if (pStmt->bInfo.inExecCache) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1021,6 +1041,10 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
} }
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds"); tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -1039,6 +1063,10 @@ int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
} }
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) { int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds"); tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
@ -1150,8 +1178,13 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx); STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false; pStmt->bInfo.needParse = false;
@ -1307,6 +1340,10 @@ int stmtAddBatch(TAOS_STMT* stmt) {
STMT_DLOG_E("start to add batch"); STMT_DLOG_E("start to add batch");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
if (pStmt->sql.stbInterlaceMode) { if (pStmt->sql.stbInterlaceMode) {
@ -1471,6 +1508,10 @@ int stmtExec(TAOS_STMT* stmt) {
STMT_DLOG_E("start to exec"); STMT_DLOG_E("start to exec");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
@ -1599,6 +1640,10 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STMT_DLOG_E("start to get tag fields"); STMT_DLOG_E("start to get tag fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
@ -1637,6 +1682,10 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STMT_DLOG_E("start to get col fields"); STMT_DLOG_E("start to get col fields");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
@ -1674,6 +1723,10 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STMT_DLOG_E("start to get param num"); STMT_DLOG_E("start to get param num");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
@ -1706,6 +1759,10 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
STMT_DLOG_E("start to get param"); STMT_DLOG_E("start to get param");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
} }

View File

@ -30,6 +30,7 @@ typedef struct SInsertParseContext {
bool forceUpdate; bool forceUpdate;
bool needTableTagVal; bool needTableTagVal;
bool needRequest; // whether or not request server bool needRequest; // whether or not request server
bool isStmtBind; // whether is stmt bind
} SInsertParseContext; } SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@ -1978,7 +1979,6 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) { SToken* pToken) {
SBoundColInfo* pCols = &pTableCxt->boundColsInfo; SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1996,7 +1996,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]); SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
if (pToken->type == TK_NK_QUESTION) { if (pToken->type == TK_NK_QUESTION) {
isParseBindParam = true; pCxt->isStmtBind = true;
if (NULL == pCxt->pComCxt->pStmtCb) { if (NULL == pCxt->pComCxt->pStmtCb) {
code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
break; break;
@ -2007,8 +2007,8 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
break; break;
} }
if (isParseBindParam) { if (pCxt->isStmtBind) {
code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and values"); code = buildInvalidOperationMsg(&pCxt->msg, "stmt bind param does not support normal value in sql");
break; break;
} }
@ -2025,7 +2025,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
} }
} }
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -2035,7 +2035,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
} }
} }
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) { if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
*pGotRow = true; *pGotRow = true;
} }
@ -2410,6 +2410,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
} }
if (TK_NK_QUESTION == pTbName->type) { if (TK_NK_QUESTION == pTbName->type) {
pCxt->isStmtBind = true;
if (NULL == pCxt->pComCxt->pStmtCb) { if (NULL == pCxt->pComCxt->pStmtCb) {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z); return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
} }
@ -2443,6 +2444,13 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
pTbName->n = strlen(tbName); pTbName->n = strlen(tbName);
} }
if (pCxt->isStmtBind) {
if (TK_NK_ID == pTbName->type || (tbNameAfterDbName != NULL && *(tbNameAfterDbName + 1) != '?')) {
// In SQL statements, the table name has already been specified.
parserWarn("0x%" PRIx64 " table name is specified in sql, ignore the table name in bind param", pCxt->pComCxt->requestId);
}
}
*pHasData = true; *pHasData = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2936,7 +2944,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
.missCache = false, .missCache = false,
.usingDuplicateTable = false, .usingDuplicateTable = false,
.needRequest = true, .needRequest = true,
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)}; .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false),
.isStmtBind = pCxt->isStmtBind};
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -196,6 +196,51 @@ class TDTestCase:
conn.close() conn.close()
raise err raise err
def test_stmt_nornmal_value_error(self, conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stmt_error"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , error_data int )",
)
conn.load_table_info("log")
stmt = conn.statement("insert into log values(NOW(),?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
params[1].bool(True)
params[2].tinyint(None)
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
stmt.bind_param(params)
stmt.execute()
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def run(self): def run(self):
self.test_stmt_insert(self.conn()) self.test_stmt_insert(self.conn())
@ -203,7 +248,16 @@ class TDTestCase:
self.test_stmt_insert_error(self.conn()) self.test_stmt_insert_error(self.conn())
except Exception as error : except Exception as error :
if str(error)=='[0x0200]: no mix usage for ? and values': if str(error)=='[0x0200]: stmt bind param does not support normal value in sql':
tdLog.info('=========stmt error occured for bind part column ==============')
else:
tdLog.exit("expect error(%s) not occured" % str(error))
try:
self.test_stmt_nornmal_value_error(self.conn())
except Exception as error :
if str(error)=='[0x0200]: stmt bind param does not support normal value in sql':
tdLog.info('=========stmt error occured for bind part column ==============') tdLog.info('=========stmt error occured for bind part column ==============')
else: else:
tdLog.exit("expect error(%s) not occured" % str(error)) tdLog.exit("expect error(%s) not occured" % str(error))

View File

@ -30,7 +30,7 @@
#include "taos.h" #include "taos.h"
class taoscTest : public ::testing::Test { class taoscTest : public ::testing::Test {
protected: protected:
static void SetUpTestCase() { static void SetUpTestCase() {
printf("start test setup.\n"); printf("start test setup.\n");
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -254,7 +254,7 @@ TEST_F(taoscTest, taos_query_a_fetch_row) {
printf("taos_query_a_fetch_row taos_fetch_row start...\n"); printf("taos_query_a_fetch_row taos_fetch_row start...\n");
while ((row = taos_fetch_row(*pres))) { while ((row = taos_fetch_row(*pres))) {
getRecordCounts++; getRecordCounts++;
} }
printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts); printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts);
taos_free_result(*pres); taos_free_result(*pres);
@ -264,4 +264,3 @@ TEST_F(taoscTest, taos_query_a_fetch_row) {
printf("taos_query_a_fetch_row test finished.\n"); printf("taos_query_a_fetch_row test finished.\n");
} }