fix(query): add ref for job

This commit is contained in:
Haojun Liao 2022-06-03 23:13:21 +08:00
parent 96d6a088c7
commit 0c94c65acb
2 changed files with 13 additions and 1 deletions

View File

@ -1087,7 +1087,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
} }
if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) { if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) {
doSetOneRowPtr(pResultInfo); doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1; pResultInfo->current += 1;
} }

View File

@ -328,6 +328,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++));
} }
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
CTG_ERR_JRET(terrno);
}
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum);
return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(*job); taosMemoryFreeClear(*job);
CTG_RET(code); CTG_RET(code);