diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b6ab3702c9..8621f9d28b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -80,6 +80,8 @@ enum { DATA_FROM_DATA_FILE = 2, }; +typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); + typedef struct STableComInfo { uint8_t numOfTags; uint8_t precision; @@ -226,7 +228,7 @@ typedef struct STableDataBlocks { typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. uint32_t type; // query/insert type - // TODO refactor + STimeWindow window; // query time window SInterval interval; @@ -440,19 +442,20 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql); * @param pObj */ void tscFreeSqlObj(SSqlObj *pSql); - -void tscFreeSqlObjInCache(void *pSql); +void tscFreeRegisteredSqlObj(void *pSql); void tscCloseTscObj(STscObj *pObj); +// todo move to taos? or create a new file: taos_internal.h TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos); -void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; +TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res); -void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); +void waitForQueryRsp(void *param, TAOS_RES *tres, int code); + +void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen); void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); -void tscKillSTableQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); @@ -517,8 +520,6 @@ extern SRpcCorEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); -typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); - int32_t tscCompareTidTags(const void* p1, const void* p2); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index a76b77bb86..639de294e6 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd* pCmd = &pSql->cmd; pSql->signature = pSql; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 16f208da98..d2f74bdd59 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -742,6 +742,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe int32_t numOfVnodes) { destroyColumnModel(pFinalModel); tOrderDescDestroy(pDesc); + for (int32_t i = 0; i < numOfVnodes; ++i) { pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]); } diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index bcd52d3a42..bae0f91dcc 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -151,10 +151,12 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) { pthread_mutex_unlock(&pObj->mutex); - if (pSql == NULL) return; - - tscDebug("%p query is killed, queryId:%d", pSql, killId); - taos_stop_query(pSql); + if (pSql == NULL) { + tscError("failed to kill query, id:%d, it may have completed/terminated", killId); + } else { + tscDebug("%p query is killed, queryId:%d", pSql, killId); + taos_stop_query(pSql); + } } void tscAddIntoStreamList(SSqlStream *pStream) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5cddaa1c4d..494a8a9c30 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -467,44 +467,6 @@ int tscProcessSql(SSqlObj *pSql) { return doProcessSql(pSql); } -void tscKillSTableQuery(SSqlObj *pSql) { - SSqlCmd* pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - return; - } - - pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - - for (int i = 0; i < pSql->subState.numOfSub; ++i) { - // NOTE: pSub may have been released already here - SSqlObj *pSub = pSql->pSubs[i]; - if (pSub == NULL) { - continue; - } - - void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - continue; - } - - SSqlObj* pSubObj = (SSqlObj*) (*p); - assert(pSubObj->self == (SSqlObj**) p); - - pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSubObj->pRpcCtx != NULL) { - rpcCancelRequest(pSubObj->pRpcCtx); - pSubObj->pRpcCtx = NULL; - } - - tscQueueAsyncRes(pSubObj); // async res? not other functions? - taosCacheRelease(tscObjCache, (void**) &p, false); - } - - tscDebug("%p super table query cancelled", pSql); -} - int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 58588e43f7..be91255b9c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -308,7 +308,7 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { tsem_post(&pSql->rspSem); } -TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { +TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES** res) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; @@ -333,12 +333,20 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { tsem_init(&pSql->rspSem, 0, 0); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); + if (res != NULL) { + *res = pSql; + } + tsem_wait(&pSql->rspSem); return pSql; } TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { - return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr)); + return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr), NULL); +} + +TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res) { + return taos_query_c(taos, sqlstr, (uint32_t) strlen(sqlstr), res); } int taos_result_precision(TAOS_RES *res) { @@ -690,6 +698,45 @@ int* taos_fetch_lengths(TAOS_RES *res) { char *taos_get_client_info() { return version; } +static void tscKillSTableQuery(SSqlObj *pSql) { + SSqlCmd* pCmd = &pSql->cmd; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { + return; + } + + // set the master sqlObj flag to cancel query + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + + for (int i = 0; i < pSql->subState.numOfSub; ++i) { + // NOTE: pSub may have been released already here + SSqlObj *pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + continue; + } + + SSqlObj* pSubObj = (SSqlObj*) (*p); + assert(pSubObj->self == (SSqlObj**) p); + + pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + if (pSubObj->pRpcCtx != NULL) { + rpcCancelRequest(pSubObj->pRpcCtx); + pSubObj->pRpcCtx = NULL; + } + + tscQueueAsyncRes(pSubObj); + taosCacheRelease(tscObjCache, (void**) &p, false); + } + + tscDebug("%p super table query cancelled", pSql); +} + void taos_stop_query(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) { @@ -699,23 +746,26 @@ void taos_stop_query(TAOS_RES *res) { tscDebug("%p start to cancel query", res); SSqlCmd *pCmd = &pSql->cmd; - // TODO there are multi-thread problem. - // It may have been released by the other thread already. - // The ref count may fix this problem. - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - // set the error code for master pSqlObj firstly pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { assert(pSql->pRpcCtx == NULL); tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { + /* + * There is multi-thread problem here, since pSql->pRpcCtx may have been + * reset and freed in the processMsgFromServer function, and causes the invalid + * write problem for rpcCancelRequest. + */ if (pSql->pRpcCtx != NULL) { rpcCancelRequest(pSql->pRpcCtx); pSql->pRpcCtx = NULL; } + + tscQueueAsyncRes(pSql); } } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index abfe62c72c..5ccb2aee8c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1491,9 +1491,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { - tscDebug("%p start to free subquery obj", pSql); +static void tscFreeRetrieveSup(SSqlObj *pSql) { + SRetrieveSupport *trsupport = pSql->param; + void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0); + if (p == NULL) { + tscDebug("%p retrieve supp already released", pSql); + return; + } + + tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport); // int32_t index = trsupport->subqueryIndex; // SSqlObj *pParentSql = trsupport->pParentSql; @@ -1556,17 +1563,18 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in } void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { + // it has been freed already + if (pSql->param != trsupport || pSql->param == NULL) { + return; + } + SSqlObj *pParentSql = trsupport->pParentSql; int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); - SSubqueryState* pState = &pParentSql->subState; - int32_t remain = pState->numOfRemain; - int32_t sub = pState->numOfSub; - UNUSED(remain); - UNUSED(sub); - assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0); + SSubqueryState* pState = &pParentSql->subState; + assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1597,12 +1605,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - remain = -1; - if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { + int32_t remain = -1; + if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub - remain); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); return; } @@ -1614,7 +1622,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, pState->numOfSub); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); @@ -1674,7 +1682,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub - remain); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); return; } @@ -1694,7 +1702,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); // set the command flag must be after the semaphore been correctly set. pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; @@ -1706,14 +1714,21 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p } static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { + SSqlObj *pSql = (SSqlObj *)tres; + assert(pSql != NULL); + + // this query has been freed already SRetrieveSupport *trsupport = (SRetrieveSupport *)param; + if (pSql->param == NULL || param == NULL) { + tscDebug("%p already freed in dnodecallback", pSql); + assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED); + return; + } + tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; int32_t idx = trsupport->subqueryIndex; SSqlObj * pParentSql = trsupport->pParentSql; - assert(tres != NULL); - SSqlObj *pSql = (SSqlObj *)tres; - SSubqueryState* pState = &pParentSql->subState; assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 85c8a57058..4c5dbb079f 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -141,7 +141,7 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); - tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeSqlObjInCache, "sqlObj"); + tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); } tscDebug("client is initialized successfully"); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 33c51c5571..0235f037bd 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -389,7 +389,7 @@ static void tscFreeSubobj(SSqlObj* pSql) { * * @param pSql */ -void tscFreeSqlObjInCache(void *pSql) { +void tscFreeRegisteredSqlObj(void *pSql) { assert(pSql != NULL); SSqlObj** p = (SSqlObj**)pSql; diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index dd62df170a..765181dbba 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -20,6 +20,7 @@ #include "taos.h" #include "taosdef.h" #include "stdbool.h" +#include "tsclient.h" #define MAX_USERNAME_SIZE 64 #define MAX_DBNAME_SIZE 64 diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index d5e826fbaa..24388bf50c 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -294,9 +294,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { st = taosGetTimestampUs(); - TAOS_RES* pSql = taos_query(con, command); - atomic_store_ptr(&result, pSql); // set the global TAOS_RES pointer - + TAOS_RES* pSql = taos_query_h(con, command, &result); if (taos_errno(pSql)) { taos_error(pSql); return;