Merge pull request #5161 from taosdata/hotfix/TD-2926
[TD-2926]client crashes when running crash_gen test
This commit is contained in:
commit
33db41f423
|
@ -273,7 +273,8 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
|
|||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
void tscAsyncResultOnError(SSqlObj *pSql) {
|
||||
static void tscAsyncResultCallback(SSchedMsg *pMsg) {
|
||||
SSqlObj* pSql = pMsg->ahandle;
|
||||
if (pSql == NULL || pSql->signature != pSql) {
|
||||
tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
|
||||
return;
|
||||
|
@ -291,6 +292,16 @@ void tscAsyncResultOnError(SSqlObj *pSql) {
|
|||
(*pSql->fp)(pSql->param, pSql, pRes->code);
|
||||
}
|
||||
|
||||
void tscAsyncResultOnError(SSqlObj* pSql) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = tscAsyncResultCallback;
|
||||
schedMsg.ahandle = pSql;
|
||||
schedMsg.thandle = (void *)1;
|
||||
schedMsg.msg = 0;
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
|
||||
int tscSendMsgToServer(SSqlObj *pSql);
|
||||
|
||||
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||
|
|
|
@ -95,11 +95,21 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
|
|||
|
||||
pthread_mutex_lock(&subState->mutex);
|
||||
|
||||
bool done = allSubqueryDone(pParentSql);
|
||||
|
||||
if (done) {
|
||||
tscDebug("%p subquery:%p,%d all subs already done", pParentSql, pSql, idx);
|
||||
|
||||
pthread_mutex_unlock(&subState->mutex);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
|
||||
|
||||
subState->states[idx] = 1;
|
||||
|
||||
bool done = allSubqueryDone(pParentSql);
|
||||
done = allSubqueryDone(pParentSql);
|
||||
|
||||
pthread_mutex_unlock(&subState->mutex);
|
||||
|
||||
|
@ -2245,7 +2255,9 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
|
|||
* current query failed, and the retry count is less than the available
|
||||
* count, retry query clear previous retrieved data, then launch a new sub query
|
||||
*/
|
||||
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code) {
|
||||
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code, int32_t *sent) {
|
||||
*sent = 0;
|
||||
|
||||
SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
|
||||
if (trsupport == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -2277,21 +2289,28 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
|
|||
SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
|
||||
if (pNew == NULL) {
|
||||
tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
|
||||
trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex);
|
||||
oriTrs->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex);
|
||||
|
||||
pParentSql->res.code = terrno;
|
||||
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
||||
oriTrs->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
|
||||
|
||||
tfree(trsupport);
|
||||
return pParentSql->res.code;
|
||||
}
|
||||
|
||||
int32_t ret = tscProcessSql(pNew);
|
||||
|
||||
*sent = 1;
|
||||
|
||||
// if failed to process sql, let following code handle the pSql
|
||||
if (ret == TSDB_CODE_SUCCESS) {
|
||||
tscFreeRetrieveSup(pSql);
|
||||
taos_free_result(pSql);
|
||||
return ret;
|
||||
} else {
|
||||
} else {
|
||||
pSql->pSubs[trsupport->subqueryIndex] = pSql;
|
||||
tscFreeRetrieveSup(pNew);
|
||||
taos_free_result(pNew);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -2328,7 +2347,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
subqueryIndex, tstrerror(pParentSql->res.code));
|
||||
} else {
|
||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
|
||||
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
|
||||
int32_t sent = 0;
|
||||
|
||||
tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
|
||||
if (sent) {
|
||||
return;
|
||||
}
|
||||
} else { // reach the maximum retry count, abort
|
||||
|
@ -2482,7 +2504,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
||||
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
|
||||
|
||||
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
|
||||
int32_t sent = 0;
|
||||
|
||||
tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
|
||||
if (sent) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
|
@ -2604,7 +2629,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
|||
|
||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
||||
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
|
||||
if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
|
||||
|
||||
int32_t sent = 0;
|
||||
|
||||
tscReissueSubquery(trsupport, pSql, code, &sent);
|
||||
if (sent) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue