diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 87da671fc8..1a7908ab18 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1048,6 +1048,7 @@ typedef struct SSubQueryMsg { uint64_t sId; uint64_t queryId; uint64_t taskId; + int64_t refId; int8_t taskType; uint32_t sqlLen; // the query sql, uint32_t phyLen; @@ -1094,19 +1095,57 @@ typedef struct { typedef struct { uint64_t queryId; uint64_t taskId; + int64_t refId; int8_t status; } STaskStatus; typedef struct { - uint32_t num; - STaskStatus status[]; + int64_t refId; + SArray *taskStatus; //SArray } SSchedulerStatusRsp; +typedef struct { + uint64_t queryId; + uint64_t taskId; + int8_t action; +} STaskAction; + + +typedef struct SQueryNodeEpId { + int32_t nodeId; // vgId or qnodeId + SEp ep; +} SQueryNodeEpId; + + +typedef struct { + SMsgHead header; + uint64_t sId; + SQueryNodeEpId epId; + SArray *taskAction; //SArray +} SSchedulerHbReq; + +int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq); +int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq); +void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq); + + +typedef struct { + uint64_t seqId; + SQueryNodeEpId epId; + SArray *taskStatus; //SArray +} SSchedulerHbRsp; + +int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp); +int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp); +void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp); + + typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; + int64_t refId; } STaskCancelReq; typedef struct { @@ -1118,6 +1157,7 @@ typedef struct { uint64_t sId; uint64_t queryId; uint64_t taskId; + int64_t refId; } STaskDropReq; typedef struct { diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 08b5b36e3d..80f8384c8f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -181,6 +181,7 @@ enum { 
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5aa3ca1acf..4669f21486 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -133,6 +133,7 @@ typedef struct SQueryNodeAddr { SEpSet epset; } SQueryNodeAddr; + typedef struct SQueryNodeStat { int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 5e3320ffdb..dd3103edf1 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -72,6 +72,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c9c4b06be3..f274620744 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2458,3 +2458,149 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) tEndDecode(decoder); return 0; } + +int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= 
headLen; + } + + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epId.nodeId) < 0) return -1; + if (tEncodeU16(&encoder, pReq->epId.ep.port) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->epId.ep.fqdn) < 0) return -1; + if (pReq->taskAction) { + int32_t num = taosArrayGetSize(pReq->taskAction); + if (tEncodeI32(&encoder, num) < 0) return -1; + for (int32_t i = 0; i < num; ++i) { + STaskAction *action = taosArrayGet(pReq->taskAction, i); + if (tEncodeU64(&encoder, action->queryId) < 0) return -1; + if (tEncodeU64(&encoder, action->taskId) < 0) return -1; + if (tEncodeI8(&encoder, action->action) < 0) return -1; + } + } else { + if (tEncodeI32(&encoder, 0) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pReq->header.vgId = pHead->vgId; /* copy the received head into the request; never write back into the input buffer. NOTE(review): copied as-is - confirm whether the transport layer has already converted the head to host byte order */ + pReq->header.contLen = pHead->contLen; + + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, (char *)buf + headLen, bufLen - headLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->epId.nodeId) < 0) return -1; + if (tDecodeU16(&decoder, &pReq->epId.ep.port) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->epId.ep.fqdn) < 0) return -1; + int32_t num = 0; + if (tDecodeI32(&decoder, &num) < 0) return -1; + if (num > 0) { + pReq->taskAction = taosArrayInit(num, sizeof(STaskAction)); /* element size must match the STaskAction values pushed below */ + if (NULL == pReq->taskAction) return -1; + for (int32_t i = 0; 
i < num; ++i) { + STaskAction action = {0}; + if (tDecodeU64(&decoder, &action.queryId) < 0) return -1; + if (tDecodeU64(&decoder, &action.taskId) < 0) return -1; + if (tDecodeI8(&decoder, &action.action) < 0) return -1; + taosArrayPush(pReq->taskAction, &action); + } + } else { + pReq->taskAction = NULL; + } + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq) { taosArrayDestroy(pReq->taskAction); } + + + +int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1; + if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1; + if (pRsp->taskStatus) { + int32_t num = taosArrayGetSize(pRsp->taskStatus); + if (tEncodeI32(&encoder, num) < 0) return -1; + for (int32_t i = 0; i < num; ++i) { + STaskStatus *status = taosArrayGet(pRsp->taskStatus, i); + if (tEncodeU64(&encoder, status->queryId) < 0) return -1; + if (tEncodeU64(&encoder, status->taskId) < 0) return -1; + if (tEncodeI64(&encoder, status->refId) < 0) return -1; + if (tEncodeI8(&encoder, status->status) < 0) return -1; + } + } else { + if (tEncodeI32(&encoder, 0) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeU64(&decoder, &pRsp->seqId) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->epId.nodeId) < 0) return -1; + if (tDecodeU16(&decoder, &pRsp->epId.ep.port) < 0) return -1; + if 
(tDecodeCStrTo(&decoder, pRsp->epId.ep.fqdn) < 0) return -1; + int32_t num = 0; + if (tDecodeI32(&decoder, &num) < 0) return -1; + if (num > 0) { + pRsp->taskStatus = taosArrayInit(num, sizeof(STaskStatus)); + if (NULL == pRsp->taskStatus) return -1; + for (int32_t i = 0; i < num; ++i) { + STaskStatus status = {0}; + if (tDecodeU64(&decoder, &status.queryId) < 0) return -1; + if (tDecodeU64(&decoder, &status.taskId) < 0) return -1; + if (tDecodeI64(&decoder, &status.refId) < 0) return -1; + if (tDecodeI8(&decoder, &status.status) < 0) return -1; + taosArrayPush(pRsp->taskStatus, &status); + } + } else { + pRsp->taskStatus = NULL; + } + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskStatus); } + + + diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 157bad26a6..78bd71b919 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -152,6 +152,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 0d8dda8034..980e1a5a3f 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -36,6 +36,7 @@ static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp); static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQnode(SMnode *pMnode, 
void *pIter); +static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq); int32_t mndInitQnode(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_QNODE, @@ -447,8 +448,8 @@ static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq) { goto QNODE_LIST_OVER; } - qlistRsp->epSetList = taosArrayInit(5, sizeof(SEpSet)); - if (NULL == qlistRsp->epSetList) { + qlistRsp.epSetList = taosArrayInit(5, sizeof(SEpSet)); + if (NULL == qlistRsp.epSetList) { mError("taosArrayInit epSet failed"); terrno = TSDB_CODE_OUT_OF_MEMORY; goto QNODE_LIST_OVER; @@ -463,12 +464,12 @@ static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq) { epSet.eps[0].port = pObj->pDnode->port; epSet.numOfEps = 1; - taosArrayPush(qlistRsp->epSetList, &epSet); + taosArrayPush(qlistRsp.epSetList, &epSet); numOfRows++; sdbRelease(pSdb, pObj); - if (qlistReq->rowNum > 0 && numOfRows >= qlistReq->rowNum) { + if (qlistReq.rowNum > 0 && numOfRows >= qlistReq.rowNum) { break; } } diff --git a/source/dnode/qnode/CMakeLists.txt b/source/dnode/qnode/CMakeLists.txt index 92cd8fcb5c..32e3e85d90 100644 --- a/source/dnode/qnode/CMakeLists.txt +++ b/source/dnode/qnode/CMakeLists.txt @@ -11,4 +11,7 @@ target_link_libraries( PRIVATE os PRIVATE common PRIVATE util + PRIVATE qworker + PRIVATE qcom + PRIVATE executor ) \ No newline at end of file diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 8d986434d2..357a62052a 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -28,6 +28,8 @@ extern "C" { #endif +typedef struct SQWorkerMgmt SQHandle; + typedef struct SQnode { int32_t qndId; SQnodeOpt opt; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 21e3d2084b..a1c3f5b0d4 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -14,6 +14,13 @@ */ #include "qndInt.h" +#include "query.h" +#include "qworker.h" +#include "executor.h" + +int32_t qnodePutReqToVQueryQ(SQnode* pQnode, struct SRpcMsg* pReq) { return 0; /* stub body: return 0 so this non-void callback always yields a defined result */ } +void 
qnodeSendReqToDnode(SQnode* pQnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {} + SQnode *qndOpen(const SQnodeOpt *pOption) { SQnode *pQnode = calloc(1, sizeof(SQnode)); @@ -23,7 +30,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { } if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode, - (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode)) { + (putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqToDnodeFp)qnodeSendReqToDnode)) { tfree(pQnode); return NULL; } @@ -32,7 +39,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { } void qndClose(SQnode *pQnode) { - qWorkerDestroy(&pQnode->pQuery); + qWorkerDestroy((void **)&pQnode->pQuery); free(pQnode); } @@ -55,7 +62,7 @@ int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { case TDMT_VND_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); default: - vError("unknown msg type:%d in query queue", pMsg->msgType); + qError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; } } @@ -84,7 +91,7 @@ int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { case TDMT_VND_CONSUME: //return tqProcessConsumeReq(pQnode->pTq, pMsg); default: - vError("unknown msg type:%d in fetch queue", pMsg->msgType); + qError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; } } diff --git a/source/dnode/vnode/src/inc/vnodeQuery.h b/source/dnode/vnode/src/inc/vnodeQuery.h index 51c93b5ad7..7816b4eb46 100644 --- a/source/dnode/vnode/src/inc/vnodeQuery.h +++ b/source/dnode/vnode/src/inc/vnodeQuery.h @@ -26,6 +26,7 @@ extern "C" { typedef struct SQWorkerMgmt SQHandle; int vnodeQueryOpen(SVnode *pVnode); +void vnodeQueryClose(SVnode *pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index ba346064ae..2a3862c7cb 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -154,5 +154,6 @@ 
static void vnodeCloseImpl(SVnode *pVnode) { tsdbClose(pVnode->pTsdb); tqClose(pVnode->pTq); walClose(pVnode->pWal); + vnodeQueryClose(pVnode); } } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 4b47413715..0359483ba2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -24,6 +24,10 @@ int vnodeQueryOpen(SVnode *pVnode) { (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode); } +void vnodeQueryClose(SVnode *pVnode) { + qWorkerDestroy((void **)&pVnode->pQuery); +} + int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta}; @@ -64,6 +68,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessConsumeReq(pVnode->pTq, pMsg); + case TDMT_VND_QUERY_HEARTBEAT: + return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 6f903c3e03..adba56a268 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -580,7 +580,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { - ctgError("error rsp for qnode list, error:%s, db:%s", tstrerror(rpcRsp.code)); + ctgError("error rsp for qnode list, error:%s", tstrerror(rpcRsp.code)); CTG_ERR_RET(rpcRsp.code); } @@ -590,7 +590,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt CTG_ERR_RET(code); } - ctgDebug("Got qnode list from mnode, listNum:%d", taosArrayGetSize(*out)); + ctgDebug("Got qnode list from mnode, listNum:%d", 
(int32_t)taosArrayGetSize(*out)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index d061f51407..b5b8726a4c 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "tlockfree.h" +#include "ttimer.h" #define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000 @@ -84,6 +85,11 @@ typedef struct SQWMsg { void *connection; } SQWMsg; +typedef struct SQWHbInfo { + SSchedulerHbRsp rsp; + void *connection; +} SQWHbInfo; + typedef struct SQWPhaseInput { int8_t taskStatus; int8_t taskType; @@ -97,6 +103,7 @@ typedef struct SQWPhaseOutput { typedef struct SQWTaskStatus { + int64_t refId; // job's refId int32_t code; int8_t status; } SQWTaskStatus; @@ -124,6 +131,8 @@ typedef struct SQWTaskCtx { typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second + uint64_t hbSeqId; + void *hbConnection; SRWLatch tasksLock; SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; @@ -144,8 +153,8 @@ typedef struct SQWorkerMgmt { sendReqToDnodeFp sendReqFp; } SQWorkerMgmt; -#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId -#define QW_IDS() sId, qId, tId +#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId +#define QW_IDS() sId, qId, tId, rId #define QW_FPARAMS() mgmt, QW_IDS() #define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event]) diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 51f55d238f..ecb5dbd654 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -23,22 +23,24 @@ extern "C" { #include "qworkerInt.h" #include "dataSinkMgt.h" -int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType); -int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, 
uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); -int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); -int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); -int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType); +int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); +int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); +int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); +int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); +int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); -int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); -int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); +int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection); int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); +int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); +int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3208c9f646..02af06dda8 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -106,7 +106,7 @@ int32_t 
qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { } -int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { @@ -132,7 +132,7 @@ int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { return TSDB_CODE_SUCCESS; } -int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { +int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { while (true) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); @@ -140,7 +140,7 @@ int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sc QW_UNLOCK(rwType, &mgmt->schLock); if (QW_NOT_EXIST_ADD == nOpt) { - QW_ERR_RET(qwAddSchedulerImpl(QW_FPARAMS(), rwType, sch)); + QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType, sch)); nOpt = QW_NOT_EXIST_RET_ERR; @@ -148,7 +148,7 @@ int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sc } else if (QW_NOT_EXIST_RET_ERR == nOpt) { QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST); } else { - QW_TASK_ELOG("unknown notExistOpt:%d", nOpt); + QW_SCH_ELOG("unknown notExistOpt:%d", nOpt); QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } } @@ -159,12 +159,12 @@ int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sc return TSDB_CODE_SUCCESS; } -int32_t qwAcquireAddScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { - return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD); +int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { + return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, 
QW_NOT_EXIST_ADD); } -int32_t qwAcquireScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) { - return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR); +int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { + return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR); } void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { @@ -196,6 +196,7 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i SQWTaskStatus ntask = {0}; ntask.status = status; + ntask.refId = rId; QW_LOCK(QW_WRITE, &sch->tasksLock); code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); @@ -225,7 +226,7 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) { SQWSchStatus *tsch = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch)); + QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch)); QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL)); @@ -411,7 +412,7 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - if (qwAcquireScheduler(QW_FPARAMS(), QW_WRITE, &sch)) { + if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) { QW_TASK_WLOG_E("scheduler does not exist"); return TSDB_CODE_SUCCESS; } @@ -443,7 +444,7 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch)); + QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task)); QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status)); @@ -521,25 +522,27 @@ _return: QW_RET(code); } -int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStatusRsp **rsp) { +int32_t qwGenerateSchHbRsp(SQWorkerMgmt 
*mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; QW_LOCK(QW_READ, &sch->tasksLock); taskNum = taosHashGetSize(sch->tasksHash); - - int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum; - *rsp = calloc(1, size); - if (NULL == *rsp) { - QW_SCH_ELOG("calloc %d failed", size); + + hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus)); + if (NULL == hbInfo->rsp.taskStatus) { QW_UNLOCK(QW_READ, &sch->tasksLock); - + QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum); return TSDB_CODE_QRY_OUT_OF_MEMORY; } + hbInfo->connection = sch->hbConnection; + hbInfo->rsp.seqId = -1; + void *key = NULL; size_t keyLen = 0; int32_t i = 0; + STaskStatus status = {0}; void *pIter = taosHashIterate(sch->tasksHash, NULL); while (pIter) { @@ -548,8 +551,11 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStat //TODO GET EXECUTOR API TO GET MORE INFO - QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); - (*rsp)->status[i].status = taskStatus->status; + QW_GET_QTID(key, status.queryId, status.taskId); + status.status = taskStatus->status; + status.refId = taskStatus->refId; + + taosArrayPush(hbInfo->rsp.taskStatus, &status); ++i; pIter = taosHashIterate(sch->tasksHash, pIter); @@ -557,8 +563,6 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStat QW_UNLOCK(QW_READ, &sch->tasksLock); - (*rsp)->num = taskNum; - return TSDB_CODE_SUCCESS; } @@ -1353,11 +1357,36 @@ _return: QW_RET(code); } -void qwProcessHbTimer(void *param, void *tmrId) { +int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { + int32_t code = 0; + SSchedulerHbRsp rsp = {0}; + SQWSchStatus *sch = NULL; + uint64_t seqId = 0; + + memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); + + QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); + + atomic_store_ptr(&sch->hbConnection, qwMsg->connection); + ++sch->hbSeqId; + + rsp.seqId = sch->hbSeqId; + + 
qwReleaseScheduler(QW_READ, mgmt); + +_return: + + qwBuildAndSendHbRsp(qwMsg->connection, &rsp, code); + + QW_RET(code); +} + + +void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param; SQWSchStatus *sch = NULL; int32_t taskNum = 0; - SSchedulerStatusRsp **rspList = NULL; + SQWHbInfo *rspList = NULL; int32_t code = 0; QW_LOCK(QW_READ, &mgmt->schLock); @@ -1365,14 +1394,16 @@ void qwProcessHbTimer(void *param, void *tmrId) { int32_t schNum = taosHashGetSize(mgmt->schHash); if (schNum <= 0) { QW_UNLOCK(QW_READ, &mgmt->schLock); - return TSDB_CODE_SUCCESS; + taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + return; } - rspList = calloc(schNum, POINTER_BYTES); + rspList = calloc(schNum, sizeof(SQWHbInfo)); if (NULL == rspList) { QW_UNLOCK(QW_READ, &mgmt->schLock); - QW_ELOG("calloc %d rsp pointer failed", schNum); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + QW_ELOG("calloc %d SQWHbInfo failed", schNum); + taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + return; } void *key = NULL; @@ -1396,13 +1427,13 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - qwBuildAndSendStatusRsp(NULL, rspList[j]); //TODO SCHEDULER CONNECTION - tfree(rspList[j]); + if (NULL != rspList[j].connection) { qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code); } /* hbConnection stays NULL until the scheduler's first heartbeat request; skip sending instead of dereferencing NULL. NOTE(review): the stored connection is the request's SRpcMsg pointer - confirm it is still valid when the timer fires */ + tFreeSSchedulerHbRsp(&rspList[j].rsp); } tfree(rspList); - QW_RET(code); + taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); } int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, @@ -1455,7 +1486,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - mgmt->hbTimer = taosTmrStart(qwProcessHbTimer, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer); + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, 
QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer); if (NULL == mgmt->hbTimer) { qError("start hb timer failed"); QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1491,6 +1522,9 @@ void qWorkerDestroy(void **qWorkerMgmt) { } SQWorkerMgmt *mgmt = *qWorkerMgmt; + + taosTmrStopA(&mgmt->hbTimer); + taosTmrCleanUp(mgmt->timer); //TODO STOP ALL QUERY @@ -1500,10 +1534,11 @@ void qWorkerDestroy(void **qWorkerMgmt) { } int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { +/* SQWSchStatus *sch = NULL; int32_t taskNum = 0; - QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch)); + QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch)); sch->lastAccessTs = taosGetTimestampSec(); @@ -1541,7 +1576,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs qwReleaseScheduler(QW_READ, mgmt); (*rsp)->num = taskNum; - +*/ return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 06b8310d45..7d633d1c73 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -82,30 +82,18 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { - int32_t size = 0; - - if (sStatus) { - size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num; - } else { - size = sizeof(SSchedulerStatusRsp); - } - - SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size); - - if (sStatus) { - memcpy(pRsp, sStatus, size); - } else { - pRsp->num = 0; - } +int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t code) { + int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); + void *pRsp = rpcMallocCont(contLen); + tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { - .msgType = TDMT_VND_TASKS_STATUS_RSP, + .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, .handle 
= pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, - .contLen = size, - .code = 0, + .contLen = contLen, + .code = code, }; rpcSendResponse(&rpcRsp); @@ -243,7 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) { +int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) { SRpcMsg *pMsg = (SRpcMsg *)connection; SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { @@ -294,12 +282,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); + msg->refId = be64toh(msg->refId); msg->phyLen = ntohl(msg->phyLen); msg->sqlLen = ntohl(msg->sqlLen); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; + int64_t rId = msg->refId; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg}; @@ -331,6 +321,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; + int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; @@ -362,12 +353,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; + int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; QW_SCH_TASK_DLOG("processReady start, node:%p", node); - QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg)); + QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg)); QW_SCH_TASK_DLOG("processReady end, node:%p", node); @@ -392,11 +384,11 @@ int32_t 
qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SSchedulerStatusRsp *sStatus = NULL; - QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); + //QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); _return: - QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); + //QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); return TSDB_CODE_SUCCESS; } @@ -421,6 +413,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; + int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; @@ -450,9 +443,10 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = htobe64(msg->sId); - msg->queryId = htobe64(msg->queryId); - msg->taskId = htobe64(msg->taskId); + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); + msg->refId = be64toh(msg->refId); //QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); @@ -480,10 +474,12 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); + msg->refId = be64toh(msg->refId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; + int64_t rId = msg->refId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; @@ -496,6 +492,39 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } +int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + int32_t code = 0; + SSchedulerHbReq req = {0}; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + + if 
(NULL == pMsg->pCont) { + QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) { + QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); + tFreeSSchedulerHbReq(&req); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + uint64_t sId = req.sId; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + + QW_SCH_DLOG("processHb start, node:%p", node); + + QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); + + QW_SCH_DLOG("processHb end, node:%p", node); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 12c4f824ac..d9c2ae6d65 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,6 +43,12 @@ typedef struct SSchTrans { void *transHandle; } SSchTrans; +typedef struct SSchHbTrans { + SRWLatch lock; + uint64_t seqId; + SSchTrans trans; +} SSchHbTrans; + typedef struct SSchApiStat { } SSchApiStat; @@ -63,18 +69,19 @@ typedef struct SSchedulerStat { typedef struct SSchedulerMgmt { - uint64_t taskId; // sequential taksId - uint64_t sId; // schedulerId - SSchedulerCfg cfg; - int32_t jobRef; - SSchedulerStat stat; + uint64_t taskId; // sequential taksId + uint64_t sId; // schedulerId + SSchedulerCfg cfg; + int32_t jobRef; + SSchedulerStat stat; + SHashObj *hbConnections; } SSchedulerMgmt; typedef struct SSchCallbackParam { uint64_t queryId; int64_t refId; uint64_t taskId; - SEpSet epSet; + void *transport; } SSchCallbackParam; typedef struct SSchFlowControl { @@ -157,6 +164,10 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= 
taosArrayGetSize((task)->children)) +#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) +#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) +#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) + #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) #define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) @@ -183,11 +194,11 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_TASK_ELOG(param, ...) \ - qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) + qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) #define SCH_TASK_DLOG(param, ...) \ - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) #define SCH_TASK_WLOG(param, ...) 
\ - qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index fe886dfcdb..11dcf478b8 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -104,7 +104,7 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { - int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType); + int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: @@ -130,7 +130,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - atomic_store_32(&pTask->lastMsgType, -1); + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; } @@ -622,6 +622,47 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { + int32_t code = 0; + SSchHbTrans *hb = NULL; + + while (true) { + hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), trans, sizeof(SSchHbTrans)); + if (code) { + if (HASH_NODE_EXIST(code)) { + continue; + } + + qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + SCH_ERR_RET(code); + } + + return TSDB_CODE_SUCCESS; + } + + break; + } + + SCH_LOCK(SCH_WRITE, 
&hb->lock); + + if (hb->seqId >= trans->seqId) { + qDebug("hb trans seqId is old, seqId:%" PRId64 ", currentId:%" PRId64 ", nodeId:%d, fqdn:%s, port:%d", + trans->seqId, hb->seqId, epId->nodeId, epId->ep.fqdn, epId->ep.port); + + SCH_UNLOCK(SCH_WRITE, &hb->lock); + return TSDB_CODE_SUCCESS; + } + + hb->seqId = trans->seqId; + memcpy(&hb->trans, &trans->trans, sizeof(trans->trans)); + + SCH_UNLOCK(SCH_WRITE, &hb->lock); + + return TSDB_CODE_SUCCESS; +} + int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); @@ -961,7 +1002,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SSchCallbackParam *pParam = (SSchCallbackParam *)param; SSchTask *pTask = NULL; - SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, pParam->refId); + SSchJob *pJob = schAcquireJob(pParam->refId); if (NULL == pJob) { qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); @@ -988,7 +1029,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in _return: if (pJob) { - taosReleaseRef(schMgmt.jobRef, pParam->refId); + schReleaseJob(pParam->refId); } tfree(param); @@ -1020,6 +1061,55 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code); } + +int32_t schHandleHbCallback(void* param, const SDataBuf* pMsg, int32_t code) { + if (code) { + qError("hb rsp error:%s", tstrerror(code)); + SCH_ERR_RET(code); + } + + SSchedulerHbRsp rsp = {0}; + + SSchCallbackParam *pParam = (SSchCallbackParam *)param; + + if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) { + qError("invalid hb rsp msg, size:%d", pMsg->len); + 
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); /* leave through _return so rsp is released */ + } + + if (rsp.seqId != (uint64_t)-1) { + SSchHbTrans trans = {0}; + trans.seqId = rsp.seqId; + trans.trans.transInst = pParam->transport; + trans.trans.transHandle = pMsg->handle; + + SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); /* only bail out on failure; on success fall through to the task status loop */ + } + + int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); + for (int32_t i = 0; i < taskNum; ++i) { + STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i); + + SSchJob *pJob = schAcquireJob(taskStatus->refId); + if (NULL == pJob) { + qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, taskStatus->queryId, taskStatus->taskId); + //TODO DROP TASK FROM SERVER!!!! + continue; + } + + // TODO + + schReleaseJob(taskStatus->refId); + } + +_return: + + tFreeSSchedulerHbRsp(&rsp); + + SCH_RET(code); +} + + int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: @@ -1040,6 +1130,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { case TDMT_VND_DROP_TASK: *fp = schHandleDropCallback; break; + case TDMT_VND_QUERY_HEARTBEAT: + *fp = schHandleHbCallback; + break; default: qError("unknown msg type for callback, msgType:%d", msgType); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -1071,7 +1164,8 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* param->queryId = pJob->queryId; param->refId = pJob->refId; - param->taskId = pTask->taskId; + param->taskId = SCH_TASK_ID(pTask); + param->transport = trans->transInst; pMsgSendInfo->param = param; @@ -1138,6 +1232,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); + pMsg->refId = htobe64(pJob->refId); pMsg->taskType = TASK_TYPE_TEMP; pMsg->phyLen = htonl(pTask->msgLen); pMsg->sqlLen = htonl(len); @@ -1196,6 +1291,30 @@ int32_t 
schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); + pMsg->refId = htobe64(pJob->refId); + break; + } + case TDMT_VND_QUERY_HEARTBEAT: { + SSchedulerHbReq req = {0}; + req.sId = schMgmt.sId; + req.header.vgId = addr->nodeId; + req.epId.nodeId = addr->nodeId; + memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); + + msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req); + if (msgSize < 0) { + SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + msg = calloc(1, msgSize); + if (NULL == msg) { + SCH_JOB_ELOG("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) { + SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } break; } default: @@ -1204,9 +1323,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } - atomic_store_32(&pTask->lastMsgType, msgType); + SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle}; + SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask ? 
pTask->handle : NULL}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize)); if (isCandidateAddr) { @@ -1217,12 +1336,26 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, _return: - atomic_store_32(&pTask->lastMsgType, -1); - + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + tfree(msg); SCH_RET(code); } +int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SQueryNodeEpId epId = {0}; + + epId.nodeId = addr->nodeId; + memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); + + SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT)); + } + + return TSDB_CODE_SUCCESS; +} int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int8_t status = 0; @@ -1240,7 +1373,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { if (NULL == pTask->msg) { // TODO add more detailed reason for failure code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); - if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { + if (TSDB_CODE_SUCCESS != code) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); SCH_ERR_RET(code); } else { @@ -1256,7 +1389,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); } - + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); + } + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); return TSDB_CODE_SUCCESS; @@ -1459,7 +1595,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD pJob->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(pJob)); - taosAcquireRef(schMgmt.jobRef, pJob->refId); + schAcquireJob(pJob->refId); *job = 
pJob->refId; @@ -1470,7 +1606,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); - taosReleaseRef(schMgmt.jobRef, pJob->refId); + schReleaseJob(pJob->refId); return TSDB_CODE_SUCCESS; @@ -1493,6 +1629,9 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { if (schMgmt.cfg.maxJobNum == 0) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; } + if (schMgmt.cfg.maxNodeTableNum <= 0) { + schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; + } } else { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; @@ -1500,7 +1639,13 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); if (schMgmt.jobRef < 0) { - qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum); + qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.hbConnections) { + qError("taosHashInit hb connections failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1521,10 +1666,10 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); - SSchJob *job = taosAcquireRef(schMgmt.jobRef, *pJob); + SSchJob *job = schAcquireJob(*pJob); pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; - taosReleaseRef(schMgmt.jobRef, *pJob); + schReleaseJob(*pJob); return TSDB_CODE_SUCCESS; } @@ -1573,7 +1718,7 @@ int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) { tInfo.addr = plan->execNode; code = qSubPlanToString(plan, &msg, &msgLen); - if (TSDB_CODE_SUCCESS != code || NULL == msg || msgLen <= 0) { + if 
(TSDB_CODE_SUCCESS != code) { qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen); SCH_ERR_JRET(code); } @@ -1667,7 +1812,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { } int32_t code = 0; - SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job); + SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); @@ -1676,19 +1821,19 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { SCH_JOB_ELOG("job is dropping, status:%d", status); - taosReleaseRef(schMgmt.jobRef, job); + schReleaseJob(job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } if (!SCH_JOB_NEED_FETCH(pJob)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); - taosReleaseRef(schMgmt.jobRef, job); + schReleaseJob(job); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); - taosReleaseRef(schMgmt.jobRef, job); + schReleaseJob(job); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1741,13 +1886,13 @@ _return: atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); - taosReleaseRef(schMgmt.jobRef, job); + schReleaseJob(job); SCH_RET(code); } int32_t scheduleCancelJob(int64_t job) { - SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job); + SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); @@ -1755,13 +1900,13 @@ int32_t scheduleCancelJob(int64_t job) { int32_t code = schCancelJob(pJob); - taosReleaseRef(schMgmt.jobRef, job); + schReleaseJob(job); SCH_RET(code); } void schedulerFreeJob(int64_t job) { - SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job); + SSchJob *pJob = 
schAcquireJob(job); if (NULL == pJob) { qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); return; @@ -1776,6 +1921,8 @@ void schedulerFreeJob(int64_t job) { if (taosRemoveRef(schMgmt.jobRef, job)) { SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job); } + + schReleaseJob(job); } void schedulerFreeTaskList(SArray *taskList) {