[td-1637]
This commit is contained in:
parent
4c22b72d06
commit
d838bacdbe
|
@ -742,6 +742,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
|
|||
int32_t numOfVnodes) {
|
||||
destroyColumnModel(pFinalModel);
|
||||
tOrderDescDestroy(pDesc);
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
|
||||
}
|
||||
|
|
|
@ -475,6 +475,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
|
|||
return;
|
||||
}
|
||||
|
||||
// set the master sqlObj flag to cancel query
|
||||
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
|
||||
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||
|
@ -498,7 +499,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
|
|||
pSubObj->pRpcCtx = NULL;
|
||||
}
|
||||
|
||||
tscQueueAsyncRes(pSubObj); // async res? not other functions?
|
||||
// tscQueueAsyncRes(pSubObj); // async res? not other functions?
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -1491,9 +1491,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
|
||||
tscDebug("%p start to free subquery obj", pSql);
|
||||
static void tscFreeRetrieveSup(SSqlObj *pSql) {
|
||||
SRetrieveSupport *trsupport = pSql->param;
|
||||
|
||||
void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);
|
||||
if (p == NULL) {
|
||||
tscDebug("%p retrieve supp already released", pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport);
|
||||
// int32_t index = trsupport->subqueryIndex;
|
||||
// SSqlObj *pParentSql = trsupport->pParentSql;
|
||||
|
||||
|
@ -1560,13 +1567,9 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
int32_t subqueryIndex = trsupport->subqueryIndex;
|
||||
|
||||
assert(pSql != NULL);
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
int32_t remain = pState->numOfRemain;
|
||||
int32_t sub = pState->numOfSub;
|
||||
UNUSED(remain);
|
||||
UNUSED(sub);
|
||||
|
||||
assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0);
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
||||
|
||||
// retrieved in subquery failed. OR query cancelled in retrieve phase.
|
||||
if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1597,12 +1600,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
}
|
||||
}
|
||||
|
||||
remain = -1;
|
||||
if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) {
|
||||
int32_t remain = -1;
|
||||
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
|
||||
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
|
||||
pState->numOfSub - remain);
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1614,7 +1617,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
|
||||
pState->numOfSub);
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
|
||||
|
@ -1674,7 +1677,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
|
||||
pState->numOfSub - remain);
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1694,7 +1697,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|||
pParentSql->res.numOfRows = 0;
|
||||
pParentSql->res.row = 0;
|
||||
|
||||
tscFreeSubSqlObj(trsupport, pSql);
|
||||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
// set the command flag must be after the semaphore been correctly set.
|
||||
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
|
||||
|
@ -1711,8 +1714,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
int32_t idx = trsupport->subqueryIndex;
|
||||
SSqlObj * pParentSql = trsupport->pParentSql;
|
||||
|
||||
assert(tres != NULL);
|
||||
SSqlObj *pSql = (SSqlObj *)tres;
|
||||
assert(pSql != NULL && trsupport == pSql->param);
|
||||
|
||||
SSubqueryState* pState = &pParentSql->subState;
|
||||
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
||||
|
|
Loading…
Reference in New Issue