commit
e9c4a8a8b5
|
@ -80,6 +80,8 @@ enum {
|
|||
DATA_FROM_DATA_FILE = 2,
|
||||
};
|
||||
|
||||
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags;
|
||||
uint8_t precision;
|
||||
|
@ -226,7 +228,7 @@ typedef struct STableDataBlocks {
|
|||
typedef struct SQueryInfo {
|
||||
int16_t command; // the command may be different for each subclause, so keep it seperately.
|
||||
uint32_t type; // query/insert type
|
||||
// TODO refactor
|
||||
|
||||
STimeWindow window; // query time window
|
||||
SInterval interval;
|
||||
|
||||
|
@ -440,19 +442,20 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql);
|
|||
* @param pObj
|
||||
*/
|
||||
void tscFreeSqlObj(SSqlObj *pSql);
|
||||
|
||||
void tscFreeSqlObjInCache(void *pSql);
|
||||
void tscFreeRegisteredSqlObj(void *pSql);
|
||||
|
||||
void tscCloseTscObj(STscObj *pObj);
|
||||
|
||||
// todo move to taos? or create a new file: taos_internal.h
|
||||
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||
void *param, void **taos);
|
||||
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
|
||||
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res);
|
||||
|
||||
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
|
||||
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
|
||||
|
||||
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
|
||||
|
||||
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
|
||||
void tscKillSTableQuery(SSqlObj *pSql);
|
||||
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
|
||||
bool tscIsUpdateQuery(SSqlObj* pSql);
|
||||
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
|
||||
|
@ -517,8 +520,6 @@ extern SRpcCorEpSet tscMgmtEpSet;
|
|||
|
||||
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
||||
|
||||
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
int32_t tscCompareTidTags(const void* p1, const void* p2);
|
||||
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
|||
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
|
||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
pSql->signature = pSql;
|
||||
|
|
|
@ -742,6 +742,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
|
|||
int32_t numOfVnodes) {
|
||||
destroyColumnModel(pFinalModel);
|
||||
tOrderDescDestroy(pDesc);
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
|
||||
}
|
||||
|
|
|
@ -151,10 +151,12 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) {
|
|||
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
if (pSql == NULL) return;
|
||||
|
||||
tscDebug("%p query is killed, queryId:%d", pSql, killId);
|
||||
taos_stop_query(pSql);
|
||||
if (pSql == NULL) {
|
||||
tscError("failed to kill query, id:%d, it may have completed/terminated", killId);
|
||||
} else {
|
||||
tscDebug("%p query is killed, queryId:%d", pSql, killId);
|
||||
taos_stop_query(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
void tscAddIntoStreamList(SSqlStream *pStream) {
|
||||
|
|
|
@ -467,44 +467,6 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
return doProcessSql(pSql);
|
||||
}
|
||||
|
||||
void tscKillSTableQuery(SSqlObj *pSql) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
// NOTE: pSub may have been released already here
|
||||
SSqlObj *pSub = pSql->pSubs[i];
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSqlObj* pSubObj = (SSqlObj*) (*p);
|
||||
assert(pSubObj->self == (SSqlObj**) p);
|
||||
|
||||
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
if (pSubObj->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSubObj->pRpcCtx);
|
||||
pSubObj->pRpcCtx = NULL;
|
||||
}
|
||||
|
||||
tscQueueAsyncRes(pSubObj); // async res? not other functions?
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
}
|
||||
|
||||
tscDebug("%p super table query cancelled", pSql);
|
||||
}
|
||||
|
||||
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
|
||||
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
|
||||
|
|
|
@ -308,7 +308,7 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
|||
tsem_post(&pSql->rspSem);
|
||||
}
|
||||
|
||||
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
||||
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES** res) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
|
@ -333,12 +333,20 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
|
|||
tsem_init(&pSql->rspSem, 0, 0);
|
||||
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
|
||||
|
||||
if (res != NULL) {
|
||||
*res = pSql;
|
||||
}
|
||||
|
||||
tsem_wait(&pSql->rspSem);
|
||||
return pSql;
|
||||
}
|
||||
|
||||
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
|
||||
return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr));
|
||||
return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr), NULL);
|
||||
}
|
||||
|
||||
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res) {
|
||||
return taos_query_c(taos, sqlstr, (uint32_t) strlen(sqlstr), res);
|
||||
}
|
||||
|
||||
int taos_result_precision(TAOS_RES *res) {
|
||||
|
@ -690,6 +698,45 @@ int* taos_fetch_lengths(TAOS_RES *res) {
|
|||
|
||||
char *taos_get_client_info() { return version; }
|
||||
|
||||
static void tscKillSTableQuery(SSqlObj *pSql) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// set the master sqlObj flag to cancel query
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
// NOTE: pSub may have been released already here
|
||||
SSqlObj *pSub = pSql->pSubs[i];
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSqlObj* pSubObj = (SSqlObj*) (*p);
|
||||
assert(pSubObj->self == (SSqlObj**) p);
|
||||
|
||||
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
if (pSubObj->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSubObj->pRpcCtx);
|
||||
pSubObj->pRpcCtx = NULL;
|
||||
}
|
||||
|
||||
tscQueueAsyncRes(pSubObj);
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
}
|
||||
|
||||
tscDebug("%p super table query cancelled", pSql);
|
||||
}
|
||||
|
||||
void taos_stop_query(TAOS_RES *res) {
|
||||
SSqlObj *pSql = (SSqlObj *)res;
|
||||
if (pSql == NULL || pSql->signature != pSql) {
|
||||
|
@ -699,23 +746,26 @@ void taos_stop_query(TAOS_RES *res) {
|
|||
tscDebug("%p start to cancel query", res);
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
// TODO there are multi-thread problem.
|
||||
// It may have been released by the other thread already.
|
||||
// The ref count may fix this problem.
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
// set the error code for master pSqlObj firstly
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||
assert(pSql->pRpcCtx == NULL);
|
||||
tscKillSTableQuery(pSql);
|
||||
} else {
|
||||
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||
/*
|
||||
* There is multi-thread problem here, since pSql->pRpcCtx may have been
|
||||
* reset and freed in the processMsgFromServer function, and causes the invalid
|
||||
* write problem for rpcCancelRequest.
|
||||
*/
|
||||
if (pSql->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSql->pRpcCtx);
|
||||
pSql->pRpcCtx = NULL;
|
||||
}
|
||||
|
||||
tscQueueAsyncRes(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1491,9 +1491,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
|
||||
tscDebug("%p start to free subquery obj", pSql);
|
||||
static void tscFreeRetrieveSup(SSqlObj *pSql) {
|
||||
SRetrieveSupport *trsupport = pSql->param;
|
||||
|
||||
void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);
|
||||
if (p == NULL) {
|
||||
tscDebug("%p retrieve supp already released", pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport);
|
||||
// int32_t index = trsupport->subqueryIndex;
|
||||
// SSqlObj *pParentSql = trsupport->pParentSql;
|
||||
|
||||
|
@ -1556,17 +1563,18 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
|
|||
}
|
||||
|
||||
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
|
||||
// it has been freed already
|
||||
if (pSql->param != trsupport || pSql->param == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SSqlObj *pParentSql = trsupport->pParentSql;
|
||||
int32_t subqueryIndex = trsupport->subqueryIndex;
|
||||
|
||||
assert(pSql != NULL);
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
int32_t remain = pState->numOfRemain;
|
||||
int32_t sub = pState->numOfSub;
|
||||
UNUSED(remain);
|
||||
UNUSED(sub);
|
||||
|
||||
assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0);
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
||||
|
||||
// retrieved in subquery failed. OR query cancelled in retrieve phase.
|
||||
if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1597,12 +1605,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
}
|
||||
}
|
||||
|
||||
remain = -1;
|
||||
if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) {
|
||||
int32_t remain = -1;
|
||||
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
|
||||
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
|
||||
pState->numOfSub - remain);
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1614,7 +1622,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
|
||||
pState->numOfSub);
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
|
||||
|
@ -1674,7 +1682,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
|
||||
pState->numOfSub - remain);
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1694,7 +1702,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
pParentSql->res.numOfRows = 0;
|
||||
pParentSql->res.row = 0;
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
// set the command flag must be after the semaphore been correctly set.
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
|
||||
|
@ -1706,14 +1714,21 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
}
|
||||
|
||||
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
|
||||
SSqlObj *pSql = (SSqlObj *)tres;
|
||||
assert(pSql != NULL);
|
||||
|
||||
// this query has been freed already
|
||||
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
|
||||
if (pSql->param == NULL || param == NULL) {
|
||||
tscDebug("%p already freed in dnodecallback", pSql);
|
||||
assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
return;
|
||||
}
|
||||
|
||||
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
|
||||
int32_t idx = trsupport->subqueryIndex;
|
||||
SSqlObj * pParentSql = trsupport->pParentSql;
|
||||
|
||||
assert(tres != NULL);
|
||||
SSqlObj *pSql = (SSqlObj *)tres;
|
||||
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
||||
|
||||
|
|
|
@ -141,7 +141,7 @@ void taos_init_imp(void) {
|
|||
int64_t refreshTime = 10; // 10 seconds by default
|
||||
if (tscMetaCache == NULL) {
|
||||
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
|
||||
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeSqlObjInCache, "sqlObj");
|
||||
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
|
||||
}
|
||||
|
||||
tscDebug("client is initialized successfully");
|
||||
|
|
|
@ -389,7 +389,7 @@ static void tscFreeSubobj(SSqlObj* pSql) {
|
|||
*
|
||||
* @param pSql
|
||||
*/
|
||||
void tscFreeSqlObjInCache(void *pSql) {
|
||||
void tscFreeRegisteredSqlObj(void *pSql) {
|
||||
assert(pSql != NULL);
|
||||
|
||||
SSqlObj** p = (SSqlObj**)pSql;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "taos.h"
|
||||
#include "taosdef.h"
|
||||
#include "stdbool.h"
|
||||
#include "tsclient.h"
|
||||
|
||||
#define MAX_USERNAME_SIZE 64
|
||||
#define MAX_DBNAME_SIZE 64
|
||||
|
|
|
@ -294,9 +294,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
|
|||
|
||||
st = taosGetTimestampUs();
|
||||
|
||||
TAOS_RES* pSql = taos_query(con, command);
|
||||
atomic_store_ptr(&result, pSql); // set the global TAOS_RES pointer
|
||||
|
||||
TAOS_RES* pSql = taos_query_h(con, command, &result);
|
||||
if (taos_errno(pSql)) {
|
||||
taos_error(pSql);
|
||||
return;
|
||||
|
|
Loading…
Reference in New Issue