fix bug
This commit is contained in:
parent
cfaa9adc58
commit
f526932ad4
|
@ -273,7 +273,8 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
|
||||||
taosScheduleTask(tscQhandle, &schedMsg);
|
taosScheduleTask(tscQhandle, &schedMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscAsyncResultOnError(SSqlObj *pSql) {
|
static void tscAsyncResultCallback(SSchedMsg *pMsg) {
|
||||||
|
SSqlObj* pSql = pMsg->ahandle;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
|
tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
|
||||||
return;
|
return;
|
||||||
|
@ -291,6 +292,16 @@ void tscAsyncResultOnError(SSqlObj *pSql) {
|
||||||
(*pSql->fp)(pSql->param, pSql, pRes->code);
|
(*pSql->fp)(pSql->param, pSql, pRes->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tscAsyncResultOnError(SSqlObj* pSql) {
|
||||||
|
SSchedMsg schedMsg = {0};
|
||||||
|
schedMsg.fp = tscAsyncResultCallback;
|
||||||
|
schedMsg.ahandle = pSql;
|
||||||
|
schedMsg.thandle = (void *)1;
|
||||||
|
schedMsg.msg = 0;
|
||||||
|
taosScheduleTask(tscQhandle, &schedMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int tscSendMsgToServer(SSqlObj *pSql);
|
int tscSendMsgToServer(SSqlObj *pSql);
|
||||||
|
|
||||||
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
|
|
|
@ -450,6 +450,7 @@ int doProcessSql(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscAsyncResultOnError(pSql);
|
||||||
return pRes->code;
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,6 +459,7 @@ int doProcessSql(SSqlObj *pSql) {
|
||||||
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
|
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
|
tscAsyncResultOnError(pSql);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -686,6 +688,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
|
||||||
int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex);
|
int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex);
|
||||||
|
|
|
@ -2255,7 +2255,9 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
|
||||||
* current query failed, and the retry count is less than the available
|
* current query failed, and the retry count is less than the available
|
||||||
* count, retry query clear previous retrieved data, then launch a new sub query
|
* count, retry query clear previous retrieved data, then launch a new sub query
|
||||||
*/
|
*/
|
||||||
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code) {
|
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code, int32_t *sent) {
|
||||||
|
*sent = 0;
|
||||||
|
|
||||||
SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
|
SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
|
||||||
if (trsupport == NULL) {
|
if (trsupport == NULL) {
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
@ -2298,6 +2300,8 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
|
||||||
|
|
||||||
int32_t ret = tscProcessSql(pNew);
|
int32_t ret = tscProcessSql(pNew);
|
||||||
|
|
||||||
|
*sent = 1;
|
||||||
|
|
||||||
// if failed to process sql, let following code handle the pSql
|
// if failed to process sql, let following code handle the pSql
|
||||||
if (ret == TSDB_CODE_SUCCESS) {
|
if (ret == TSDB_CODE_SUCCESS) {
|
||||||
tscFreeRetrieveSup(pSql);
|
tscFreeRetrieveSup(pSql);
|
||||||
|
@ -2343,7 +2347,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
||||||
subqueryIndex, tstrerror(pParentSql->res.code));
|
subqueryIndex, tstrerror(pParentSql->res.code));
|
||||||
} else {
|
} else {
|
||||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
|
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
|
||||||
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
|
int32_t sent = 0;
|
||||||
|
|
||||||
|
tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
|
||||||
|
if (sent) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else { // reach the maximum retry count, abort
|
} else { // reach the maximum retry count, abort
|
||||||
|
@ -2497,7 +2504,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
||||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
||||||
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
|
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
|
||||||
|
|
||||||
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
|
int32_t sent = 0;
|
||||||
|
|
||||||
|
tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
|
||||||
|
if (sent) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -2619,7 +2629,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
||||||
|
|
||||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
||||||
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
|
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
|
||||||
if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
|
|
||||||
|
int32_t sent = 0;
|
||||||
|
|
||||||
|
tscReissueSubquery(trsupport, pSql, code, &sent);
|
||||||
|
if (sent) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue