[td-1637]
This commit is contained in:
parent
d838bacdbe
commit
b6786bc661
|
@ -80,6 +80,8 @@ enum {
|
|||
DATA_FROM_DATA_FILE = 2,
|
||||
};
|
||||
|
||||
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags;
|
||||
uint8_t precision;
|
||||
|
@ -226,7 +228,7 @@ typedef struct STableDataBlocks {
|
|||
typedef struct SQueryInfo {
|
||||
int16_t command; // the command may be different for each subclause, so keep it seperately.
|
||||
uint32_t type; // query/insert type
|
||||
// TODO refactor
|
||||
|
||||
STimeWindow window; // query time window
|
||||
SInterval interval;
|
||||
|
||||
|
@ -440,19 +442,20 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql);
|
|||
* @param pObj
|
||||
*/
|
||||
void tscFreeSqlObj(SSqlObj *pSql);
|
||||
|
||||
void tscFreeSqlObjInCache(void *pSql);
|
||||
void tscFreeRegisteredSqlObj(void *pSql);
|
||||
|
||||
void tscCloseTscObj(STscObj *pObj);
|
||||
|
||||
// todo move to taos? or create a new file: taos_internal.h
|
||||
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||
void *param, void **taos);
|
||||
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
|
||||
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res);
|
||||
|
||||
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
|
||||
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
|
||||
|
||||
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
|
||||
|
||||
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
|
||||
void tscKillSTableQuery(SSqlObj *pSql);
|
||||
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
||||
bool tscIsUpdateQuery(SSqlObj* pSql);
|
||||
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
|
||||
|
@ -517,8 +520,6 @@ extern SRpcCorEpSet tscMgmtEpSet;
|
|||
|
||||
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
||||
|
||||
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
int32_t tscCompareTidTags(const void* p1, const void* p2);
|
||||
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
|||
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
|
||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
pSql->signature = pSql;
|
||||
|
|
|
@ -151,10 +151,12 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) {
|
|||
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
if (pSql == NULL) return;
|
||||
|
||||
tscDebug("%p query is killed, queryId:%d", pSql, killId);
|
||||
taos_stop_query(pSql);
|
||||
if (pSql == NULL) {
|
||||
tscError("failed to kill query, id:%d, it may have completed/terminated", killId);
|
||||
} else {
|
||||
tscDebug("%p query is killed, queryId:%d", pSql, killId);
|
||||
taos_stop_query(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
void tscAddIntoStreamList(SSqlStream *pStream) {
|
||||
|
|
|
@ -467,45 +467,6 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
return doProcessSql(pSql);
|
||||
}
|
||||
|
||||
void tscKillSTableQuery(SSqlObj *pSql) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// set the master sqlObj flag to cancel query
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
// NOTE: pSub may have been released already here
|
||||
SSqlObj *pSub = pSql->pSubs[i];
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSqlObj* pSubObj = (SSqlObj*) (*p);
|
||||
assert(pSubObj->self == (SSqlObj**) p);
|
||||
|
||||
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
if (pSubObj->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSubObj->pRpcCtx);
|
||||
pSubObj->pRpcCtx = NULL;
|
||||
}
|
||||
|
||||
// tscQueueAsyncRes(pSubObj); // async res? not other functions?
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
}
|
||||
|
||||
tscDebug("%p super table query cancelled", pSql);
|
||||
}
|
||||
|
||||
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
|
||||
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
|
||||
|
|
|
@ -307,7 +307,7 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
|||
tsem_post(&pSql->rspSem);
|
||||
}
|
||||
|
||||
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
||||
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES** res) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
|
@ -332,12 +332,20 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
|||
tsem_init(&pSql->rspSem, 0, 0);
|
||||
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
|
||||
|
||||
if (res != NULL) {
|
||||
*res = pSql;
|
||||
}
|
||||
|
||||
tsem_wait(&pSql->rspSem);
|
||||
return pSql;
|
||||
}
|
||||
|
||||
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
|
||||
return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr));
|
||||
return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr), NULL);
|
||||
}
|
||||
|
||||
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res) {
|
||||
return taos_query_c(taos, sqlstr, (uint32_t) strlen(sqlstr), res);
|
||||
}
|
||||
|
||||
int taos_result_precision(TAOS_RES *res) {
|
||||
|
@ -689,6 +697,45 @@ int* taos_fetch_lengths(TAOS_RES *res) {
|
|||
|
||||
char *taos_get_client_info() { return version; }
|
||||
|
||||
static void tscKillSTableQuery(SSqlObj *pSql) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// set the master sqlObj flag to cancel query
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
// NOTE: pSub may have been released already here
|
||||
SSqlObj *pSub = pSql->pSubs[i];
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSqlObj* pSubObj = (SSqlObj*) (*p);
|
||||
assert(pSubObj->self == (SSqlObj**) p);
|
||||
|
||||
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
if (pSubObj->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSubObj->pRpcCtx);
|
||||
pSubObj->pRpcCtx = NULL;
|
||||
}
|
||||
|
||||
tscQueueAsyncRes(pSubObj);
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
}
|
||||
|
||||
tscDebug("%p super table query cancelled", pSql);
|
||||
}
|
||||
|
||||
void taos_stop_query(TAOS_RES *res) {
|
||||
SSqlObj *pSql = (SSqlObj *)res;
|
||||
if (pSql == NULL || pSql->signature != pSql) {
|
||||
|
@ -698,19 +745,20 @@ void taos_stop_query(TAOS_RES *res) {
|
|||
tscDebug("%p start to cancel query", res);
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
// TODO there are multi-thread problem.
|
||||
// It may have been released by the other thread already.
|
||||
// The ref count may fix this problem.
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
// set the error code for master pSqlObj firstly
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
assert(pSql->pRpcCtx == NULL);
|
||||
tscKillSTableQuery(pSql);
|
||||
} else {
|
||||
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||
/*
|
||||
* There is multi-thread problem here, since pSql->pRpcCtx may have been
|
||||
* reset and freed in the processMsgFromServer function, and causes the invalid
|
||||
* write problem for rpcCancelRequest.
|
||||
*/
|
||||
if (pSql->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSql->pRpcCtx);
|
||||
pSql->pRpcCtx = NULL;
|
||||
|
|
|
@ -1563,6 +1563,11 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
|
|||
}
|
||||
|
||||
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
|
||||
// it has been freed already
|
||||
if (pSql->param != trsupport || pSql->param == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SSqlObj *pParentSql = trsupport->pParentSql;
|
||||
int32_t subqueryIndex = trsupport->subqueryIndex;
|
||||
|
||||
|
@ -1709,14 +1714,21 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
}
|
||||
|
||||
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
|
||||
SSqlObj *pSql = (SSqlObj *)tres;
|
||||
assert(pSql != NULL);
|
||||
|
||||
// this query has been freed already
|
||||
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
|
||||
if (pSql->param == NULL || param == NULL) {
|
||||
tscDebug("%p already freed in dnodecallback", pSql);
|
||||
assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
return;
|
||||
}
|
||||
|
||||
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
|
||||
int32_t idx = trsupport->subqueryIndex;
|
||||
SSqlObj * pParentSql = trsupport->pParentSql;
|
||||
|
||||
SSqlObj *pSql = (SSqlObj *)tres;
|
||||
assert(pSql != NULL && trsupport == pSql->param);
|
||||
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "taos.h"
|
||||
#include "taosdef.h"
|
||||
#include "stdbool.h"
|
||||
#include "tsclient.h"
|
||||
|
||||
#define MAX_USERNAME_SIZE 64
|
||||
#define MAX_DBNAME_SIZE 64
|
||||
|
|
|
@ -294,9 +294,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
|
|||
|
||||
st = taosGetTimestampUs();
|
||||
|
||||
TAOS_RES* pSql = taos_query(con, command);
|
||||
atomic_store_ptr(&result, pSql); // set the global TAOS_RES pointer
|
||||
|
||||
TAOS_RES* pSql = taos_query_h(con, command, &result);
|
||||
if (taos_errno(pSql)) {
|
||||
taos_error(pSql);
|
||||
return;
|
||||
|
|
Loading…
Reference in New Issue