Merge pull request #29824 from taosdata/enh/3.0/TD-32645
enh(stmt)[td-32645] add api taos_stmt2_bind_param_a
This commit is contained in:
commit
ba210659d1
|
@ -247,11 +247,13 @@ typedef struct TAOS_STMT2_BINDV {
|
||||||
DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option);
|
DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option);
|
||||||
DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
|
DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
|
||||||
DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx);
|
DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx);
|
||||||
DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
|
DLL_EXPORT int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp,
|
||||||
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
|
void *param);
|
||||||
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
|
DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
|
||||||
DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields);
|
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
|
||||||
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields);
|
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
|
||||||
|
DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields);
|
||||||
|
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields);
|
||||||
DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt);
|
DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt);
|
||||||
DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt);
|
DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt);
|
||||||
|
|
||||||
|
|
|
@ -300,6 +300,7 @@ int32_t cleanupTaskQueue();
|
||||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||||
int32_t taosAsyncWait();
|
int32_t taosAsyncWait();
|
||||||
int32_t taosAsyncRecover();
|
int32_t taosAsyncRecover();
|
||||||
|
int32_t taosStmt2AsyncBind(__async_exec_fn_t execFn, void* execParam);
|
||||||
|
|
||||||
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,19 @@ SStmtQNode* tail;
|
||||||
uint64_t qRemainNum;
|
uint64_t qRemainNum;
|
||||||
} SStmtQueue;
|
} SStmtQueue;
|
||||||
*/
|
*/
|
||||||
|
typedef struct {
|
||||||
|
TAOS_STMT2 *stmt;
|
||||||
|
TAOS_STMT2_BINDV *bindv;
|
||||||
|
int32_t col_idx;
|
||||||
|
__taos_async_fn_t fp;
|
||||||
|
void *param;
|
||||||
|
} ThreadArgs;
|
||||||
|
|
||||||
|
typedef struct AsyncBindParam {
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
TdThreadCond waitCond;
|
||||||
|
uint8_t asyncBindNum;
|
||||||
|
} AsyncBindParam;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STscObj *taos;
|
STscObj *taos;
|
||||||
|
@ -150,12 +163,13 @@ typedef struct {
|
||||||
SStmtExecInfo exec;
|
SStmtExecInfo exec;
|
||||||
SStmtBindInfo bInfo;
|
SStmtBindInfo bInfo;
|
||||||
|
|
||||||
char *db;
|
char *db;
|
||||||
int64_t reqid;
|
int64_t reqid;
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
tsem_t asyncQuerySem;
|
tsem_t asyncExecSem;
|
||||||
bool semWaited;
|
bool execSemWaited;
|
||||||
SStmtStatInfo stat;
|
AsyncBindParam asyncBindParam;
|
||||||
|
SStmtStatInfo stat;
|
||||||
} STscStmt2;
|
} STscStmt2;
|
||||||
/*
|
/*
|
||||||
extern char *gStmtStatusStr[];
|
extern char *gStmtStatusStr[];
|
||||||
|
@ -226,6 +240,8 @@ int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums);
|
||||||
int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);
|
int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);
|
||||||
TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt);
|
TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt);
|
||||||
const char *stmtErrstr2(TAOS_STMT2 *stmt);
|
const char *stmtErrstr2(TAOS_STMT2 *stmt);
|
||||||
|
int stmt2AsyncBind(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param);
|
||||||
|
int stmtAsyncBindThreadFunc(void *args);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -2168,11 +2168,16 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
|
||||||
}
|
}
|
||||||
|
|
||||||
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
||||||
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
|
if( atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum)>1) {
|
||||||
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
|
tscError("async bind param is still working, please try again later");
|
||||||
tscError("wait async query sem failed");
|
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
|
||||||
|
if (tsem_wait(&pStmt->asyncExecSem) != 0) {
|
||||||
|
tscError("wait asyncExecSem failed");
|
||||||
}
|
}
|
||||||
pStmt->semWaited = true;
|
pStmt->execSemWaited = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR));
|
SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR));
|
||||||
|
@ -2245,6 +2250,35 @@ out:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp,
|
||||||
|
void *param) {
|
||||||
|
if (stmt == NULL || bindv == NULL || fp == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
||||||
|
if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||||
|
tscError("async bind param is still working, please try again later");
|
||||||
|
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadArgs *args = (ThreadArgs *)taosMemoryMalloc(sizeof(ThreadArgs));
|
||||||
|
args->stmt = stmt;
|
||||||
|
args->bindv = bindv;
|
||||||
|
args->col_idx = col_idx;
|
||||||
|
args->fp = fp;
|
||||||
|
args->param = param;
|
||||||
|
(void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||||
|
int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args);
|
||||||
|
if (code_s != TSDB_CODE_SUCCESS) {
|
||||||
|
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||||
|
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code_s;
|
||||||
|
}
|
||||||
|
|
||||||
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
|
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
|
||||||
if (stmt == NULL) {
|
if (stmt == NULL) {
|
||||||
tscError("NULL parameter for %s", __FUNCTION__);
|
tscError("NULL parameter for %s", __FUNCTION__);
|
||||||
|
|
|
@ -752,6 +752,14 @@ static int32_t stmtInitQueue(STscStmt2* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
|
||||||
|
(void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
|
||||||
|
(void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
|
||||||
|
pStmt->asyncBindParam.asyncBindNum = 0;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
|
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
|
||||||
pTblBuf->buffUnit = sizeof(SStmtQNode);
|
pTblBuf->buffUnit = sizeof(SStmtQNode);
|
||||||
pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
|
pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
|
||||||
|
@ -812,13 +820,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
|
||||||
pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||||
pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
|
pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
|
||||||
if (NULL == pStmt->sql.siInfo.pTableHash) {
|
if (NULL == pStmt->sql.siInfo.pTableHash) {
|
||||||
(void)stmtClose(pStmt);
|
(void)stmtClose2(pStmt);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
|
pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
|
||||||
if (NULL == pStmt->sql.siInfo.pTableCols) {
|
if (NULL == pStmt->sql.siInfo.pTableCols) {
|
||||||
terrno = terrno;
|
terrno = terrno;
|
||||||
(void)stmtClose(pStmt);
|
(void)stmtClose2(pStmt);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,20 +839,27 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
(void)stmtClose(pStmt);
|
(void)stmtClose2(pStmt);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pStmt->sql.siInfo.tableColsReady = true;
|
pStmt->sql.siInfo.tableColsReady = true;
|
||||||
if (pStmt->options.asyncExecFn) {
|
if (pStmt->options.asyncExecFn) {
|
||||||
if (tsem_init(&pStmt->asyncQuerySem, 0, 1) != 0) {
|
if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
(void)stmtClose(pStmt);
|
(void)stmtClose2(pStmt);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pStmt->semWaited = false;
|
code = stmtIniAsyncBind(pStmt);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
terrno = code;
|
||||||
|
(void)stmtClose2(pStmt);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->execSemWaited = false;
|
||||||
|
|
||||||
STMT_LOG_SEQ(STMT_INIT);
|
STMT_LOG_SEQ(STMT_INIT);
|
||||||
|
|
||||||
|
@ -1659,8 +1674,8 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
|
||||||
(void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
(void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
||||||
++pStmt->sql.runTimes;
|
++pStmt->sql.runTimes;
|
||||||
|
|
||||||
if (tsem_post(&pStmt->asyncQuerySem) != 0) {
|
if (tsem_post(&pStmt->asyncExecSem) != 0) {
|
||||||
tscError("failed to post asyncQuerySem");
|
tscError("failed to post asyncExecSem");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1675,6 +1690,12 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
||||||
return pStmt->errCode;
|
return pStmt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||||
|
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||||
|
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||||
|
|
||||||
if (pStmt->sql.stbInterlaceMode) {
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
STMT_ERR_RET(stmtAddBatch2(pStmt));
|
STMT_ERR_RET(stmtAddBatch2(pStmt));
|
||||||
}
|
}
|
||||||
|
@ -1749,7 +1770,7 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
||||||
pRequest->body.queryFp = asyncQueryCb;
|
pRequest->body.queryFp = asyncQueryCb;
|
||||||
((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
|
((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
|
||||||
|
|
||||||
pStmt->semWaited = false;
|
pStmt->execSemWaited = false;
|
||||||
launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
|
launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1775,12 +1796,20 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
||||||
pStmt->bindThreadInUse = false;
|
pStmt->bindThreadInUse = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||||
|
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||||
|
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||||
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
|
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
|
||||||
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
|
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
|
||||||
|
|
||||||
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
|
(void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
|
||||||
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
|
(void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
|
||||||
tscError("failed to wait asyncQuerySem");
|
|
||||||
|
if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
|
||||||
|
if (tsem_wait(&pStmt->asyncExecSem) != 0) {
|
||||||
|
tscError("failed to wait asyncExecSem");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1798,8 +1827,8 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
||||||
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
|
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
|
||||||
|
|
||||||
if (pStmt->options.asyncExecFn) {
|
if (pStmt->options.asyncExecFn) {
|
||||||
if (tsem_destroy(&pStmt->asyncQuerySem) != 0) {
|
if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
|
||||||
tscError("failed to destroy asyncQuerySem");
|
tscError("failed to destroy asyncExecSem");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(stmt);
|
taosMemoryFree(stmt);
|
||||||
|
@ -1928,3 +1957,22 @@ TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
|
||||||
|
|
||||||
return pStmt->exec.pRequest;
|
return pStmt->exec.pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t stmtAsyncBindThreadFunc(void* args) {
|
||||||
|
qInfo("async stmt bind thread started");
|
||||||
|
|
||||||
|
ThreadArgs* targs = (ThreadArgs*)args;
|
||||||
|
STscStmt2* pStmt = (STscStmt2*)targs->stmt;
|
||||||
|
|
||||||
|
int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
|
||||||
|
targs->fp(targs->param, NULL, code);
|
||||||
|
(void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
|
||||||
|
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||||
|
(void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
|
||||||
|
(void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
|
||||||
|
taosMemoryFree(args);
|
||||||
|
|
||||||
|
qInfo("async stmt bind thread stopped");
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -37,7 +37,7 @@ namespace {
|
||||||
void checkError(TAOS_STMT2* stmt, int code) {
|
void checkError(TAOS_STMT2* stmt, int code) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
STscStmt2* pStmt = (STscStmt2*)stmt;
|
STscStmt2* pStmt = (STscStmt2*)stmt;
|
||||||
if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr) {
|
if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr || pStmt->exec.pRequest == nullptr) {
|
||||||
printf("stmt api error\n stats : %d\n errstr : %s\n", pStmt->sql.status, taos_stmt_errstr(stmt));
|
printf("stmt api error\n stats : %d\n errstr : %s\n", pStmt->sql.status, taos_stmt_errstr(stmt));
|
||||||
} else {
|
} else {
|
||||||
printf("stmt api error\n sql : %s\n stats : %d\n errstr : %s\n", pStmt->sql.sqlStr, pStmt->sql.status,
|
printf("stmt api error\n sql : %s\n stats : %d\n errstr : %s\n", pStmt->sql.sqlStr, pStmt->sql.status,
|
||||||
|
@ -255,48 +255,41 @@ TEST(stmt2Case, stmt2_test_limit) {
|
||||||
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_7");
|
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_7");
|
||||||
do_query(taos, "create stable stmt2_testdb_7.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))");
|
do_query(taos, "create stable stmt2_testdb_7.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))");
|
||||||
do_query(taos,
|
do_query(taos,
|
||||||
"insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, "
|
"insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, "
|
||||||
"'abc'),(1591060628001,'def'),(1591060628004, 'hij')");
|
"'abc'),(1591060628001,'def'),(1591060628004, 'hij')");
|
||||||
do_query(taos, "use stmt2_testdb_7");
|
do_query(taos, "use stmt2_testdb_7");
|
||||||
|
|
||||||
|
|
||||||
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
|
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
|
||||||
|
|
||||||
|
|
||||||
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
||||||
ASSERT_NE(stmt, nullptr);
|
ASSERT_NE(stmt, nullptr);
|
||||||
|
|
||||||
|
|
||||||
const char* sql = "select * from stmt2_testdb_7.tb2 where ts > ? and ts < ? limit ?";
|
const char* sql = "select * from stmt2_testdb_7.tb2 where ts > ? and ts < ? limit ?";
|
||||||
int code = taos_stmt2_prepare(stmt, sql, 0);
|
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||||
checkError(stmt, code);
|
checkError(stmt, code);
|
||||||
|
|
||||||
|
int t64_len[1] = {sizeof(int64_t)};
|
||||||
int t64_len[1] = {sizeof(int64_t)};
|
int b_len[1] = {3};
|
||||||
int b_len[1] = {3};
|
int x = 2;
|
||||||
int x = 2;
|
int x_len = sizeof(int);
|
||||||
int x_len = sizeof(int);
|
int64_t ts[2] = {1591060627000, 1591060628005};
|
||||||
int64_t ts[2] = {1591060627000, 1591060628005};
|
TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1},
|
||||||
TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1},
|
{TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1},
|
||||||
{TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1},
|
{TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}};
|
||||||
{TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}};
|
|
||||||
TAOS_STMT2_BIND* paramv = ¶ms[0];
|
TAOS_STMT2_BIND* paramv = ¶ms[0];
|
||||||
TAOS_STMT2_BINDV bindv = {1, NULL, NULL, ¶mv};
|
TAOS_STMT2_BINDV bindv = {1, NULL, NULL, ¶mv};
|
||||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||||
checkError(stmt, code);
|
checkError(stmt, code);
|
||||||
|
|
||||||
|
|
||||||
taos_stmt2_exec(stmt, NULL);
|
taos_stmt2_exec(stmt, NULL);
|
||||||
checkError(stmt, code);
|
checkError(stmt, code);
|
||||||
|
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_stmt2_result(stmt);
|
TAOS_RES* pRes = taos_stmt2_result(stmt);
|
||||||
ASSERT_NE(pRes, nullptr);
|
ASSERT_NE(pRes, nullptr);
|
||||||
|
|
||||||
|
|
||||||
int getRecordCounts = 0;
|
int getRecordCounts = 0;
|
||||||
while ((taos_fetch_row(pRes))) {
|
while ((taos_fetch_row(pRes))) {
|
||||||
getRecordCounts++;
|
getRecordCounts++;
|
||||||
}
|
}
|
||||||
ASSERT_EQ(getRecordCounts, 2);
|
ASSERT_EQ(getRecordCounts, 2);
|
||||||
taos_stmt2_close(stmt);
|
taos_stmt2_close(stmt);
|
||||||
|
@ -304,7 +297,6 @@ TEST(stmt2Case, stmt2_test_limit) {
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST(stmt2Case, insert_stb_get_fields_Test) {
|
TEST(stmt2Case, insert_stb_get_fields_Test) {
|
||||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(taos, nullptr);
|
ASSERT_NE(taos, nullptr);
|
||||||
|
@ -1617,4 +1609,233 @@ TEST(stmt2Case, errcode) {
|
||||||
code = taos_stmt_prepare(stmt, sql, 0);
|
code = taos_stmt_prepare(stmt, sql, 0);
|
||||||
checkError(stmt, code);
|
checkError(stmt, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) {
|
||||||
|
bool* finish = (bool*)param;
|
||||||
|
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||||
|
taosMsleep(500);
|
||||||
|
*finish = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void stmtAsyncQueryCb2(void* param, TAOS_RES* pRes, int code) {
|
||||||
|
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||||
|
taosMsleep(500);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void stmtAsyncBindCb2(void* param, TAOS_RES* pRes, int code) {
|
||||||
|
bool* finish = (bool*)param;
|
||||||
|
taosMsleep(500);
|
||||||
|
*finish = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(stmt2Case, async_order) {
|
||||||
|
int CTB_NUMS = 2;
|
||||||
|
int ROW_NUMS = 2;
|
||||||
|
int CYC_NUMS = 2;
|
||||||
|
|
||||||
|
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb2, NULL};
|
||||||
|
char* sql = "insert into ? values(?,?)";
|
||||||
|
|
||||||
|
do_query(taos, "drop database if exists stmt2_testdb_15");
|
||||||
|
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_15");
|
||||||
|
do_query(taos, "create stable stmt2_testdb_15.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))");
|
||||||
|
do_query(taos, "use stmt2_testdb_15");
|
||||||
|
|
||||||
|
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
||||||
|
ASSERT_NE(stmt, nullptr);
|
||||||
|
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||||
|
checkError(stmt, code);
|
||||||
|
int total_affected = 0;
|
||||||
|
|
||||||
|
// tbname
|
||||||
|
char** tbs = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*));
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
tbs[i] = (char*)taosMemoryMalloc(sizeof(char) * 20);
|
||||||
|
sprintf(tbs[i], "ctb_%d", i);
|
||||||
|
char* tmp = (char*)taosMemoryMalloc(sizeof(char) * 100);
|
||||||
|
sprintf(tmp, "create table stmt2_testdb_15.%s using stmt2_testdb_15.stb tags(0, 'after')", tbs[i]);
|
||||||
|
do_query(taos, tmp);
|
||||||
|
}
|
||||||
|
// params
|
||||||
|
TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*));
|
||||||
|
// col params
|
||||||
|
int64_t** ts = (int64_t**)taosMemoryMalloc(CTB_NUMS * sizeof(int64_t*));
|
||||||
|
char** b = (char**)taosMemoryMalloc(CTB_NUMS * sizeof(char*));
|
||||||
|
int* ts_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int));
|
||||||
|
int* b_len = (int*)taosMemoryMalloc(ROW_NUMS * sizeof(int));
|
||||||
|
for (int i = 0; i < ROW_NUMS; i++) {
|
||||||
|
ts_len[i] = sizeof(int64_t);
|
||||||
|
b_len[i] = 1;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
ts[i] = (int64_t*)taosMemoryMalloc(ROW_NUMS * sizeof(int64_t));
|
||||||
|
b[i] = (char*)taosMemoryMalloc(ROW_NUMS * sizeof(char));
|
||||||
|
for (int j = 0; j < ROW_NUMS; j++) {
|
||||||
|
ts[i][j] = 1591060628000 + 100000 + j;
|
||||||
|
b[i][j] = 'a' + j;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// bind params
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
// create col params
|
||||||
|
paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND));
|
||||||
|
paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS};
|
||||||
|
paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS};
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 1 : bind_a->exec_a->bind_a->exec_a->...
|
||||||
|
{
|
||||||
|
printf("case 1 : bind_a->exec_a->bind_a->exec_a->...\n");
|
||||||
|
for (int r = 0; r < CYC_NUMS; r++) {
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish);
|
||||||
|
|
||||||
|
checkError(stmt, code);
|
||||||
|
|
||||||
|
// exec
|
||||||
|
code = taos_stmt2_exec(stmt, NULL);
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 2 : bind_a->bind_a->bind_a->exec_a->...
|
||||||
|
{
|
||||||
|
printf("case 2 : bind_a->bind_a->bind_a->exec_a->...\n");
|
||||||
|
for (int r = 0; r < CYC_NUMS; r++) {
|
||||||
|
// bind params
|
||||||
|
TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)taosMemoryMalloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*));
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
// create col params
|
||||||
|
paramv[i] = (TAOS_STMT2_BIND*)taosMemoryMalloc(2 * sizeof(TAOS_STMT2_BIND));
|
||||||
|
paramv[i][0] = {TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS};
|
||||||
|
paramv[i][1] = {TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS};
|
||||||
|
}
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish);
|
||||||
|
while (!finish) {
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
// exec
|
||||||
|
code = taos_stmt2_exec(stmt, NULL);
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 3 : bind->exec_a->bind->exec_a->...
|
||||||
|
{
|
||||||
|
printf("case 3 : bind->exec_a->bind->exec_a->...\n");
|
||||||
|
for (int r = 0; r < CYC_NUMS; r++) {
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||||
|
|
||||||
|
checkError(stmt, code);
|
||||||
|
|
||||||
|
// exec
|
||||||
|
code = taos_stmt2_exec(stmt, NULL);
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 4 : bind_a->close
|
||||||
|
{
|
||||||
|
printf("case 4 : bind_a->close\n");
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish);
|
||||||
|
checkError(stmt, code);
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 5 : bind_a->exec_a->close
|
||||||
|
{
|
||||||
|
printf("case 5 : bind_a->exec_a->close\n");
|
||||||
|
// init
|
||||||
|
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
||||||
|
ASSERT_NE(stmt, nullptr);
|
||||||
|
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||||
|
checkError(stmt, code);
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish);
|
||||||
|
checkError(stmt, code);
|
||||||
|
// exec
|
||||||
|
code = taos_stmt2_exec(stmt, NULL);
|
||||||
|
checkError(stmt, code);
|
||||||
|
// close
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
option = {0, false, false, NULL, NULL};
|
||||||
|
stmt = taos_stmt2_init(taos, &option);
|
||||||
|
ASSERT_NE(stmt, nullptr);
|
||||||
|
code = taos_stmt2_prepare(stmt, sql, 0);
|
||||||
|
checkError(stmt, code);
|
||||||
|
|
||||||
|
// case 6 : bind_a->exec->bind_a->exec->...
|
||||||
|
{
|
||||||
|
printf("case 6 : bind_a->exec->bind_a->exec->...\n");
|
||||||
|
// init
|
||||||
|
|
||||||
|
checkError(stmt, code);
|
||||||
|
for (int r = 0; r < CYC_NUMS; r++) {
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, (void*)&finish);
|
||||||
|
checkError(stmt, code);
|
||||||
|
// exec
|
||||||
|
code = taos_stmt2_exec(stmt, NULL);
|
||||||
|
checkError(stmt, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 7 (error:no wait error) : bind_a->bind_a
|
||||||
|
{
|
||||||
|
printf("case 7 (error:no wait error) : bind_a->bind_a\n");
|
||||||
|
// bind
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||||
|
bool finish = false;
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb2, (void*)&finish);
|
||||||
|
checkError(stmt, code);
|
||||||
|
taosMsleep(200);
|
||||||
|
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb2, (void*)&finish);
|
||||||
|
ASSERT_EQ(code, TSDB_CODE_TSC_STMT_API_ERROR);
|
||||||
|
while (!finish) {
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// close
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
|
||||||
|
// free memory
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
taosMemoryFree(paramv[i]);
|
||||||
|
taosMemoryFree(ts[i]);
|
||||||
|
taosMemoryFree(b[i]);
|
||||||
|
}
|
||||||
|
taosMemoryFree(ts);
|
||||||
|
taosMemoryFree(b);
|
||||||
|
taosMemoryFree(ts_len);
|
||||||
|
taosMemoryFree(b_len);
|
||||||
|
taosMemoryFree(paramv);
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
taosMemoryFree(tbs[i]);
|
||||||
|
}
|
||||||
|
taosMemoryFree(tbs);
|
||||||
|
}
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -202,6 +202,18 @@ int32_t taosAsyncRecover() {
|
||||||
return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
|
return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
|
||||||
|
SSchedMsg* pSchedMsg;
|
||||||
|
int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
|
||||||
|
if (rc) return rc;
|
||||||
|
pSchedMsg->fp = NULL;
|
||||||
|
pSchedMsg->ahandle = bindFn;
|
||||||
|
pSchedMsg->thandle = bindParam;
|
||||||
|
// pSchedMsg->msg = code;
|
||||||
|
|
||||||
|
return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||||
if (NULL == pMsgBody) {
|
if (NULL == pMsgBody) {
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue