From e95647970e89473952970c1217dc00d7007f77bf Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 23 May 2022 22:35:45 +0800 Subject: [PATCH 1/6] scheduler async api --- include/libs/scheduler/scheduler.h | 3 ++ source/libs/scheduler/inc/schedulerInt.h | 20 ++++++---- source/libs/scheduler/src/schJob.c | 48 ++++++++++++++++++----- source/libs/scheduler/src/schRemote.c | 42 ++++++++++---------- source/libs/scheduler/src/scheduler.c | 50 ++++++------------------ 5 files changed, 88 insertions(+), 75 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index dcd058a293..62a97d1be0 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -64,6 +64,9 @@ typedef struct STaskInfo { SSubQueryMsg *msg; } STaskInfo; +typedef void (*schedulerCallback)(SQueryResult* pResult, void* param, int32_t code); + + int32_t schedulerInit(SSchedulerCfg *cfg); /** diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index ffac0f856d..d2ddd8b695 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -40,8 +40,8 @@ enum { }; typedef struct SSchTrans { - void *transInst; - void *transHandle; + void *pTrans; + void *pHandle; } SSchTrans; typedef struct SSchHbTrans { @@ -74,12 +74,17 @@ typedef struct SSchJobStat { } SSchJobStat; -typedef struct SSchedulerStat { +typedef struct SSchStat { SSchApiStat api; SSchRuntimeStat runtime; SSchJobStat job; -} SSchedulerStat; +} SSchStat; +typedef struct SSchResInfo { + SQueryResult queryRes; + schedulerCallback userFp; + void* userParam; +} SSchResInfo; typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId @@ -89,7 +94,7 @@ typedef struct SSchedulerMgmt { bool exit; int32_t jobRef; int32_t jobNum; - SSchedulerStat stat; + SSchStat stat; SHashObj *hbConnections; } SSchedulerMgmt; @@ -170,7 +175,7 @@ typedef struct SSchJob { SSchJobAttr attr; int32_t levelNum; int32_t taskNum; - void *transport; + void *pTrans; SArray *nodeList; // qnode/vnode list, SArray SArray *levels; // starting from 0. SArray SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler @@ -196,6 +201,7 @@ typedef struct SSchJob { void *queryRes; void *resData; //TODO free it or not int32_t resNumOfRows; + SSchResInfo userRes; const char *sql; SQueryProfileSummary summary; } SSchJob; @@ -292,7 +298,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle); -int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, +int32_t schExecStaticExplainJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, bool syncSchedule); int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, int64_t startTs, bool sync); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 14f4646397..a211d90a37 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -39,8 +39,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } -int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, - int64_t startTs, bool syncSchedule) { +int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pNodeList, const char *sql, + SSchResInfo *pRes, int64_t startTs, bool syncSchedule) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -51,8 +51,9 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.syncSchedule = syncSchedule; - pJob->transport = transport; + pJob->pTrans = pTrans; pJob->sql = sql; + pJob->userRes = pRes; if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); @@ -1228,8 +1229,8 @@ void schFreeJobImpl(void *job) { schCloseJobRef(); } -int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool sync) { +int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, int64_t startTs, bool sync) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { @@ -1238,7 +1239,7 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, sync)); + SCH_ERR_JRET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); SCH_ERR_JRET(schLaunchJob(pJob)); @@ -1261,8 +1262,36 @@ _return: SCH_RET(code); } -int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - bool syncSchedule) { +int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes, bool sync) { + int32_t code = 0; + + *pJob = 0; + + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { + SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, sync)); + } else { + SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, sync)); + } + +_return: + + if (*pJob) { + SSchJob *job = schAcquireJob(*pJob); + + pRes->code = atomic_load_32(&job->errCode); + pRes->numOfRows = job->resNumOfRows; + pRes->res = job->queryRes; + job->queryRes = NULL; + + schReleaseJob(*pJob); + } + + return code; +} + +int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, bool sync) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); int32_t code = 0; @@ -1277,7 +1306,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa pJob->attr.explainMode = pDag->explainInfo.mode; pJob->queryId = pDag->queryId; pJob->subPlans = pDag->pSubplans; - + pJob->userRes = pRes; + SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); int64_t refId = taosAddRef(schMgmt.jobRef, pJob); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 6d9f6b435f..247358af8f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -481,7 +481,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType, param->queryId = pJob->queryId; param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); - param->transport = pJob->transport; + param->transport = pJob->pTrans; msgSendInfo->param = param; msgSendInfo->fp = fp; @@ -556,7 +556,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->nodeEpId.nodeId = addr->nodeId; memcpy(¶m->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); - param->transport = pJob->transport; + param->transport = pJob->pTrans; *pParam = param; @@ -638,7 +638,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp)); param->nodeEpId = epId; - param->transport = pJob->transport; + param->transport = pJob->pTrans; pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; @@ -666,7 +666,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId * int32_t code = 0; SSchHbTrans hb = {0}; - hb.trans.transInst = pJob->transport; + hb.trans.pTrans = pJob->pTrans; SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx)); @@ -743,12 +743,12 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { __async_send_cb_fn_t fp = NULL; SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); - param->transport = trans.transInst; + param->transport = trans.pTrans; pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; - pMsgSendInfo->msgInfo.handle = trans.transHandle; + pMsgSendInfo->msgInfo.handle = trans.pHandle; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; @@ -756,13 +756,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { SEpSet epSet = {.inUse = 0, .numOfEps = 1}; memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); - qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, + qDebug("start to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d", trans.pTrans, trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port); - code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); + code = asyncSendMsgToServerExt(trans.pTrans, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); if (code) { - qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst, - trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); + qError("fail to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d, error:%x - %s", trans.pTrans, + trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); SCH_ERR_JRET(code); } @@ -812,8 +812,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { memcpy(&hb->trans, trans, sizeof(*trans)); SCH_UNLOCK(SCH_WRITE, &hb->lock); - qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId, - epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle); + qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId, + epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); return TSDB_CODE_SUCCESS; } @@ -833,8 +833,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { } SSchTrans trans = {0}; - trans.transInst = pParam->transport; - trans.transHandle = pMsg->handle; + trans.pTrans = pParam->transport; + trans.pHandle = pMsg->handle; SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); @@ -879,7 +879,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->queryId = pJob->queryId; param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); - param->transport = pJob->transport; + param->transport = pJob->pTrans; *pParam = param; @@ -1034,15 +1034,15 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; - pMsgSendInfo->msgInfo.handle = trans->transHandle; + pMsgSendInfo->msgInfo.handle = trans->pHandle; pMsgSendInfo->msgType = msgType; - qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType), + qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "pTrans:%p, pHandle:%p", TMSG_INFO(msgType), ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId, - trans->transInst, trans->transHandle); + trans->pTrans, trans->pHandle); int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); if (code) { SCH_ERR_JRET(code); } @@ -1208,12 +1208,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; + SSchTrans trans = {.transInst = pJob->pTrans, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); if (msgType == TDMT_VND_QUERY) { - SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle)); + SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index bd2c7e5b49..de802465c2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -67,50 +67,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, +int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes) { - if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; - - *pJob = 0; - - if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); - } else { - SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); - } - -_return: - - if (*pJob) { - SSchJob *job = schAcquireJob(*pJob); - - pRes->code = atomic_load_32(&job->errCode); - pRes->numOfRows = job->resNumOfRows; - pRes->res = job->queryRes; - job->queryRes = NULL; - - schReleaseJob(*pJob); - } - - return code; + SSchResInfo resInfo = {.queryRes = *pRes}; + SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, true)); } -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) { - if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); - } else { - SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false)); - } - - return TSDB_CODE_SUCCESS; +int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SQueryResult *pRes, schedulerCallback fp, void* param) { + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes || NULL == fp || NULL == param) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSchResInfo resInfo = {.queryRes = *pRes, .userFp = fp, .userParam = param}; + SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, false)); } int32_t schedulerFetchRows(int64_t job, void **pData) { From efa071c9f68eff3e1d068b1b1dbc271a9139a237 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Tue, 24 May 2022 17:49:22 +0800 Subject: [PATCH 2/6] scheduler async api --- include/libs/scheduler/scheduler.h | 10 +- source/client/src/clientImpl.c | 2 +- source/libs/scheduler/inc/schedulerInt.h | 31 ++- source/libs/scheduler/src/schJob.c | 233 ++++++++++++++++-- source/libs/scheduler/src/schRemote.c | 8 +- source/libs/scheduler/src/scheduler.c | 102 +++----- source/libs/scheduler/test/schedulerTests.cpp | 60 ++++- 7 files changed, 324 insertions(+), 122 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 62a97d1be0..a72489f338 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -54,8 +54,6 @@ typedef struct SQueryProfileSummary { typedef struct SQueryResult { int32_t code; uint64_t numOfRows; - int32_t msgSize; - char *msg; void *res; } SQueryResult; @@ -64,7 +62,8 @@ typedef struct STaskInfo { SSubQueryMsg *msg; } STaskInfo; -typedef void (*schedulerCallback)(SQueryResult* pResult, void* param, int32_t code); +typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code); +typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code); int32_t schedulerInit(SSchedulerCfg *cfg); @@ -83,7 +82,8 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in * @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pDag, const char* sql, int64_t *pJob); + int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, schedulerExecCallback fp, void* param); /** * Fetch query result from the remote query executor @@ -93,6 +93,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD */ int32_t schedulerFetchRows(int64_t job, void **data); +int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param); + int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b142885841..131a3d5770 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -292,7 +292,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; - SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; + SQueryResult res = {.code = 0, .numOfRows = 0}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d2ddd8b695..6599d00f58 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -39,6 +39,11 @@ enum { SCH_WRITE, }; +enum { + SCH_EXEC_CB = 1, + SCH_FETCH_CB, +}; + typedef struct SSchTrans { void *pTrans; void *pHandle; @@ -81,9 +86,11 @@ typedef struct SSchStat { } SSchStat; typedef struct SSchResInfo { - SQueryResult queryRes; - schedulerCallback userFp; - void* userParam; + SQueryResult* queryRes; + void** fetchRes; + schedulerExecCallback execFp; + schedulerFetchCallback fetchFp; + void* userParam; } SSchResInfo; typedef struct SSchedulerMgmt { @@ -113,7 +120,7 @@ typedef struct SSchTaskCallbackParam { typedef struct SSchHbCallbackParam { SSchCallbackParamHeader head; SQueryNodeEpId nodeEpId; - void *transport; + void *pTrans; } SSchHbCallbackParam; typedef struct SSchFlowControl { @@ -196,13 +203,13 @@ typedef struct SSchJob { int32_t remoteFetch; SSchTask *fetchTask; int32_t errCode; - SArray *errList; // SArray SRWLatch resLock; void *queryRes; void *resData; //TODO free it or not int32_t resNumOfRows; SSchResInfo userRes; const char *sql; + int32_t userCb; SQueryProfileSummary summary; } SSchJob; @@ -298,15 +305,21 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle); -int32_t schExecStaticExplainJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - bool syncSchedule); -int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool sync); +int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, bool sync); +int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, int64_t startTs, bool sync); int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus); int32_t schCancelJob(SSchJob *pJob); int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode); uint64_t schGenTaskId(void); void schCloseJobRef(void); +int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes); +int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes); +int32_t schFetchRows(SSchJob *pJob); +int32_t schAsyncFetchRows(SSchJob *pJob); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index a211d90a37..52722d819f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -53,7 +53,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN pJob->attr.syncSchedule = syncSchedule; pJob->pTrans = pTrans; pJob->sql = sql; - pJob->userRes = pRes; + pJob->userRes = *pRes; if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); @@ -459,6 +459,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); _return: + if (planToTask) { taosHashCleanup(planToTask); } @@ -728,6 +729,69 @@ _return: SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); } + +int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { + pRes->code = atomic_load_32(&pJob->errCode); + pRes->numOfRows = pJob->resNumOfRows; + pRes->res = pJob->queryRes; + pJob->queryRes = NULL; + + return TSDB_CODE_SUCCESS; +} + +int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) { + int32_t code = 0; + if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { + SCH_ERR_RET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); + } + + while (true) { + *pData = atomic_load_ptr(&pJob->resData); + if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { + continue; + } + + break; + } + + if (NULL == *pData) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); + if (rsp) { + rsp->completed = 1; + } + + *pData = rsp; + SCH_JOB_DLOG("empty res and set query complete, code:%x", code); + } + + SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); + + return TSDB_CODE_SUCCESS; +} + +int32_t schNotifyUserQueryRes(SSchJob* pJob) { + pJob->userRes.queryRes = taosMemoryCalloc(1, sizeof(*pJob->userRes.queryRes)); + if (pJob->userRes.queryRes) { + schSetJobQueryRes(pJob, pJob->userRes.queryRes); + } + + (*pJob->userRes.execFp)(pJob->userRes.queryRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); + + pJob->userRes.queryRes = NULL; + + return TSDB_CODE_SUCCESS; +} + +int32_t schNotifyUserFetchRes(SSchJob* pJob) { + void* pRes = NULL; + + SCH_ERR_RET(schSetJobFetchRes(pJob, &pRes)); + + (*pJob->userRes.fetchFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); + + return TSDB_CODE_SUCCESS; +} + int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing SCH_ERR_RET(schChkUpdateJobStatus(pJob, status)); @@ -742,6 +806,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); + if (!pJob->attr.syncSchedule) { + if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) { + schNotifyUserQueryRes(pJob); + } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) { + schNotifyUserFetchRes(pJob); + } + } + SCH_RET(code); } @@ -763,6 +835,10 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { if (pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); + } else if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) { + schNotifyUserQueryRes(pJob); + } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) { + schNotifyUserFetchRes(pJob); } if (atomic_load_8(&pJob->userFetch)) { @@ -1219,6 +1295,7 @@ void schFreeJobImpl(void *job) { tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes); } + taosMemoryFreeClear(pJob->userRes.queryRes); taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob); @@ -1239,57 +1316,66 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); - - SCH_ERR_JRET(schLaunchJob(pJob)); + SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); *job = pJob->refId; + SCH_ERR_JRET(schLaunchJob(pJob)); + if (sync) { SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); tsem_wait(&pJob->rspSem); + } else { + pJob->userCb = SCH_EXEC_CB; } SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - schReleaseJob(pJob->refId); - - return TSDB_CODE_SUCCESS; - _return: - schFreeJobImpl(pJob); + schReleaseJob(pJob->refId); + SCH_RET(code); } int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SSchResInfo *pRes, bool sync) { + int64_t startTs, SSchResInfo *pRes) { int32_t code = 0; *pJob = 0; if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, sync)); + SCH_ERR_JRET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, NULL, true)); } else { - SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, sync)); + SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, NULL, startTs, true)); } _return: if (*pJob) { SSchJob *job = schAcquireJob(*pJob); - - pRes->code = atomic_load_32(&job->errCode); - pRes->numOfRows = job->resNumOfRows; - pRes->res = job->queryRes; - job->queryRes = NULL; - + schSetJobQueryRes(job, pRes->queryRes); schReleaseJob(*pJob); } return code; } +int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes) { + int32_t code = 0; + + *pJob = 0; + + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { + SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, false)); + } else { + SCH_ERR_RET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, false)); + } + + return code; +} + int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, SSchResInfo *pRes, bool sync) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); @@ -1303,10 +1389,11 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa pJob->sql = sql; pJob->attr.queryJob = true; + pJob->attr.syncSchedule = sync; pJob->attr.explainMode = pDag->explainInfo.mode; pJob->queryId = pDag->queryId; pJob->subPlans = pDag->pSubplans; - pJob->userRes = pRes; + pJob->userRes = *pRes; SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); @@ -1318,7 +1405,7 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa if (NULL == schAcquireJob(refId)) { SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } pJob->refId = refId; @@ -1326,12 +1413,17 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + *job = pJob->refId; SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + if (!pJob->attr.syncSchedule) { + code = schNotifyUserQueryRes(pJob); + } + schReleaseJob(pJob->refId); - return TSDB_CODE_SUCCESS; + SCH_RET(code); _return: @@ -1339,4 +1431,103 @@ _return: SCH_RET(code); } +int32_t schFetchRows(SSchJob *pJob) { + int32_t code = 0; + + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (status == JOB_TASK_STATUS_DROPPING) { + SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (!SCH_JOB_NEED_FETCH(pJob)) { + SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } else if (status == JOB_TASK_STATUS_SUCCEED) { + SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); + goto _return; + } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { + SCH_ERR_JRET(schFetchFromRemote(pJob)); + tsem_wait(&pJob->rspSem); + + status = SCH_GET_JOB_STATUS(pJob); + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } + } + + SCH_ERR_JRET(schSetJobFetchRes(pJob, pJob->userRes.fetchRes)); + +_return: + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + + SCH_RET(code); +} + +int32_t schAsyncFetchRows(SSchJob *pJob) { + int32_t code = 0; + + int8_t status = SCH_GET_JOB_STATUS(pJob); + if (status == JOB_TASK_STATUS_DROPPING) { + SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (!SCH_JOB_NEED_FETCH(pJob)) { + SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { + SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); + } else if (status == JOB_TASK_STATUS_SUCCEED) { + SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); + goto _return; + } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) { + SCH_ERR_JRET(schNotifyUserFetchRes(pJob)); + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + } else { + pJob->userCb = SCH_FETCH_CB; + + SCH_ERR_JRET(schFetchFromRemote(pJob)); + } + + return TSDB_CODE_SUCCESS; + +_return: + + atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + + SCH_RET(code); +} + diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 247358af8f..33c04318cf 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -450,7 +450,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c if (head->isHbParam) { SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; - SSchTrans trans = {.transInst = hbParam->transport, .transHandle = NULL}; + SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans)); SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId)); @@ -556,7 +556,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->nodeEpId.nodeId = addr->nodeId; memcpy(¶m->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); - param->transport = pJob->pTrans; + param->pTrans = pJob->pTrans; *pParam = param; @@ -638,7 +638,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp)); param->nodeEpId = epId; - param->transport = pJob->pTrans; + param->pTrans = pJob->pTrans; pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; @@ -1208,7 +1208,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.transInst = pJob->pTrans, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; + SSchTrans trans = {.pTrans = pJob->pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index de802465c2..8d802980ea 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -73,18 +73,18 @@ int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int6 SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSchResInfo resInfo = {.queryRes = *pRes}; - SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, true)); + SSchResInfo resInfo = {.queryRes = pRes}; + SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo)); } int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SQueryResult *pRes, schedulerCallback fp, void* param) { - if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes || NULL == fp || NULL == param) { + int64_t startTs, schedulerExecCallback fp, void* param) { + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp || NULL == param) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSchResInfo resInfo = {.queryRes = *pRes, .userFp = fp, .userParam = param}; - SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, false)); + SSchResInfo resInfo = {.execFp = fp, .userParam = param}; + SCH_RET(schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo)); } int32_t schedulerFetchRows(int64_t job, void **pData) { @@ -99,76 +99,32 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - int8_t status = SCH_GET_JOB_STATUS(pJob); - if (status == JOB_TASK_STATUS_DROPPING) { - SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); - schReleaseJob(job); + pJob->attr.syncSchedule = true; + pJob->userRes.fetchRes = pData; + code = schFetchRows(pJob); + + schReleaseJob(job); + + SCH_RET(code); +} + +int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) { + if (NULL == fp || NULL == param) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + SSchJob *pJob = schAcquireJob(job); + if (NULL == pJob) { + qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (!SCH_JOB_NEED_FETCH(pJob)) { - SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - schReleaseJob(job); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { - SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); - schReleaseJob(job); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); - } else if (status == JOB_TASK_STATUS_SUCCEED) { - SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); - goto _return; - } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { - if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { - SCH_ERR_JRET(schFetchFromRemote(pJob)); - tsem_wait(&pJob->rspSem); - } - } else { - SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - status = SCH_GET_JOB_STATUS(pJob); - - if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); - SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); - } - - if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { - SCH_ERR_JRET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); - } - - while (true) { - *pData = atomic_load_ptr(&pJob->resData); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { - continue; - } - - break; - } - - if (NULL == *pData) { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); - if (rsp) { - rsp->completed = 1; - } - - *pData = rsp; - SCH_JOB_DLOG("empty res and set query complete, code:%x", code); - } - - SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code)); - -_return: - - atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + pJob->attr.syncSchedule = false; + pJob->userRes.fetchFp = fp; + pJob->userRes.userParam = param; + + code = schFetchRows(pJob); schReleaseJob(job); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index fc0e05aaf1..ec5d74372d 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -87,6 +87,11 @@ void schtInitLogFile() { } +void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) { + assert(TSDB_CODE_SUCCESS == code); + *(int32_t*)param = 1; +} + void schtBuildQueryDag(SQueryPlan *dag) { uint64_t qId = schtQueryId; @@ -485,6 +490,7 @@ void* schtRunJobThread(void *aa) { SHashObj *execTasks = NULL; SDataBuf dataBuf = {0}; uint32_t jobFinished = 0; + int32_t queryDone = 0; while (!schtTestStop) { schtBuildQueryDag(&dag); @@ -496,7 +502,8 @@ void* schtRunJobThread(void *aa) { qnodeAddr.port = 6031; taosArrayPush(qnodeList, &qnodeAddr); - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId); + queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &queryJobRefId, "select * from tb", 0, schtQueryCb, &queryDone); assert(code == 0); pJob = schAcquireJob(queryJobRefId); @@ -595,6 +602,14 @@ void* schtRunJobThread(void *aa) { pIter = taosHashIterate(execTasks, pIter); } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } + atomic_store_32(&schtStartFetch, 1); void *data = NULL; @@ -667,8 +682,9 @@ TEST(queryTest, normalCase) { schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); - - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + + int32_t queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone); ASSERT_EQ(code, 0); @@ -718,6 +734,14 @@ TEST(queryTest, normalCase) { pIter = taosHashIterate(pJob->execTasks, pIter); } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } + TdThreadAttr thattr; taosThreadAttrInit(&thattr); @@ -773,8 +797,9 @@ TEST(queryTest, readyFirstCase) { schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); - - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + + int32_t queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone); ASSERT_EQ(code, 0); @@ -824,6 +849,13 @@ TEST(queryTest, readyFirstCase) { pIter = taosHashIterate(pJob->execTasks, pIter); } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } TdThreadAttr thattr; @@ -885,16 +917,17 @@ TEST(queryTest, flowCtrlCase) { schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); - - code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + + int32_t queryDone = 0; + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone); ASSERT_EQ(code, 0); SSchJob *pJob = schAcquireJob(job); - bool queryDone = false; + bool qDone = false; - while (!queryDone) { + while (!qDone) { void *pIter = taosHashIterate(pJob->execTasks, NULL); if (NULL == pIter) { break; @@ -915,7 +948,7 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); } else { - queryDone = true; + qDone = true; break; } @@ -923,6 +956,13 @@ TEST(queryTest, flowCtrlCase) { } } + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } TdThreadAttr thattr; taosThreadAttrInit(&thattr); From 04238b8ff15464d625b7ac77cf74b68b4c85d393 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Tue, 24 May 2022 19:22:06 +0800 Subject: [PATCH 3/6] fix crash issue --- source/libs/scheduler/src/schJob.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 52722d819f..d64c944994 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -53,8 +53,10 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN pJob->attr.syncSchedule = syncSchedule; pJob->pTrans = pTrans; pJob->sql = sql; - pJob->userRes = *pRes; - + if (pRes) { + pJob->userRes = *pRes; + } + if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); } @@ -1393,7 +1395,9 @@ int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDa pJob->attr.explainMode = pDag->explainInfo.mode; pJob->queryId = pDag->queryId; pJob->subPlans = pDag->pSubplans; - pJob->userRes = *pRes; + if (pRes) { + pJob->userRes = *pRes; + } SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); From 54b7e1b29424dbac72d192e2a66bc192584d558f Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Tue, 24 May 2022 21:34:18 +0800 Subject: [PATCH 4/6] fix hb epset issue --- source/client/src/clientHb.c | 6 ++++-- source/dnode/mnode/impl/src/mndProfile.c | 2 +- source/libs/parser/src/parInsertData.c | 2 +- source/libs/scheduler/src/schRemote.c | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d01ec501ba..193d64bf6c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -140,8 +140,10 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid); if (NULL == pTscObj) { tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid); - } else { - updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); + } else { + if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); + } pTscObj->connId = pRsp->query->connId; if (pRsp->query->killRid) { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index b9ac82d890..c9c52af0fe 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -379,7 +379,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } rspBasic->connId = pConn->id; - rspBasic->totalDnodes = 1; // TODO + rspBasic->totalDnodes = mndGetDnodeSize(pMnode); rspBasic->onlineDnodes = 1; // TODO mndGetMnodeEpSet(pMnode, &rspBasic->epSet); mndReleaseConn(pMnode, pConn); diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index f82c792c96..6a42109d55 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -137,7 +137,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star } memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); - dataBuf->pTableMeta = tableMetaDup(pTableMeta); + dataBuf->pTableMeta = pTableMeta; SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo; SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 33c04318cf..8704f0e339 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -396,6 +396,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: + if (pJob) { schReleaseJob(pParam->refId); } From 52e05bbfe60c53a472dce83441fa06b580b1e6c4 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 25 May 2022 09:25:39 +0800 Subject: [PATCH 5/6] fix mem leak --- source/libs/catalog/src/catalog.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 4afebf9951..bbb8983713 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -540,12 +540,6 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == metaCache) { - qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); if (code) { if (HASH_NODE_EXIST(code)) { From 6c6237abe5bb6944a4c0c207c4730b27551edb3e Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 25 May 2022 13:27:36 +0800 Subject: [PATCH 6/6] fix mem double free issue --- source/libs/parser/src/parInsert.c | 7 ++----- source/libs/parser/src/parInsertData.c | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 11324e3f49..76dddcf192 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1073,7 +1073,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { int32_t tbNum = 0; char tbFName[TSDB_TABLE_FNAME_LEN]; bool autoCreateTbl = false; - STableMeta* pMeta = NULL; // for each table while (1) { @@ -1136,12 +1135,10 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq)); - pMeta = pCxt->pTableMeta; - pCxt->pTableMeta = NULL; if (TK_NK_LP == sToken.type) { // pSql -> field1_name, ...) - CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta))); + CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta))); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -1177,7 +1174,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); - (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, + (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 6a42109d55..f82c792c96 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -137,7 +137,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star } memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); - dataBuf->pTableMeta = pTableMeta; + dataBuf->pTableMeta = tableMetaDup(pTableMeta); SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo; SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);