From 08541c0f22e1d1c48564986f413792eab5094fc9 Mon Sep 17 00:00:00 2001
From: dapan1121
Date: Mon, 7 Mar 2022 19:37:17 +0800
Subject: [PATCH] feature/scheduler

Add the TDMT_MND_QNODE_LIST message (SQnodeListReq / SQnodeListRsp) together
with its mnode handler and the catalog / querymsg plumbing that fetches the
qnode epset list, embed a qworker handle in SQnode, and start a qworker
heartbeat timer that collects per-scheduler task status.

---
Review notes (this section is ignored by `git am`):

* The line structure was rebuilt from a whitespace-collapsed copy of the
  patch. Indentation, runs of spaces inside lines and the position of blank
  context lines were inferred from the hunk counts; if a hunk does not match
  the tree, apply with `git apply --ignore-whitespace`.
* The `index` lines are kept from the original mail, so the post-image blob
  ids no longer match for the files edited below.
* Changes made to the added lines during review:
  - tmsg.c: tDeserializeSQnodeListRsp rejects a negative element count and
    appends each decoded SEpSet with taosArrayPush instead of writing through
    TARRAY_GET_ELEM into an array whose size was never updated.
  - mndQnode.c: mndProcessQnodeListReq used `->` on struct values, restarted
    sdbFetch from NULL on every iteration, left the cursor open on the early
    break and allocated the reply with malloc instead of rpcMallocCont.
  - qnode.c: qndClose passes the handle with the same (void **) cast that
    qndOpen uses.
  - catalog.c: one ctgError had two %s for one argument, one reported
    rpcRsp.code instead of code, and a size result was printed with %d.
  - querymsg.c: queryProcessQnodeListRsp never set code to 0, so every
    successful decode was freed and reported as a failure; it also stored
    through `output` after finding it NULL.
  - qworker.c: qwProcessHbTimer is void but returned values; qWorkerInit
    freed mgmt and then jumped to the cleanup label that dereferences it
    (the stale `tfree(mgmt);` context line is now removed).
* Not addressed here: mndProcessQnodeListReq is registered in mndInitQnode
  before its static definition; confirm a forward declaration exists.

 include/common/tmsg.h                  |  17 ++++
 include/common/tmsgdef.h               |   1 +
 include/libs/catalog/catalog.h         |   2 +-
 source/common/src/tmsg.c               |  66 ++++++++++++
 source/dnode/mnode/impl/src/mndQnode.c |  66 ++++++++++++
 source/dnode/qnode/inc/qndInt.h        |   4 +-
 source/dnode/qnode/src/qnode.c         |  65 +++++++++++-
 source/libs/catalog/src/catalog.c      |  40 ++++++++
 source/libs/qcom/src/querymsg.c        |  50 ++++++++++
 source/libs/qworker/inc/qworkerInt.h   |   4 +
 source/libs/qworker/src/qworker.c      | 134 +++++++++++++++++++++++--
 source/libs/qworker/src/qworkerMsg.c   |   2 +-
 12 files changed, 436 insertions(+), 15 deletions(-)

diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index c3940f640e..87da671fc8 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -592,6 +592,23 @@ int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
 int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
 void tFreeSUsedbRsp(SUseDbRsp* pRsp);
 
+typedef struct {
+  int32_t rowNum;
+} SQnodeListReq;
+
+int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
+int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
+
+typedef struct {
+  SArray *epSetList; // SArray of SEpSet
+} SQnodeListRsp;
+
+int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
+int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
+void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
+
+
+
 typedef struct {
   SArray* pArray; // Array of SUseDbRsp
 } SUseDbBatchRsp;
diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h
index cae186ba16..08b5b36e3d 100644
--- a/include/common/tmsgdef.h
+++ b/include/common/tmsgdef.h
@@ -129,6 +129,7 @@ enum {
   TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h
index 8db7d34d3e..f217277b80 100644
--- a/include/libs/catalog/catalog.h
+++ b/include/libs/catalog/catalog.h
@@ -51,7 +51,7 @@ typedef struct SMetaData {
   SArray *pTableMeta; // STableMeta array
   SArray *pVgroupInfo; // SVgroupInfo list
   SArray *pUdfList; // udf info list
-  SEpSet *pEpSet; // qnode epset list
+  SArray *pEpSetList; // qnode epset list, SArray of SEpSet
 } SMetaData;
 
 typedef struct SCatalogCfg {
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 8b4f572fee..c9c4b06be3 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -1446,6 +1446,72 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
   return 0;
 }
 
+
+int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq) {
+  SCoder encoder = {0};
+  tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
+
+  if (tStartEncode(&encoder) < 0) return -1;
+  if (tEncodeI32(&encoder, pReq->rowNum) < 0) return -1;
+  tEndEncode(&encoder);
+
+  int32_t tlen = encoder.pos;
+  tCoderClear(&encoder);
+  return tlen;
+}
+
+int32_t tDeserializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq) {
+  SCoder decoder = {0};
+  tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
+
+  if (tStartDecode(&decoder) < 0) return -1;
+  if (tDecodeI32(&decoder, &pReq->rowNum) < 0) return -1;
+  tEndDecode(&decoder);
+
+  tCoderClear(&decoder);
+  return 0;
+}
+
+int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
+  SCoder encoder = {0};
+  tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
+
+  if (tStartEncode(&encoder) < 0) return -1;
+  int32_t num = taosArrayGetSize(pRsp->epSetList);
+  if (tEncodeI32(&encoder, num) < 0) return -1;
+  for (int32_t i = 0; i < num; ++i) {
+    SEpSet *epSet = taosArrayGet(pRsp->epSetList, i);
+    if (tEncodeSEpSet(&encoder, epSet) < 0) return -1;
+  }
+  tEndEncode(&encoder);
+
+  int32_t tlen = encoder.pos;
+  tCoderClear(&encoder);
+  return tlen;
+}
+
+int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
+  SCoder decoder = {0};
+  tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
+
+  if (tStartDecode(&decoder) < 0) return -1;
+  int32_t num = 0;
+  if (tDecodeI32(&decoder, &num) < 0 || num < 0) return -1;  // a negative count is a malformed message
+  pRsp->epSetList = taosArrayInit(num, sizeof(SEpSet));
+  if (NULL == pRsp->epSetList) return -1;
+  for (int32_t i = 0; i < num; ++i) {
+    SEpSet epSet = {0};
+    if (tDecodeSEpSet(&decoder, &epSet) < 0) return -1;
+    if (NULL == taosArrayPush(pRsp->epSetList, &epSet)) return -1;  // push so the array size follows num
+  }
+  tEndDecode(&decoder);
+
+  tCoderClear(&decoder);
+  return 0;
+}
+
+void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->epSetList); }
+
 int32_t tSerializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
   SCoder encoder = {0};
   tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c
index 0c227b0db9..0d8dda8034 100644
--- a/source/dnode/mnode/impl/src/mndQnode.c
+++ b/source/dnode/mnode/impl/src/mndQnode.c
@@ -48,6 +48,7 @@ int32_t mndInitQnode(SMnode *pMnode) {
 
   mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq);
   mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq);
+  mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq);
   mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp);
   mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);
 
@@ -430,6 +431,71 @@ DROP_QNODE_OVER:
   return code;
 }
 
+
+static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq) {
+  int32_t code = -1;
+  SQnodeListReq qlistReq = {0};
+  int32_t numOfRows = 0;
+  SMnode *pMnode = pReq->pMnode;
+  SSdb *pSdb = pMnode->pSdb;
+  SQnodeObj *pObj = NULL;
+  SQnodeListRsp qlistRsp = {0};
+
+  if (tDeserializeSQnodeListReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &qlistReq) != 0) {
+    mError("invalid qnode list msg");
+    terrno = TSDB_CODE_INVALID_MSG;
+    goto QNODE_LIST_OVER;
+  }
+
+  qlistRsp.epSetList = taosArrayInit(5, sizeof(SEpSet));
+  if (NULL == qlistRsp.epSetList) {
+    mError("taosArrayInit epSet failed");
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    goto QNODE_LIST_OVER;
+  }
+
+  void *pIter = NULL;
+  while (true) {
+    pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);  // continue from the previous cursor
+    if (pIter == NULL) break;
+
+    SEpSet epSet = {0};
+    strcpy(epSet.eps[0].fqdn, pObj->pDnode->fqdn);
+    epSet.eps[0].port = pObj->pDnode->port;
+    epSet.numOfEps = 1;
+
+    taosArrayPush(qlistRsp.epSetList, &epSet);
+
+    numOfRows++;
+    sdbRelease(pSdb, pObj);
+
+    if (qlistReq.rowNum > 0 && numOfRows >= qlistReq.rowNum) {
+      sdbCancelFetch(pSdb, pIter);  // stopping early: close the open cursor
+      break;
+    }
+  }
+
+  int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp);
+  void *pRsp = rpcMallocCont(rspLen);  // reply buffers are handed to rpc, so use its allocator
+  if (pRsp == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    goto QNODE_LIST_OVER;
+  }
+
+  tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp);
+
+  pReq->contLen = rspLen;
+  pReq->pCont = pRsp;
+  code = 0;
+
+QNODE_LIST_OVER:
+
+  tFreeSQnodeListRsp(&qlistRsp);
+
+  return code;
+}
+
+
 static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp) {
   mndTransProcessRsp(pRsp);
   return 0;
diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h
index 0f8e65de7e..8d986434d2 100644
--- a/source/dnode/qnode/inc/qndInt.h
+++ b/source/dnode/qnode/inc/qndInt.h
@@ -29,11 +29,13 @@ extern "C" {
 #endif
 
 typedef struct SQnode {
+  int32_t qndId;
   SQnodeOpt opt;
+  SQHandle* pQuery;
 } SQnode;
 
 #ifdef __cplusplus
 }
 #endif
 
-#endif /*_TD_QNODE_INT_H_*/
\ No newline at end of file
+#endif /*_TD_QNODE_INT_H_*/
diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c
index 8d2a72b268..21e3d2084b 100644
--- a/source/dnode/qnode/src/qnode.c
+++ b/source/dnode/qnode/src/qnode.c
@@ -17,10 +17,25 @@
 
 SQnode *qndOpen(const SQnodeOpt *pOption) {
   SQnode *pQnode = calloc(1, sizeof(SQnode));
+  if (NULL == pQnode) {
+    qError("calloc SQnode failed");
+    return NULL;
+  }
+
+  if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode,
+                  (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode)) {
+    tfree(pQnode);
+    return NULL;
+  }
+
   return pQnode;
 }
 
-void qndClose(SQnode *pQnode) { free(pQnode); }
+void qndClose(SQnode *pQnode) {
+  qWorkerDestroy((void **)&pQnode->pQuery);
+
+  free(pQnode);
+}
 
 int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
 
@@ -29,3 +44,51 @@ int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
   return 0;
 }
 
+int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
+  qTrace("message in query queue is processing");
+  SReadHandle handle = {0};
+
+  switch (pMsg->msgType) {
+    case TDMT_VND_QUERY:{
+      return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
+    }
+    case TDMT_VND_QUERY_CONTINUE:
+      return qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg);
+    default:
+      vError("unknown msg type:%d in query queue", pMsg->msgType);
+      return TSDB_CODE_VND_APP_ERROR;
+  }
+}
+
+int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
+  qTrace("message in fetch queue is processing");
+  switch (pMsg->msgType) {
+    case TDMT_VND_FETCH:
+      return qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_FETCH_RSP:
+      return qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_RES_READY:
+      return qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_TASKS_STATUS:
+      return qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_CANCEL_TASK:
+      return qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_DROP_TASK:
+      return qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_SHOW_TABLES:
+      return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg);
+    case TDMT_VND_SHOW_TABLES_FETCH:
+      //return vnodeGetTableList(pQnode, pMsg);
+    case TDMT_VND_TABLE_META:
+      //return vnodeGetTableMeta(pQnode, pMsg);
+    case TDMT_VND_CONSUME:
+      //return tqProcessConsumeReq(pQnode->pTq, pMsg);
+    default:
+      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
+      return TSDB_CODE_VND_APP_ERROR;
+  }
+}
+
+
+
+
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c
index 6127c587ec..6f903c3e03 100644
--- a/source/libs/catalog/src/catalog.c
+++ b/source/libs/catalog/src/catalog.c
@@ -558,6 +558,42 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
   return TSDB_CODE_SUCCESS;
 }
 
+int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray **out) {
+  char *msg = NULL;
+  int32_t msgLen = 0;
+
+  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse);
+
+  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)](NULL, &msg, 0, &msgLen);
+  if (code) {
+    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
+    CTG_ERR_RET(code);
+  }
+
+  SRpcMsg rpcMsg = {
+      .msgType = TDMT_MND_QNODE_LIST,
+      .pCont = msg,
+      .contLen = msgLen,
+  };
+
+  SRpcMsg rpcRsp = {0};
+
+  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
+  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
+    ctgError("error rsp for qnode list, error:%s", tstrerror(rpcRsp.code));
+    CTG_ERR_RET(rpcRsp.code);
+  }
+
+  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)](out, rpcRsp.pCont, rpcRsp.contLen);
+  if (code) {
+    ctgError("Process qnode list rsp failed, error:%s", tstrerror(code));
+    CTG_ERR_RET(code);
+  }
+
+  ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*out));
+
+  return TSDB_CODE_SUCCESS;
+}
 
 
 int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
@@ -2602,6 +2638,10 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
     }
   }
 
+  if (pReq->qNodeRequired) {
+    CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, &pRsp->pEpSetList));
+  }
+
   CTG_API_LEAVE(TSDB_CODE_SUCCESS);
 
 _return:
diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c
index ab0bbd319a..37e8b7302e 100644
--- a/source/libs/qcom/src/querymsg.c
+++ b/source/libs/qcom/src/querymsg.c
@@ -97,6 +97,25 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
   return TSDB_CODE_SUCCESS;
 }
 
+int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
+  if (NULL == msg || NULL == msgLen) {
+    return TSDB_CODE_TSC_INVALID_INPUT;
+  }
+
+  SQnodeListReq qnodeListReq = {0};
+  qnodeListReq.rowNum = -1;
+
+  int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq);
+  void *pBuf = rpcMallocCont(bufLen);
+  tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq);
+
+  *msg = pBuf;
+  *msgLen = bufLen;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+
 int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
   SUseDbOutput *pOut = output;
   SUseDbRsp usedbRsp = {0};
@@ -256,14 +275,45 @@ PROCESS_META_OVER:
   return code;
 }
 
+
+int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
+  SQnodeListRsp out = {0};
+  int32_t code = 0;  // success unless one of the checks below fails
+
+  if (NULL == output || NULL == msg || msgSize <= 0) {
+    code = TSDB_CODE_TSC_INVALID_INPUT;
+    goto PROCESS_QLIST_OVER;
+  }
+
+  if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
+    qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
+    code = TSDB_CODE_INVALID_MSG;
+    goto PROCESS_QLIST_OVER;
+  }
+
+PROCESS_QLIST_OVER:
+
+  if (code != 0) {
+    tFreeSQnodeListRsp(&out);
+    out.epSetList = NULL;
+  }
+
+  if (NULL != output) *(SArray **)output = out.epSetList;  // output may be the argument that failed the check
+
+  return code;
+}
+
+
 void initQueryModuleMsgHandle() {
   queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
   queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
   queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
+  queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
 
   queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
   queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
   queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
+  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
 }
 
 #pragma GCC diagnostic pop
diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h
index de2940846c..d061f51407 100644
--- a/source/libs/qworker/inc/qworkerInt.h
+++ b/source/libs/qworker/inc/qworkerInt.h
@@ -26,6 +26,8 @@ extern "C" {
 #define QW_DEFAULT_TASK_NUMBER 10000
 #define QW_DEFAULT_SCH_TASK_NUMBER 10000
 #define QW_DEFAULT_SHORT_RUN_TIMES 2
+#define QW_DEFAULT_HEARTBEAT_MSEC 3000
+
 enum {
   QW_PHASE_PRE_QUERY = 1,
   QW_PHASE_POST_QUERY,
@@ -131,6 +133,8 @@ typedef struct SQWorkerMgmt {
   SQWorkerCfg cfg;
   int8_t nodeType;
   int32_t nodeId;
+  void *timer;
+  tmr_h hbTimer;
   SRWLatch schLock;
   //SRWLatch ctxLock;
   SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c
index 7a6fa4cfd7..3208c9f646 100644
--- a/source/libs/qworker/src/qworker.c
+++ b/source/libs/qworker/src/qworker.c
@@ -521,6 +521,47 @@ _return:
   QW_RET(code);
 }
 
+int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStatusRsp **rsp) {
+  int32_t taskNum = 0;
+
+  QW_LOCK(QW_READ, &sch->tasksLock);
+
+  taskNum = taosHashGetSize(sch->tasksHash);
+
+  int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
+  *rsp = calloc(1, size);
+  if (NULL == *rsp) {
+    QW_SCH_ELOG("calloc %d failed", size);
+    QW_UNLOCK(QW_READ, &sch->tasksLock);
+
+    return TSDB_CODE_QRY_OUT_OF_MEMORY;
+  }
+
+  void *key = NULL;
+  size_t keyLen = 0;
+  int32_t i = 0;
+
+  void *pIter = taosHashIterate(sch->tasksHash, NULL);
+  while (pIter) {
+    SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
+    taosHashGetKey(pIter, &key, &keyLen);
+
+    //TODO GET EXECUTOR API TO GET MORE INFO
+
+    QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
+    (*rsp)->status[i].status = taskStatus->status;
+
+    ++i;
+    pIter = taosHashIterate(sch->tasksHash, pIter);
+  }
+
+  QW_UNLOCK(QW_READ, &sch->tasksLock);
+
+  (*rsp)->num = taskNum;
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
   int32_t len = 0;
 
@@ -1312,6 +1353,58 @@ _return:
   QW_RET(code);
 }
 
+void qwProcessHbTimer(void *param, void *tmrId) {
+  SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
+  SQWSchStatus *sch = NULL;
+  int32_t taskNum = 0;
+  SSchedulerStatusRsp **rspList = NULL;
+  int32_t code = 0;
+
+  QW_LOCK(QW_READ, &mgmt->schLock);
+
+  int32_t schNum = taosHashGetSize(mgmt->schHash);
+  if (schNum <= 0) {
+    QW_UNLOCK(QW_READ, &mgmt->schLock);
+    return;  // void callback: nothing to report
+  }
+
+  rspList = calloc(schNum, POINTER_BYTES);
+  if (NULL == rspList) {
+    QW_UNLOCK(QW_READ, &mgmt->schLock);
+    QW_ELOG("calloc %d rsp pointer failed", schNum);
+    return;  // void callback: the failure is only logged
+  }
+
+  void *key = NULL;
+  size_t keyLen = 0;
+  int32_t i = 0;
+
+  void *pIter = taosHashIterate(mgmt->schHash, NULL);
+  while (pIter) {
+    code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
+    if (code) {
+      taosHashCancelIterate(mgmt->schHash, pIter);
+      QW_ERR_JRET(code);
+    }
+
+    ++i;
+    pIter = taosHashIterate(mgmt->schHash, pIter);
+  }
+
+_return:
+
+  QW_UNLOCK(QW_READ, &mgmt->schLock);
+
+  for (int32_t j = 0; j < i; ++j) {
+    qwBuildAndSendStatusRsp(NULL, rspList[j]); //TODO SCHEDULER CONNECTION
+    tfree(rspList[j]);
+  }
+
+  tfree(rspList);
+
+  if (code) QW_ELOG("generate scheduler hb rsp failed, error:%s", tstrerror(code));
+}
+
 int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
                     putReqToQueryQFp fp1, sendReqToDnodeFp fp2) {
   if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) {
@@ -1319,6 +1412,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
     QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
+  int32_t code = 0;
   SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
   if (NULL == mgmt) {
     qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
@@ -1346,16 +1440,24 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
   if (NULL == mgmt->schHash) {
-    tfree(mgmt);
     qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
-    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
   mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
   if (NULL == mgmt->ctxHash) {
     qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
-    taosHashCleanup(mgmt->schHash);
-    mgmt->schHash = NULL;
-    tfree(mgmt);
-    QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  mgmt->timer = taosTmrInit(0, 0, 0, "qworker");
+  if (NULL == mgmt->timer) {
+    qError("init timer failed, error:%s", tstrerror(terrno));
+    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  mgmt->hbTimer = taosTmrStart(qwProcessHbTimer, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
+  if (NULL == mgmt->hbTimer) {
+    qError("start hb timer failed");
+    QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
   mgmt->nodeType = nodeType;
@@ -1369,6 +1471,17 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
   qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
 
   return TSDB_CODE_SUCCESS;
+
+_return:
+
+  taosHashCleanup(mgmt->schHash);
+  taosHashCleanup(mgmt->ctxHash);
+
+  taosTmrCleanUp(mgmt->timer);
+
+  tfree(mgmt);
+
+  QW_RET(code);
 }
 
 void qWorkerDestroy(void **qWorkerMgmt) {
@@ -1385,12 +1498,11 @@ void qWorkerDestroy(void **qWorkerMgmt) {
   tfree(*qWorkerMgmt);
 }
 
-int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) {
+int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
   SQWSchStatus *sch = NULL;
   int32_t taskNum = 0;
 
-/*
-  QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
+  QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch));
 
   sch->lastAccessTs = taosGetTimestampSec();
 
@@ -1401,7 +1513,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
   int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
   *rsp = calloc(1, size);
   if (NULL == *rsp) {
-    qError("calloc %d failed", size);
+    QW_SCH_ELOG("calloc %d failed", size);
     QW_UNLOCK(QW_READ, &sch->tasksLock);
     qwReleaseScheduler(QW_READ, mgmt);
 
@@ -1420,6 +1532,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
     QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
     (*rsp)->status[i].status = taskStatus->status;
 
+    ++i;
     pIter = taosHashIterate(sch->tasksHash, pIter);
   }
 
@@ -1427,7 +1540,6 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
   qwReleaseScheduler(QW_READ, mgmt);
 
   (*rsp)->num = taskNum;
-*/
 
   return TSDB_CODE_SUCCESS;
 }
diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c
index bdb9617d0d..06b8310d45 100644
--- a/source/libs/qworker/src/qworkerMsg.c
+++ b/source/libs/qworker/src/qworkerMsg.c
@@ -392,7 +392,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
 
   SSchedulerStatusRsp *sStatus = NULL;
 
-  //QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
+  QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
 
 _return:
 