[td-225] add check after killing query.
This commit is contained in:
parent
24aa1a357d
commit
562e62e933
|
@ -270,6 +270,9 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo
|
||||||
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
||||||
int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
|
int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
|
||||||
|
|
||||||
|
bool tscSetSqlOwner(SSqlObj* pSql);
|
||||||
|
void tscClearSqlOwner(SSqlObj* pSql);
|
||||||
|
|
||||||
void* malloc_throw(size_t size);
|
void* malloc_throw(size_t size);
|
||||||
void* calloc_throw(size_t nmemb, size_t size);
|
void* calloc_throw(size_t nmemb, size_t size);
|
||||||
char* strdup_throw(const char* str);
|
char* strdup_throw(const char* str);
|
||||||
|
|
|
@ -302,6 +302,7 @@ typedef struct STscObj {
|
||||||
|
|
||||||
typedef struct SSqlObj {
|
typedef struct SSqlObj {
|
||||||
void *signature;
|
void *signature;
|
||||||
|
pthread_t owner; // owner of sql object, by which it is executed
|
||||||
STscObj *pTscObj;
|
STscObj *pTscObj;
|
||||||
void *pRpcCtx;
|
void *pRpcCtx;
|
||||||
void (*fp)();
|
void (*fp)();
|
||||||
|
|
|
@ -233,35 +233,6 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
||||||
sem_post(&pSql->rspSem);
|
sem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
|
|
||||||
STscObj *pObj = (STscObj *)taos;
|
|
||||||
if (pObj == NULL || pObj->signature != pObj) {
|
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t sqlLen = strlen(sqlstr);
|
|
||||||
if (sqlLen > tsMaxSQLStringLen) {
|
|
||||||
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
|
|
||||||
terrno = TSDB_CODE_TSC_INVALID_SQL;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosNotePrintTsc(sqlstr);
|
|
||||||
|
|
||||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
|
||||||
if (pSql == NULL) {
|
|
||||||
tscError("failed to malloc sqlObj");
|
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
|
|
||||||
|
|
||||||
// wait for the callback function to post the semaphore
|
|
||||||
tsem_wait(&pSql->rspSem);
|
|
||||||
return pSql;
|
|
||||||
}
|
|
||||||
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
if (pObj == NULL || pObj->signature != pObj) {
|
if (pObj == NULL || pObj->signature != pObj) {
|
||||||
|
@ -274,7 +245,9 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
||||||
terrno = TSDB_CODE_TSC_INVALID_SQL;
|
terrno = TSDB_CODE_TSC_INVALID_SQL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosNotePrintTsc(sqlstr);
|
||||||
|
|
||||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||||
if (pSql == NULL) {
|
if (pSql == NULL) {
|
||||||
tscError("failed to malloc sqlObj");
|
tscError("failed to malloc sqlObj");
|
||||||
|
@ -287,6 +260,11 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
||||||
tsem_wait(&pSql->rspSem);
|
tsem_wait(&pSql->rspSem);
|
||||||
return pSql;
|
return pSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
|
||||||
|
return taos_query_c(taos, sqlstr, strlen(sqlstr));
|
||||||
|
}
|
||||||
|
|
||||||
int taos_result_precision(TAOS_RES *res) {
|
int taos_result_precision(TAOS_RES *res) {
|
||||||
SSqlObj *pSql = (SSqlObj *)res;
|
SSqlObj *pSql = (SSqlObj *)res;
|
||||||
if (pSql == NULL || pSql->signature != pSql) return 0;
|
if (pSql == NULL || pSql->signature != pSql) return 0;
|
||||||
|
@ -422,7 +400,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
pCmd->command == TSDB_SQL_INSERT) {
|
pCmd->command == TSDB_SQL_INSERT) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the sql object owner
|
||||||
|
tscSetSqlOwner(pSql);
|
||||||
|
|
||||||
// current data set are exhausted, fetch more data from node
|
// current data set are exhausted, fetch more data from node
|
||||||
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
|
||||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||||
|
@ -441,7 +422,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
sem_wait(&pSql->rspSem);
|
sem_wait(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
return doSetResultRowData(pSql, true);
|
void* data = doSetResultRowData(pSql, true);
|
||||||
|
|
||||||
|
tscClearSqlOwner(pSql);
|
||||||
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||||
|
@ -509,7 +493,7 @@ int taos_select_db(TAOS *taos, const char *db) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// send free message to vnode to free qhandle and corresponding resources in vnode
|
// send free message to vnode to free qhandle and corresponding resources in vnode
|
||||||
static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
|
static bool tscKillQueryInVnode(SSqlObj* pSql) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
SSqlRes* pRes = &pSql->res;
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
|
@ -557,16 +541,14 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
||||||
if (!tscFreeQhandleInVnode(pSql)) {
|
if (!tscKillQueryInVnode(pSql)) {
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
tscDebug("%p sqlObj is freed by app", pSql);
|
tscDebug("%p sqlObj is freed by app", pSql);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo should not be used in async query
|
|
||||||
int taos_errno(TAOS_RES *tres) {
|
int taos_errno(TAOS_RES *tres) {
|
||||||
SSqlObj *pSql = (SSqlObj *) tres;
|
SSqlObj *pSql = (SSqlObj *) tres;
|
||||||
|
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2223,3 +2223,21 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool tscSetSqlOwner(SSqlObj* pSql) {
|
||||||
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
|
// set the sql object owner
|
||||||
|
uint64_t threadId = taosGetPthreadId();
|
||||||
|
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
|
||||||
|
pRes->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tscClearSqlOwner(SSqlObj* pSql) {
|
||||||
|
assert(pSql->owner != 0);
|
||||||
|
atomic_store_64(&pSql->owner, 0);
|
||||||
|
}
|
Loading…
Reference in New Issue