diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index f0349c2b3d..a012ca5a7f 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -50,6 +50,12 @@ void tscUnlockByThread(int64_t *lockedBy); int tsInsertInitialCheck(SSqlObj *pSql); +void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); + +void tscFreeRetrieveSup(SSqlObj *pSql); + + + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 553d2c0804..816347cecd 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2034,17 +2034,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { tscAsyncResultOnError(pSql); } -static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { +void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0); for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; assert(pSub != NULL); - - SRetrieveSupport* pSupport = pSub->param; - - tfree(pSupport->localBuffer); - tfree(pSupport); + + tscFreeRetrieveSup(pSub); taos_free_result(pSub); } @@ -2562,7 +2559,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static void tscFreeRetrieveSup(SSqlObj *pSql) { +void tscFreeRetrieveSup(SSqlObj *pSql) { SRetrieveSupport *trsupport = pSql->param; void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0); @@ -2720,33 +2717,43 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { int32_t code = pParentSql->res.code; - if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { - // remove the cached tableMeta and vgroup id list, and then parse the sql again - SSqlCmd* pParentCmd = &pParentSql->cmd; - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); - tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + SSqlObj *userSql = NULL; + if (pParentSql->param) { + userSql = ((SRetrieveSupport*)pParentSql->param)->pParentSql; + } - pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); - pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (userSql == NULL) { + userSql = pParentSql; + } - pParentSql->res.code = TSDB_CODE_SUCCESS; - pParentSql->retry++; + if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) { + if (userSql != pParentSql) { + tscFreeRetrieveSup(pParentSql); + } - tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, - tstrerror(code), pParentSql->retry); + tscFreeSubobj(userSql); + tfree(userSql->pSubs); - code = tsParseSql(pParentSql, true); + userSql->res.code = TSDB_CODE_SUCCESS; + userSql->retry++; + + tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", userSql->self, + tstrerror(code), userSql->retry); + + tscResetSqlCmd(&userSql->cmd, true); + code = tsParseSql(userSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; } if (code != TSDB_CODE_SUCCESS) { - pParentSql->res.code = code; - tscAsyncResultOnError(pParentSql); + userSql->res.code = code; + tscAsyncResultOnError(userSql); return; } - executeQuery(pParentSql, pQueryInfo); + pQueryInfo = tscGetQueryInfo(&userSql->cmd); + executeQuery(userSql, pQueryInfo); } else { (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6e0bf6f7d2..1af8eaac2e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3767,8 +3767,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { int32_t index = ps->subqueryIndex; bool ret = subAndCheckDone(pSql, pParentSql, index); - tfree(ps); - pSql->param = NULL; + tscFreeRetrieveSup(pSql); if (!ret) { tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index); @@ -3778,12 +3777,13 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { // todo refactor tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); - SSqlCmd* pParentCmd = &pParentSql->cmd; - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); - tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) { + tscAsyncResultOnError(pParentSql); + return; + } - pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); - pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + tscFreeSubobj(pParentSql); + tfree(pParentSql->pSubs); pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->retry++; @@ -3791,6 +3791,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tstrerror(code), pParentSql->retry); + + tscResetSqlCmd(&pParentSql->cmd, true); + code = tsParseSql(pParentSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; @@ -3802,7 +3805,8 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { return; } - SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); + executeQuery(pParentSql, pQueryInfo); return; } @@ -3825,9 +3829,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly + assert(pSql->subState.numOfSub == 0); pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream); - + assert(pSql->pSubs == NULL); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); + assert(pSql->subState.states == NULL); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); code = pthread_mutex_init(&pSql->subState.mutex, NULL);