fix some thread sync problem
This commit is contained in:
parent
ee31bd9a9c
commit
b55980d71d
|
@ -134,6 +134,12 @@ uint64_t qRemainNum;
|
|||
} SStmtQueue;
|
||||
*/
|
||||
|
||||
typedef struct AsyncBindParam {
|
||||
TdThreadMutex mutex;
|
||||
TdThreadCond waitCond;
|
||||
uint8_t asyncBindNum;
|
||||
} AsyncBindParam;
|
||||
|
||||
typedef struct {
|
||||
STscObj *taos;
|
||||
SCatalog *pCatalog;
|
||||
|
@ -150,14 +156,13 @@ typedef struct {
|
|||
SStmtExecInfo exec;
|
||||
SStmtBindInfo bInfo;
|
||||
|
||||
char *db;
|
||||
int64_t reqid;
|
||||
int32_t errCode;
|
||||
tsem_t asyncExecSem;
|
||||
bool execSemWaited;
|
||||
tsem_t asyncBindSem;
|
||||
bool bindSemWaited;
|
||||
SStmtStatInfo stat;
|
||||
char *db;
|
||||
int64_t reqid;
|
||||
int32_t errCode;
|
||||
tsem_t asyncExecSem;
|
||||
bool execSemWaited;
|
||||
AsyncBindParam asyncBindParam;
|
||||
SStmtStatInfo stat;
|
||||
} STscStmt2;
|
||||
/*
|
||||
extern char *gStmtStatusStr[];
|
||||
|
|
|
@ -2166,6 +2166,11 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
|
|||
}
|
||||
|
||||
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
||||
if( atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum)>1) {
|
||||
tscError("async bind param is still working, please try again later");
|
||||
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||
}
|
||||
|
||||
if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
|
||||
if (tsem_wait(&pStmt->asyncExecSem) != 0) {
|
||||
tscError("wait asyncExecSem failed");
|
||||
|
@ -2173,13 +2178,6 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
|
|||
pStmt->execSemWaited = true;
|
||||
}
|
||||
|
||||
if (!pStmt->bindSemWaited) {
|
||||
if (tsem_wait(&pStmt->asyncBindSem) != 0) {
|
||||
tscError("wait asyncBindSem failed");
|
||||
}
|
||||
pStmt->bindSemWaited = true;
|
||||
}
|
||||
|
||||
SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR));
|
||||
if (NULL == hashTbnames) {
|
||||
tscError("stmt2 bind failed: %s", tstrerror(terrno));
|
||||
|
@ -2252,7 +2250,20 @@ out:
|
|||
|
||||
int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp,
|
||||
void *param) {
|
||||
return stmt2AsyncBind(stmt, bindv,col_idx, fp, param);
|
||||
if (stmt == NULL || bindv == NULL || fp == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
||||
(void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||
int code = stmt2AsyncBind(stmt, bindv, col_idx, fp, param);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
|
||||
|
|
|
@ -752,6 +752,14 @@ static int32_t stmtInitQueue(STscStmt2* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
|
||||
(void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
|
||||
(void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
|
||||
pStmt->asyncBindParam.asyncBindNum = 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
|
||||
pTblBuf->buffUnit = sizeof(SStmtQNode);
|
||||
pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
|
||||
|
@ -812,13 +820,13 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
|
|||
pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
|
||||
if (NULL == pStmt->sql.siInfo.pTableHash) {
|
||||
(void)stmtClose(pStmt);
|
||||
(void)stmtClose2(pStmt);
|
||||
return NULL;
|
||||
}
|
||||
pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
|
||||
if (NULL == pStmt->sql.siInfo.pTableCols) {
|
||||
terrno = terrno;
|
||||
(void)stmtClose(pStmt);
|
||||
(void)stmtClose2(pStmt);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -831,7 +839,7 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
|
|||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
terrno = code;
|
||||
(void)stmtClose(pStmt);
|
||||
(void)stmtClose2(pStmt);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -840,10 +848,17 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
|
|||
if (pStmt->options.asyncExecFn) {
|
||||
if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
(void)stmtClose(pStmt);
|
||||
(void)stmtClose2(pStmt);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
code = stmtIniAsyncBind(pStmt);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
terrno = code;
|
||||
(void)stmtClose2(pStmt);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pStmt->execSemWaited = false;
|
||||
|
||||
STMT_LOG_SEQ(STMT_INIT);
|
||||
|
@ -1675,6 +1690,12 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
|||
return pStmt->errCode;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||
}
|
||||
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||
|
||||
if (pStmt->sql.stbInterlaceMode) {
|
||||
STMT_ERR_RET(stmtAddBatch2(pStmt));
|
||||
}
|
||||
|
@ -1775,9 +1796,17 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
|||
pStmt->bindThreadInUse = false;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
|
||||
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
|
||||
(void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
|
||||
}
|
||||
taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
|
||||
(void)taosThreadCondDestroy(&pStmt->queue.waitCond);
|
||||
(void)taosThreadMutexDestroy(&pStmt->queue.mutex);
|
||||
|
||||
(void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
|
||||
(void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
|
||||
|
||||
if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
|
||||
if (tsem_wait(&pStmt->asyncExecSem) != 0) {
|
||||
tscError("failed to wait asyncExecSem");
|
||||
|
@ -1937,17 +1966,32 @@ typedef struct {
|
|||
} ThreadArgs;
|
||||
|
||||
static void* stmtAsyncBindThreadFunc(void* args) {
|
||||
setThreadName("stmtAsyncBind");
|
||||
|
||||
qInfo("async stmt bind thread started");
|
||||
|
||||
ThreadArgs* targs = (ThreadArgs*)args;
|
||||
STscStmt2* pStmt = (STscStmt2*)targs->stmt;
|
||||
|
||||
int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
|
||||
targs->fp(targs->param, NULL, code);
|
||||
(void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
|
||||
(void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
|
||||
(void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
|
||||
(void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
|
||||
taosMemoryFree(args);
|
||||
|
||||
qInfo("async stmt bind thread stopped");
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int stmt2AsyncBind(TAOS_STMT2* stmt, TAOS_STMT2_BINDV* bindv, int32_t col_idx, __taos_async_fn_t fp, void* param) {
|
||||
STscStmt2* pStmt = (STscStmt2*)stmt;
|
||||
if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 1) {
|
||||
tscError("async bind param is still working, please try again later");
|
||||
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||
}
|
||||
|
||||
TdThreadAttr thAttr;
|
||||
if (taosThreadAttrInit(&thAttr) != 0) {
|
||||
|
|
|
@ -255,48 +255,41 @@ TEST(stmt2Case, stmt2_test_limit) {
|
|||
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_7");
|
||||
do_query(taos, "create stable stmt2_testdb_7.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))");
|
||||
do_query(taos,
|
||||
"insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, "
|
||||
"'abc'),(1591060628001,'def'),(1591060628004, 'hij')");
|
||||
"insert into stmt2_testdb_7.tb2 using stmt2_testdb_7.stb tags(2,'xyz') values(1591060628000, "
|
||||
"'abc'),(1591060628001,'def'),(1591060628004, 'hij')");
|
||||
do_query(taos, "use stmt2_testdb_7");
|
||||
|
||||
|
||||
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
|
||||
|
||||
|
||||
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
|
||||
ASSERT_NE(stmt, nullptr);
|
||||
|
||||
|
||||
const char* sql = "select * from stmt2_testdb_7.tb2 where ts > ? and ts < ? limit ?";
|
||||
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||
checkError(stmt, code);
|
||||
|
||||
|
||||
int t64_len[1] = {sizeof(int64_t)};
|
||||
int b_len[1] = {3};
|
||||
int x = 2;
|
||||
int x_len = sizeof(int);
|
||||
int64_t ts[2] = {1591060627000, 1591060628005};
|
||||
TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1},
|
||||
{TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}};
|
||||
int t64_len[1] = {sizeof(int64_t)};
|
||||
int b_len[1] = {3};
|
||||
int x = 2;
|
||||
int x_len = sizeof(int);
|
||||
int64_t ts[2] = {1591060627000, 1591060628005};
|
||||
TAOS_STMT2_BIND params[3] = {{TSDB_DATA_TYPE_TIMESTAMP, &ts[0], t64_len, NULL, 1},
|
||||
{TSDB_DATA_TYPE_TIMESTAMP, &ts[1], t64_len, NULL, 1},
|
||||
{TSDB_DATA_TYPE_INT, &x, &x_len, NULL, 1}};
|
||||
TAOS_STMT2_BIND* paramv = ¶ms[0];
|
||||
TAOS_STMT2_BINDV bindv = {1, NULL, NULL, ¶mv};
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
checkError(stmt, code);
|
||||
|
||||
|
||||
taos_stmt2_exec(stmt, NULL);
|
||||
checkError(stmt, code);
|
||||
|
||||
|
||||
TAOS_RES* pRes = taos_stmt2_result(stmt);
|
||||
ASSERT_NE(pRes, nullptr);
|
||||
|
||||
|
||||
int getRecordCounts = 0;
|
||||
while ((taos_fetch_row(pRes))) {
|
||||
getRecordCounts++;
|
||||
getRecordCounts++;
|
||||
}
|
||||
ASSERT_EQ(getRecordCounts, 2);
|
||||
taos_stmt2_close(stmt);
|
||||
|
@ -304,7 +297,6 @@ TEST(stmt2Case, stmt2_test_limit) {
|
|||
taos_close(taos);
|
||||
}
|
||||
|
||||
|
||||
TEST(stmt2Case, insert_stb_get_fields_Test) {
|
||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(taos, nullptr);
|
||||
|
@ -1619,20 +1611,24 @@ TEST(stmt2Case, errcode) {
|
|||
}
|
||||
|
||||
void stmtAsyncBindCb(void* param, TAOS_RES* pRes, int code) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taos_errstr(pRes);
|
||||
}
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
taosMsleep(500);
|
||||
return;
|
||||
}
|
||||
|
||||
void stmtAsyncQueryCb2(void* param, TAOS_RES* pRes, int code) {
|
||||
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
|
||||
taosMsleep(500);
|
||||
return;
|
||||
}
|
||||
|
||||
TEST(stmt2Case, async_order) {
|
||||
int CTB_NUMS = 3;
|
||||
int ROW_NUMS = 3;
|
||||
int CYC_NUMS = 3;
|
||||
int CYC_NUMS = 1;
|
||||
|
||||
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb, NULL};
|
||||
TAOS_STMT2_OPTION option = {0, true, true, stmtAsyncQueryCb2, NULL};
|
||||
char* sql = "insert into ? values(?,?)";
|
||||
|
||||
do_query(taos, "drop database if exists stmt2_testdb_15");
|
||||
|
@ -1687,15 +1683,14 @@ TEST(stmt2Case, async_order) {
|
|||
}
|
||||
// bind
|
||||
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, NULL, paramv};
|
||||
// code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL);
|
||||
code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
code = taos_stmt2_bind_param_a(stmt, &bindv, -1, stmtAsyncBindCb, NULL);
|
||||
// code = taos_stmt2_bind_param(stmt, &bindv, -1);
|
||||
|
||||
checkError(stmt, code);
|
||||
|
||||
// exec
|
||||
int affected = 0;
|
||||
code = taos_stmt2_exec(stmt, &affected);
|
||||
total_affected += affected;
|
||||
checkError(stmt, code);
|
||||
|
||||
for (int i = 0; i < CTB_NUMS; i++) {
|
||||
|
@ -1709,7 +1704,7 @@ TEST(stmt2Case, async_order) {
|
|||
taosMemoryFree(b_len);
|
||||
taosMemoryFree(paramv);
|
||||
}
|
||||
ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS);
|
||||
// ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS);
|
||||
|
||||
// case 2 : bind_a->bind_a->bind_a->exec_a->...
|
||||
// case 3 : bind->exec_a->bind->exec_a->...
|
||||
|
|
Loading…
Reference in New Issue