From 876942b4f2a44de024a0afb5585467d3edb39f3d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 12 Jan 2022 09:22:48 +0800 Subject: [PATCH] feature/qnode --- source/libs/scheduler/inc/schedulerInt.h | 28 ++++++++-- source/libs/scheduler/src/scheduler.c | 55 ++++++++++--------- source/libs/scheduler/test/schedulerTests.cpp | 21 +++++++ 3 files changed, 75 insertions(+), 29 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index c83eba4232..661beee5d5 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -36,11 +36,31 @@ enum { SCH_WRITE, }; +typedef struct SSchApiStat { + +} SSchApiStat; + +typedef struct SSchRuntimeStat { + +} SSchRuntimeStat; + +typedef struct SSchJobStat { + +} SSchJobStat; + +typedef struct SSchedulerStat { + SSchApiStat api; + SSchRuntimeStat runtime; + SSchJobStat job; +} SSchedulerStat; + + typedef struct SSchedulerMgmt { - uint64_t taskId; // sequential taksId - uint64_t sId; // schedulerId - SSchedulerCfg cfg; - SHashObj *jobs; // key: queryId, value: SQueryJob* + uint64_t taskId; // sequential taksId + uint64_t sId; // schedulerId + SSchedulerCfg cfg; + SHashObj *jobs; // key: queryId, value: SQueryJob* + SSchedulerStat stat; } SSchedulerMgmt; typedef struct SSchCallbackParam { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ed12408844..aeec1ff5a0 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1462,35 +1462,38 @@ void scheduleFreeJob(void *job) { } SSchJob *pJob = job; + uint64_t queryId = pJob->queryId; - if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { - SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob); - return; - } - - schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); - - SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); - - while (true) { - int32_t ref = atomic_load_32(&pJob->ref); - if (0 == ref) { - break; - } else if (ref > 0) { - usleep(1); - } else { - assert(0); + if (SCH_GET_JOB_STATUS(pJob) > 0) { + if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { + SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob); + return; } + + schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); + + SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); + + while (true) { + int32_t ref = atomic_load_32(&pJob->ref); + if (0 == ref) { + break; + } else if (ref > 0) { + usleep(1); + } else { + assert(0); + } + } + + SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob)); + + if (pJob->status == JOB_TASK_STATUS_EXECUTING) { + schCancelJob(pJob); + } + + schDropJobAllTasks(pJob); } - SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob)); - - if (pJob->status == JOB_TASK_STATUS_EXECUTING) { - schCancelJob(pJob); - } - - schDropJobAllTasks(pJob); - pJob->subPlans = NULL; // it is a reference to pDag->pSubplans int32_t numOfLevels = taosArrayGetSize(pJob->levels); @@ -1515,6 +1518,8 @@ void scheduleFreeJob(void *job) { tfree(pJob->res); tfree(pJob); + + qDebug("QID:%"PRIx64" job freed", queryId); } void schedulerDestroy(void) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index b438521234..114b3c02b5 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -79,6 +79,7 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan->level = 1; scanPlan->pParents = taosArrayInit(1, POINTER_BYTES); scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + scanPlan->msgType = TDMT_VND_QUERY; mergePlan->id.queryId = qId; mergePlan->id.templateId = 0x4444444444; @@ -89,6 +90,7 @@ void schtBuildQueryDag(SQueryDag *dag) { mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); mergePlan->pParents = NULL; mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + mergePlan->msgType = TDMT_VND_QUERY; SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); @@ -163,6 +165,11 @@ void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { } +void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { + +} + + void schtSetPlanToString() { static Stub stub; @@ -190,6 +197,20 @@ void schtSetExecNode() { } } +void schtSetRpcSendRequest() { + static Stub stub; + stub.set(rpcSendRequest, schtRpcSendRequest); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRequest$", result); + for (const auto& f : result) { + stub.set(f.second, schtRpcSendRequest); + } + } +} + + void *schtSendRsp(void *param) { SSchJob *job = NULL; int32_t code = 0;