diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f256feb251..3c6a49c3db 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -226,23 +226,27 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; - if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { - SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); - if (code != TSDB_CODE_SUCCESS) { - // handle error and retry - } else { - if (pRequest->body.pQueryJob != NULL) { - schedulerFreeJob(pRequest->body.pQueryJob); - } + SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; + int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); + if (code != TSDB_CODE_SUCCESS) { + if (pRequest->body.pQueryJob != NULL) { + schedulerFreeJob(pRequest->body.pQueryJob); } - pRequest->body.resInfo.numOfRows = res.numOfRows; - pRequest->code = res.code; + pRequest->code = code; return pRequest->code; } - return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob); + if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { + pRequest->body.resInfo.numOfRows = res.numOfRows; + + if (pRequest->body.pQueryJob != NULL) { + schedulerFreeJob(pRequest->body.pQueryJob); + } + } + + pRequest->code = res.code; + return pRequest->code; } TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 45e125608a..f14b95873a 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -578,7 +578,7 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod atomic_store_32(&pJob->errCode, errCode); } - if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { + if (atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); } @@ -638,7 +638,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED)); - if ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule) { + if (pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); }