From c572b9cad254265f4cf4905f809905323c41bd2c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 3 Mar 2022 15:07:33 +0800 Subject: [PATCH] feature/qnode --- include/libs/scheduler/scheduler.h | 10 +- source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 12 +- source/libs/scheduler/inc/schedulerInt.h | 15 +- source/libs/scheduler/src/scheduler.c | 283 ++++++++++-------- source/libs/scheduler/test/schedulerTests.cpp | 120 +++++--- 6 files changed, 244 insertions(+), 198 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index e856adaf31..b2080cb655 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -23,8 +23,6 @@ extern "C" { #include "catalog.h" #include "planner.h" -struct SSchJob; - typedef struct SSchedulerCfg { uint32_t maxJobNum; } SSchedulerCfg; @@ -72,7 +70,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes); +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. @@ -80,7 +78,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str * @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob); +int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, int64_t *pJob); /** * Fetch query result from the remote query executor @@ -88,7 +86,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa * @param data * @return */ -int32_t schedulerFetchRows(struct SSchJob *pJob, void **data); +int32_t schedulerFetchRows(int64_t job, void **data); /** @@ -102,7 +100,7 @@ int32_t schedulerFetchRows(struct SSchJob *pJob, void **data); * Free the query job * @param pJob */ -void schedulerFreeJob(void *pJob); +void schedulerFreeJob(int64_t job); void schedulerDestroy(void); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 79909a5696..c93ea1aabe 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -171,7 +171,7 @@ typedef struct SRequestSendRecvBody { void* fp; SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; - struct SSchJob* pQueryJob; // query job, created according to sql query DAG. + int64_t queryJob; // query job, created according to sql query DAG. struct SQueryDag* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3c6a49c3db..a0ba668f8a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -227,10 +227,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); + int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) { - if (pRequest->body.pQueryJob != NULL) { - schedulerFreeJob(pRequest->body.pQueryJob); + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); } pRequest->code = code; @@ -240,8 +240,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; - if (pRequest->body.pQueryJob != NULL) { - schedulerFreeJob(pRequest->body.pQueryJob); + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); } } @@ -494,7 +494,7 @@ void* doFetchRow(SRequestObj* pRequest) { } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void**)&pResInfo->pData); + int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; return NULL; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 42270cd645..50c274ad48 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -36,6 +36,11 @@ enum { SCH_WRITE, }; +typedef struct SSchTrans { + void *transInst; + void *transHandle; +} SSchTrans; + typedef struct SSchApiStat { } SSchApiStat; @@ -59,12 +64,13 @@ typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId uint64_t sId; // schedulerId SSchedulerCfg cfg; - SHashObj *jobs; // key: queryId, value: SQueryJob* + int32_t jobRef; SSchedulerStat stat; } SSchedulerMgmt; typedef struct SSchCallbackParam { uint64_t queryId; + int64_t refId; uint64_t taskId; } SSchCallbackParam; @@ -75,7 +81,8 @@ typedef struct SSchLevel { int32_t taskFailed; int32_t taskSucceed; int32_t taskNum; - SArray *subTasks; // Element is SQueryTask + int32_t taskLaunchIdx; // launch startup index + SArray *subTasks; // Element is SQueryTask } SSchLevel; typedef struct SSchTask { @@ -105,6 +112,7 @@ typedef struct SSchJobAttr { } SSchJobAttr; typedef struct SSchJob { + int64_t refId; uint64_t queryId; SSchJobAttr attr; int32_t levelNum; @@ -119,7 +127,6 @@ typedef struct SSchJob { SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* - int32_t ref; int8_t status; SQueryNodeAddr resNode; tsem_t rspSem; @@ -168,6 +175,8 @@ typedef struct SSchJob { static int32_t schLaunchTask(SSchJob *job, SSchTask *task); static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); +SSchJob *schAcquireJob(int64_t refId); +int32_t schReleaseJob(int64_t refId); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f14b95873a..f1ed0cef7d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -17,12 +17,17 @@ #include "tmsg.h" #include "query.h" #include "catalog.h" +#include "tref.h" -typedef struct SSchTrans { - void *transInst; - void *transHandle; -}SSchTrans; -static SSchedulerMgmt schMgmt = {0}; +SSchedulerMgmt schMgmt = {0}; + +FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { + return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); +} + +FORCE_INLINE int32_t schReleaseJob(int64_t refId) { + return taosReleaseRef(schMgmt.jobRef, refId); +} uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); @@ -886,7 +891,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } case TDMT_VND_DROP_TASK_RSP: { // SHOULD NEVER REACH HERE - SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref)); + SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); break; } @@ -908,28 +913,23 @@ _return: int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchJob *pJob = NULL; SSchTask *pTask = NULL; - SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); - if (NULL == job || NULL == (*job)) { - qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId); + SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, pParam->refId); + if (NULL == pJob) { + qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } - pJob = *job; - - atomic_add_fetch_32(&pJob->ref, 1); - int32_t s = taosHashGetSize(pJob->execTasks); if (s <= 0) { - qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId); + SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); + SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -942,7 +942,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in _return: if (pJob) { - atomic_sub_fetch_32(&pJob->ref, 1); + taosReleaseRef(schMgmt.jobRef, pParam->refId); } tfree(param); @@ -1003,28 +1003,29 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { } -int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { +int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; SSchTrans *trans = (SSchTrans *)transport; SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); + SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam)); if (NULL == param) { - qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam)); + SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } __async_send_cb_fn_t fp = NULL; SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); - param->queryId = qId; - param->taskId = tId; + param->queryId = pJob->queryId; + param->refId = pJob->refId; + param->taskId = pTask->taskId; pMsgSendInfo->param = param; @@ -1040,7 +1041,7 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t SCH_ERR_JRET(code); } - qDebug("QID:0x%"PRIx64 ",TID:0x%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType)); + SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType)); return TSDB_CODE_SUCCESS; _return: @@ -1160,7 +1161,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, atomic_store_32(&pTask->lastMsgType, msgType); SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle}; - SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); + SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize)); if (isCandidateAddr) { SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); @@ -1283,7 +1284,60 @@ void schDropJobAllTasks(SSchJob *pJob) { schDropTaskInHashList(pJob, pJob->failTasks); } -static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, const char* sql, bool syncSchedule) { + +int32_t schCancelJob(SSchJob *pJob) { + //TODO + + //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST + +} + +void schFreeJobImpl(void *job) { + if (NULL == job) { + return; + } + + SSchJob *pJob = job; + uint64_t queryId = pJob->queryId; + int64_t refId = pJob->refId; + + if (pJob->status == JOB_TASK_STATUS_EXECUTING) { + schCancelJob(pJob); + } + + schDropJobAllTasks(pJob); + + pJob->subPlans = NULL; // it is a reference to pDag->pSubplans + + int32_t numOfLevels = taosArrayGetSize(pJob->levels); + for(int32_t i = 0; i < numOfLevels; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); + schFreeTask(pTask); + } + + taosArrayDestroy(pLevel->subTasks); + } + + taosHashCleanup(pJob->execTasks); + taosHashCleanup(pJob->failTasks); + taosHashCleanup(pJob->succTasks); + + taosArrayDestroy(pJob->levels); + taosArrayDestroy(pJob->nodeList); + + tfree(pJob->res); + + tfree(pJob); + + qDebug("QID:0x%"PRIx64" job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); +} + + +static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, int64_t *job, const char* sql, bool syncSchedule) { qDebug("QID:0x%"PRIx64" job started", pDag->queryId); if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) { @@ -1327,21 +1381,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa tsem_init(&pJob->rspSem, 0, 0); - code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES); - if (0 != code) { - if (HASH_NODE_EXIST(code)) { - SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } else { - SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } + pJob->refId = taosAddRef(schMgmt.jobRef, pJob); + if (pJob->refId < 0) { + SCH_JOB_ELOG("taosHashPut job failed, error:%s", tstrerror(terrno)); + SCH_ERR_JRET(terrno); } + SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); + pJob->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(pJob)); - *(SSchJob **)job = pJob; + taosAcquireRef(schMgmt.jobRef, pJob->refId); + + *job = pJob->refId; if (syncSchedule) { SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob)); @@ -1349,25 +1402,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa } SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); + + taosReleaseRef(schMgmt.jobRef, pJob->refId); + return TSDB_CODE_SUCCESS; _return: - *(SSchJob **)job = NULL; - schedulerFreeJob(pJob); + schFreeJobImpl(pJob); SCH_RET(code); } -int32_t schCancelJob(SSchJob *pJob) { - //TODO - - //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST - -} - int32_t schedulerInit(SSchedulerCfg *cfg) { - if (schMgmt.jobs) { + if (schMgmt.jobRef) { qError("scheduler already initialized"); return TSDB_CODE_QRY_INVALID_INPUT; } @@ -1381,9 +1429,9 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } else { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } - - schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == schMgmt.jobs) { + + schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); + if (schMgmt.jobRef < 0) { qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1398,24 +1446,28 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes) { +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); - pRes->code = atomic_load_32(&(*pJob)->errCode); - pRes->numOfRows = (*pJob)->resNumOfRows; + + SSchJob *job = taosAcquireRef(schMgmt.jobRef, *pJob); + pRes->code = atomic_load_32(&job->errCode); + pRes->numOfRows = job->resNumOfRows; + taosReleaseRef(schMgmt.jobRef, *pJob); return TSDB_CODE_SUCCESS; } -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob) { +int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, int64_t *pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false)); + return TSDB_CODE_SUCCESS; } @@ -1541,28 +1593,35 @@ _return: } -int32_t schedulerFetchRows(SSchJob *pJob, void** pData) { - if (NULL == pJob || NULL == pData) { +int32_t schedulerFetchRows(int64_t job, void** pData) { + if (NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t code = 0; - atomic_add_fetch_32(&pJob->ref, 1); + SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job); + if (NULL == pJob) { + qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { SCH_JOB_ELOG("job is dropping, status:%d", status); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + taosReleaseRef(schMgmt.jobRef, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + taosReleaseRef(schMgmt.jobRef, job); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + taosReleaseRef(schMgmt.jobRef, job); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { @@ -1588,7 +1647,6 @@ int32_t schedulerFetchRows(SSchJob *pJob, void** pData) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } -_return: while (true) { *pData = atomic_load_ptr(&pJob->res); @@ -1609,96 +1667,47 @@ _return: SCH_JOB_DLOG("empty res and set query complete, code:%x", code); } - atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); - SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code)); - atomic_sub_fetch_32(&pJob->ref, 1); + +_return: + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + + taosReleaseRef(schMgmt.jobRef, job); SCH_RET(code); } -int32_t scheduleCancelJob(void *job) { - SSchJob *pJob = (SSchJob *)job; - - atomic_add_fetch_32(&pJob->ref, 1); +int32_t scheduleCancelJob(int64_t job) { + SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job); + if (NULL == pJob) { + qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } int32_t code = schCancelJob(pJob); - atomic_sub_fetch_32(&pJob->ref, 1); + taosReleaseRef(schMgmt.jobRef, job); SCH_RET(code); } -void schedulerFreeJob(void *job) { - if (NULL == job) { +void schedulerFreeJob(int64_t job) { + SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job); + if (NULL == pJob) { + qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); return; } - SSchJob *pJob = job; - uint64_t queryId = pJob->queryId; - bool setJobFree = false; - - if (SCH_GET_JOB_STATUS(pJob) > 0) { - if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { - SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob); - return; - } - - SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); - - while (true) { - int32_t ref = atomic_load_32(&pJob->ref); - if (0 == ref) { - break; - } else if (ref > 0) { - if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) { - schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED); - setJobFree = true; - } - - usleep(1); - } else { - SCH_JOB_ELOG("invalid job ref number, ref:%d", ref); - break; - } - } - - SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob)); - - if (pJob->status == JOB_TASK_STATUS_EXECUTING) { - schCancelJob(pJob); - } - - schDropJobAllTasks(pJob); + if (atomic_load_8(&pJob->userFetch) > 0) { + schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED); } - pJob->subPlans = NULL; // it is a reference to pDag->pSubplans - - int32_t numOfLevels = taosArrayGetSize(pJob->levels); - for(int32_t i = 0; i < numOfLevels; ++i) { - SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job); - int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); - for(int32_t j = 0; j < numOfTasks; ++j) { - SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); - schFreeTask(pTask); - } - - taosArrayDestroy(pLevel->subTasks); + if (taosRemoveRef(schMgmt.jobRef, job)) { + SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job); } - - taosHashCleanup(pJob->execTasks); - taosHashCleanup(pJob->failTasks); - taosHashCleanup(pJob->succTasks); - - taosArrayDestroy(pJob->levels); - taosArrayDestroy(pJob->nodeList); - - tfree(pJob->res); - - tfree(pJob); - - qDebug("QID:0x%"PRIx64" job freed", queryId); } void schedulerFreeTaskList(SArray *taskList) { @@ -1716,9 +1725,17 @@ void schedulerFreeTaskList(SArray *taskList) { } void schedulerDestroy(void) { - if (schMgmt.jobs) { - taosHashCleanup(schMgmt.jobs); //TODO - schMgmt.jobs = NULL; + if (schMgmt.jobRef) { + SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); + + while (pJob) { + taosRemoveRef(schMgmt.jobRef, pJob->refId); + + pJob = taosIterateRef(schMgmt.jobRef, pJob->refId); + } + + taosCloseRef(schMgmt.jobRef); + schMgmt.jobRef = 0; } } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 89d365a7e7..11ed3335e6 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -38,15 +38,15 @@ #include "schedulerInt.h" #include "stub.h" #include "addr_any.h" - +#include "tref.h" namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode); -struct SSchJob *pInsertJob = NULL; -struct SSchJob *pQueryJob = NULL; +int64_t insertJobRefId = 0; +int64_t queryJobRefId = 0; uint64_t schtMergeTemplateId = 0x4; uint64_t schtFetchTaskId = 0; @@ -65,6 +65,7 @@ void schtInitLogFile() { tsAsyncLog = 0; qDebugFlag = 159; + strcpy(tsLogDir, "/var/log/taos"); if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { printf("failed to open log file in directory:%s\n", tsLogDir); @@ -255,34 +256,40 @@ void schtSetAsyncSendMsgToServer() { void *schtSendRsp(void *param) { - SSchJob *job = NULL; + SSchJob *pJob = NULL; + int64_t job = 0; int32_t code = 0; while (true) { - job = *(SSchJob **)param; + job = *(int64_t *)param; if (job) { break; } usleep(1000); } + + pJob = schAcquireJob(job); - void *pIter = taosHashIterate(job->execTasks, NULL); + void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; SSubmitRsp rsp = {0}; rsp.affectedRows = 10; - schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0); + schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0); - pIter = taosHashIterate(job->execTasks, pIter); + pIter = taosHashIterate(pJob->execTasks, pIter); } + schReleaseJob(job); + return NULL; } void *schtCreateFetchRspThread(void *param) { - struct SSchJob* job = (struct SSchJob*)param; + int64_t job = *(int64_t *)param; + SSchJob* pJob = schAcquireJob(job); sleep(1); @@ -291,8 +298,10 @@ void *schtCreateFetchRspThread(void *param) { rsp->completed = 1; rsp->numOfRows = 10; - code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); - + code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); + + schReleaseJob(job); + assert(code == 0); } @@ -329,9 +338,9 @@ void *schtFetchRspThread(void *aa) { void schtFreeQueryJob(int32_t freeThread) { static uint32_t freeNum = 0; - SSchJob *job = atomic_load_ptr(&pQueryJob); + int64_t job = queryJobRefId; - if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) { + if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) { schedulerFreeJob(job); if (freeThread) { if (++freeNum % schtTestPrintNum == 0) { @@ -360,7 +369,7 @@ void* schtRunJobThread(void *aa) { schtSetExecNode(); schtSetAsyncSendMsgToServer(); - SSchJob *job = NULL; + SSchJob *pJob = NULL; SSchCallbackParam *param = NULL; SHashObj *execTasks = NULL; SDataBuf dataBuf = {0}; @@ -376,24 +385,29 @@ void* schtRunJobThread(void *aa) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId); assert(code == 0); + pJob = schAcquireJob(queryJobRefId); + if (NULL == pJob) { + taosArrayDestroy(qnodeList); + schtFreeQueryDag(&dag); + continue; + } + execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - void *pIter = taosHashIterate(job->execTasks, NULL); + void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; schtFetchTaskId = task->taskId - 1; taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task)); - pIter = taosHashIterate(job->execTasks, pIter); + pIter = taosHashIterate(pJob->execTasks, pIter); } param = (SSchCallbackParam *)calloc(1, sizeof(*param)); - param->queryId = schtQueryId; - - pQueryJob = job; - + param->refId = queryJobRefId; + param->queryId = pJob->queryId; pIter = taosHashIterate(execTasks, NULL); while (pIter) { @@ -412,8 +426,9 @@ void* schtRunJobThread(void *aa) { param = (SSchCallbackParam *)calloc(1, sizeof(*param)); - param->queryId = schtQueryId; - + param->refId = queryJobRefId; + param->queryId = pJob->queryId; + pIter = taosHashIterate(execTasks, NULL); while (pIter) { SSchTask *task = (SSchTask *)pIter; @@ -431,7 +446,8 @@ void* schtRunJobThread(void *aa) { param = (SSchCallbackParam *)calloc(1, sizeof(*param)); - param->queryId = schtQueryId; + param->refId = queryJobRefId; + param->queryId = pJob->queryId; pIter = taosHashIterate(execTasks, NULL); while (pIter) { @@ -450,7 +466,8 @@ void* schtRunJobThread(void *aa) { param = (SSchCallbackParam *)calloc(1, sizeof(*param)); - param->queryId = schtQueryId; + param->refId = queryJobRefId; + param->queryId = pJob->queryId; pIter = taosHashIterate(execTasks, NULL); while (pIter) { @@ -470,7 +487,7 @@ void* schtRunJobThread(void *aa) { atomic_store_32(&schtStartFetch, 1); void *data = NULL; - code = schedulerFetchRows(pQueryJob, &data); + code = schedulerFetchRows(queryJobRefId, &data); assert(code == 0 || code); if (0 == code) { @@ -480,12 +497,13 @@ void* schtRunJobThread(void *aa) { } data = NULL; - code = schedulerFetchRows(pQueryJob, &data); + code = schedulerFetchRows(queryJobRefId, &data); assert(code == 0 || code); schtFreeQueryJob(0); taosHashCleanup(execTasks); + taosArrayDestroy(qnodeList); schtFreeQueryDag(&dag); @@ -516,7 +534,7 @@ TEST(queryTest, normalCase) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SSchJob *pJob = NULL; + int64_t job = 0; SQueryDag dag = {0}; schtInitLogFile(); @@ -537,59 +555,61 @@ TEST(queryTest, normalCase) { schtSetExecNode(); schtSetAsyncSendMsgToServer(); - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &pJob); + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); ASSERT_EQ(code, 0); - SSchJob *job = (SSchJob *)pJob; - void *pIter = taosHashIterate(job->execTasks, NULL); + + SSchJob *pJob = schAcquireJob(job); + + void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(job->execTasks, pIter); + pIter = taosHashIterate(pJob->execTasks, pIter); } - pIter = taosHashIterate(job->execTasks, NULL); + pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); printf("code:%d", code); ASSERT_EQ(code, 0); - pIter = taosHashIterate(job->execTasks, pIter); + pIter = taosHashIterate(pJob->execTasks, pIter); } - pIter = taosHashIterate(job->execTasks, NULL); + pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(job->execTasks, pIter); + pIter = taosHashIterate(pJob->execTasks, pIter); } - pIter = taosHashIterate(job->execTasks, NULL); + pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(job->execTasks, pIter); + pIter = taosHashIterate(pJob->execTasks, pIter); } pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_t thread1; - pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job); + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); void *data = NULL; code = schedulerFetchRows(job, &data); @@ -603,9 +623,11 @@ TEST(queryTest, normalCase) { data = NULL; code = schedulerFetchRows(job, &data); ASSERT_EQ(code, 0); - ASSERT_TRUE(data); + ASSERT_TRUE(data == NULL); - schedulerFreeJob(pJob); + schReleaseJob(job); + + schedulerFreeJob(job); schtFreeQueryDag(&dag); @@ -644,14 +666,14 @@ TEST(insertTest, normalCase) { pthread_attr_init(&thattr); pthread_t thread1; - pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); + pthread_create(&(thread1), &thattr, schtSendRsp, &insertJobRefId); SQueryResult res = {0}; - code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, "insert into tb values(now,1)", &res); + code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20); - schedulerFreeJob(pInsertJob); + schedulerFreeJob(insertJobRefId); schedulerDestroy(); } @@ -684,4 +706,4 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -#pragma GCC diagnostic pop \ No newline at end of file +#pragma GCC diagnostic pop