diff --git a/source/client/inc/clientStmt2.h b/source/client/inc/clientStmt2.h index e55d9a048f..feec9ec337 100644 --- a/source/client/inc/clientStmt2.h +++ b/source/client/inc/clientStmt2.h @@ -134,6 +134,12 @@ uint64_t qRemainNum; } SStmtQueue; */ +typedef struct AsyncBindParam { + TdThreadMutex mutex; + TdThreadCond waitCond; + uint8_t asyncBindNum; +} AsyncBindParam; + typedef struct { STscObj *taos; SCatalog *pCatalog; @@ -150,14 +156,13 @@ typedef struct { SStmtExecInfo exec; SStmtBindInfo bInfo; - char *db; - int64_t reqid; - int32_t errCode; - tsem_t asyncExecSem; - bool execSemWaited; - tsem_t asyncBindSem; - bool bindSemWaited; - SStmtStatInfo stat; + char *db; + int64_t reqid; + int32_t errCode; + tsem_t asyncExecSem; + bool execSemWaited; + AsyncBindParam asyncBindParam; + SStmtStatInfo stat; } STscStmt2; /* extern char *gStmtStatusStr[]; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c472be8bc8..ba67ff7f76 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2166,6 +2166,11 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col } STscStmt2 *pStmt = (STscStmt2 *)stmt; + if( atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum)>1) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { if (tsem_wait(&pStmt->asyncExecSem) != 0) { tscError("wait asyncExecSem failed"); @@ -2173,13 +2178,6 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col pStmt->execSemWaited = true; } - if (!pStmt->bindSemWaited) { - if (tsem_wait(&pStmt->asyncBindSem) != 0) { - tscError("wait asyncBindSem failed"); - } - pStmt->bindSemWaited = true; - } - SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR)); if (NULL == hashTbnames) { tscError("stmt2 bind failed: %s", tstrerror(terrno)); @@ -2252,7 +2250,20 @@ out: int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param) { - return stmt2AsyncBind(stmt, bindv,col_idx, fp, param); + if (stmt == NULL || bindv == NULL || fp == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + STscStmt2 *pStmt = (STscStmt2 *)stmt; + (void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + int code = stmt2AsyncBind(stmt, bindv, col_idx, fp, param); + if (code != TSDB_CODE_SUCCESS) { + (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + // terrno = TAOS_SYSTEM_ERROR(errno); + } + + return code; } int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 5c8e2655e6..597ec95f23 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -752,6 +752,14 @@ static int32_t stmtInitQueue(STscStmt2* pStmt) { return TSDB_CODE_SUCCESS; } +static int32_t stmtIniAsyncBind(STscStmt2* pStmt) { + (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL); + (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL); + pStmt->asyncBindParam.asyncBindNum = 0; + + return TSDB_CODE_SUCCESS; +} + static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) { pTblBuf->buffUnit = sizeof(SStmtQNode); pTblBuf->buffSize = pTblBuf->buffUnit * 1000; @@ -812,13 +820,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (NULL == pStmt->sql.siInfo.pTableHash) { - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); if (NULL == pStmt->sql.siInfo.pTableCols) { terrno = terrno; - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } @@ -831,7 +839,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { } if (TSDB_CODE_SUCCESS != code) { terrno = code; - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } } @@ -840,10 +848,17 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) { if (pStmt->options.asyncExecFn) { if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - (void)stmtClose(pStmt); + (void)stmtClose2(pStmt); return NULL; } } + code = stmtIniAsyncBind(pStmt); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + (void)stmtClose2(pStmt); + return NULL; + } + pStmt->execSemWaited = false; STMT_LOG_SEQ(STMT_INIT); @@ -1675,6 +1690,12 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) { return pStmt->errCode; } + taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { + (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); + } + taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); + if (pStmt->sql.stbInterlaceMode) { STMT_ERR_RET(stmtAddBatch2(pStmt)); } @@ -1775,9 +1796,17 @@ int stmtClose2(TAOS_STMT2* stmt) { pStmt->bindThreadInUse = false; } + taosThreadMutexLock(&pStmt->asyncBindParam.mutex); + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) { + (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex); + } + taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex); (void)taosThreadCondDestroy(&pStmt->queue.waitCond); (void)taosThreadMutexDestroy(&pStmt->queue.mutex); + (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond); + (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex); + if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) { if (tsem_wait(&pStmt->asyncExecSem) != 0) { tscError("failed to wait asyncExecSem"); @@ -1937,17 +1966,32 @@ typedef struct { } ThreadArgs; static void* stmtAsyncBindThreadFunc(void* args) { + setThreadName("stmtAsyncBind"); + + qInfo("async stmt bind thread started"); + ThreadArgs* targs = (ThreadArgs*)args; + STscStmt2* pStmt = (STscStmt2*)targs->stmt; int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx); targs->fp(targs->param, NULL, code); + (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex)); + (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1); + (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond)); + (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex)); taosMemoryFree(args); + qInfo("async stmt bind thread stopped"); + return NULL; } int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) { STscStmt2* pStmt = (STscStmt2*)stmt; + if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 1) { + tscError("async bind param is still working, please try again later"); + return TSDB_CODE_TSC_STMT_API_ERROR; + } TdThreadAttr thAttr; if (taosThreadAttrInit(&thAttr) != 0) { diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 95a7ecd8f7..6062189602 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -255,48 +255,41 @@ TEST(stmt2Case, stmt2_test_limit) { do_query(taos, "create database IF NOT EXISTS stmt2_testdb_7"); do_query(taos, "create stable stmt2_testdb_7.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); do_query(taos, - "insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, " - "'abc'),(1591060628001,'def'),(1591060628004, 'hij')"); + "insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, " + "'abc'),(1591060628001,'def'),(1591060628004, 'hij')"); do_query(taos, "use stmt2_testdb_7"); - TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL}; - TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); ASSERT_NE(stmt, nullptr); - const char* sql = "select * from stmt2_testdb_7.tb2 where ts > ? and ts < ? limit ?"; - int code = taos_stmt2_prepare(stmt, sql, 0); + int code = taos_stmt2_prepare(stmt, sql, 0); checkError(stmt, code); - - int t64_len[1] = {sizeof(int64_t)}; - int b_len[1] = {3}; - int x = 2; - int x_len = sizeof(int); - int64_t ts[2] = {1591060627000, 1591060628005}; - TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1}, - {TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1}, - {TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}}; + int t64_len[1] = {sizeof(int64_t)}; + int b_len[1] = {3}; + int x = 2; + int x_len = sizeof(int); + int64_t ts[2] = {1591060627000, 1591060628005}; + TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1}, + {TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1}, + {TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}}; TAOS_STMT2_BIND* paramv = ¶ms[0]; TAOS_STMT2_BINDV bindv = {1, NULL, NULL, ¶mv}; code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); - taos_stmt2_exec(stmt, NULL); checkError(stmt, code); - TAOS_RES* pRes = taos_stmt2_result(stmt); ASSERT_NE(pRes, nullptr); - int getRecordCounts = 0; while ((taos_fetch_row(pRes))) { - getRecordCounts++; + getRecordCounts++; } ASSERT_EQ(getRecordCounts, 2); taos_stmt2_close(stmt); @@ -304,7 +297,6 @@ TEST(stmt2Case, stmt2_test_limit) { taos_close(taos); } - TEST(stmt2Case, insert_stb_get_fields_Test) { TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); @@ -1619,20 +1611,24 @@ TEST(stmt2Case, errcode) { } void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) { - if (code != TSDB_CODE_SUCCESS) { - taos_errstr(pRes); - } ASSERT_EQ(code, TSDB_CODE_SUCCESS); + taosMsleep(500); + return; +} + +void stmtAsyncQueryCb2(void* param, TAOS_RES* pRes, int code) { + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + taosMsleep(500); return; } TEST(stmt2Case, async_order) { int CTB_NUMS = 3; int ROW_NUMS = 3; - int CYC_NUMS = 3; + int CYC_NUMS = 1; TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0); - TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb, NULL}; + TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb2, NULL}; char* sql = "insert into ? values(?,?)"; do_query(taos, "drop database if exists stmt2_testdb_15"); @@ -1687,15 +1683,14 @@ TEST(stmt2Case, async_order) { } // bind TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv}; - // code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL); - code = taos_stmt2_bind_param(stmt, &bindv, -1); + code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL); + // code = taos_stmt2_bind_param(stmt, &bindv, -1); checkError(stmt, code); // exec int affected = 0; code = taos_stmt2_exec(stmt, &affected); - total_affected += affected; checkError(stmt, code); for (int i = 0; i < CTB_NUMS; i++) { @@ -1709,7 +1704,7 @@ TEST(stmt2Case, async_order) { taosMemoryFree(b_len); taosMemoryFree(paramv); } - ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); + // ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS); // case 2 : bind_a->bind_a->bind_a->exec_a->... // case 3 : bind->exec_a->bind->exec_a->...