fix: schedulerFreeJob reset jobId only on the last reference

This commit is contained in:
Shungang Li 2024-09-13 18:59:54 +08:00
parent 8af5e89d4e
commit 4ef4dddc7b
6 changed files with 29 additions and 12 deletions

View File

@ -46,6 +46,7 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid);
// release ref, rid is the reference ID returned by taosAddRef // release ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int32_t taosReleaseRef(int32_t rsetId, int64_t rid); int32_t taosReleaseRef(int32_t rsetId, int64_t rid);
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased);
// return the first reference if rid is 0, otherwise return the next after current reference. // return the first reference if rid is 0, otherwise return the next after current reference.
// if return value is NULL, it means list is over(if terrno is set, it means error happens) // if return value is NULL, it means list is over(if terrno is set, it means error happens)

View File

@ -1163,10 +1163,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
} }
} }
if (TSDB_CODE_SUCCESS == code) {
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
} }
}
taosMemoryFree(pResult); taosMemoryFree(pResult);
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code), tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),

View File

@ -570,6 +570,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob); int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
int32_t schReleaseJob(int64_t refId); int32_t schReleaseJob(int64_t refId);
int32_t schReleaseJobEx(int64_t refId, int32_t* released);
void schFreeFlowCtrl(SSchJob *pJob); void schFreeFlowCtrl(SSchJob *pJob);
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);

View File

@ -41,6 +41,15 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
return taosReleaseRef(schMgmt.jobRef, refId); return taosReleaseRef(schMgmt.jobRef, refId);
} }
FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t* released) {
if (0 == refId) {
return TSDB_CODE_SUCCESS;
}
qDebug("sch release ex jobId:0x%" PRIx64, refId);
return taosReleaseRefEx(schMgmt.jobRef, refId, released);
}
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) { int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) {
*ppRes = NULL; *ppRes = NULL;
if (NULL == pEpSet) { if (NULL == pEpSet) {

View File

@ -179,8 +179,11 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode)); SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode));
(void)schHandleJobDrop(pJob, errCode); // ignore any error (void)schHandleJobDrop(pJob, errCode); // ignore any error
(void)schReleaseJob(*jobId); // ignore error int32_t released = false;
(void)schReleaseJobEx(*jobId, &released); // ignore error
if (released) {
*jobId = 0; *jobId = 0;
}
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {

View File

@ -55,7 +55,7 @@ static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy);
static void taosIncRsetCount(SRefSet *pSet); static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet);
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased);
int32_t taosOpenRef(int32_t max, RefFp fp) { int32_t taosOpenRef(int32_t max, RefFp fp) {
SRefNode **nodeList; SRefNode **nodeList;
@ -181,7 +181,7 @@ int64_t taosAddRef(int32_t rsetId, void *p) {
return rid; return rid;
} }
int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1); } int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1, NULL); }
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int32_t rsetId, int64_t rid) { void *taosAcquireRef(int32_t rsetId, int64_t rid) {
@ -245,7 +245,8 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid) {
return p; return p;
} }
int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0); } int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0, NULL); }
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased) { return taosDecRefCount(rsetId, rid, 0, isReleased); }
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosIterateRef(int32_t rsetId, int64_t rid) { void *taosIterateRef(int32_t rsetId, int64_t rid) {
@ -372,7 +373,7 @@ int32_t taosListRef() {
return num; return num;
} }
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) { static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased) {
int32_t hash; int32_t hash;
SRefSet *pSet; SRefSet *pSet;
SRefNode *pNode; SRefNode *pNode;
@ -440,6 +441,10 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
taosDecRsetCount(pSet); taosDecRsetCount(pSet);
} }
if (isReleased) {
*isReleased = released;
}
return code; return code;
} }