diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index affa265d53..102a5954d6 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -146,6 +146,8 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx); + /** * Asynchronously send message to server, after the response received, the callback will be incured. * diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 44f1c454c9..e37b738e33 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -140,7 +140,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) return 0; } -int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx) { char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); @@ -154,14 +154,19 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp .contLen = pInfo->msgInfo.len, .ahandle = (void*)pInfo, .handle = pInfo->msgInfo.handle, + .persistHandle = persistHandle, .code = 0}; assert(pInfo->fp != NULL); - rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); + rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, ctx); return TSDB_CODE_SUCCESS; } +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); +} + char *jobTaskStatusStr(int32_t status) { switch (status) { case JOB_TASK_STATUS_NULL: diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 6e2d482c05..56249421c1 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -70,11 +70,16 @@ typedef struct SQWDebug { bool statusEnable; } SQWDebug; +typedef struct SQWConnInfo { + void *handle; + void *ahandle; +} SQWConnInfo; + typedef struct SQWMsg { - void *node; - char *msg; - int32_t msgLen; - void *connection; + void *node; + char *msg; + int32_t msgLen; + SQWConnInfo connInfo; } SQWMsg; typedef struct SQWHbInfo { @@ -100,10 +105,6 @@ typedef struct SQWTaskCtx { SRWLatch lock; int8_t phase; int8_t taskType; - - void *readyConnection; - void *dropConnection; - void *cancelConnection; bool emptyRes; bool queryFetched; @@ -112,6 +113,7 @@ typedef struct SQWTaskCtx { bool queryInQueue; int32_t rspCode; + SQWConnInfo connInfo; int8_t events[QW_EVENT_MAX]; qTaskInfo_t taskHandle; @@ -119,11 +121,11 @@ typedef struct SQWTaskCtx { } SQWTaskCtx; typedef struct SQWSchStatus { - int32_t lastAccessTs; // timestamp in second - uint64_t hbSeqId; - void *hbConnection; - SRWLatch tasksLock; - SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus + int32_t lastAccessTs; // timestamp in second + uint64_t hbSeqId; + SQWConnInfo *hbConnection; + SRWLatch tasksLock; + SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; // Qnode/Vnode level task management diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index ecb5dbd654..141e8f7916 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -41,6 +41,7 @@ void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code); +int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 60eb501dd2..5c73674365 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -402,6 +402,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + rpcReleaseHandle(ctx->connInfo.handle, CONN_SERVER); + ctx->connInfo.handle = NULL; + qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); if (ctx->sinkHandle) { @@ -729,7 +732,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - dropConnection = ctx->dropConnection; + dropConnection = &ctx->connInfo; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); break; } @@ -763,7 +766,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - dropConnection = ctx->dropConnection; + dropConnection = &ctx->connInfo; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -807,9 +810,8 @@ _return: int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; + SQWConnInfo connInfo = {0}; void *readyConnection = NULL; - void *dropConnection = NULL; - void *cancelConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -826,11 +828,18 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { ctx->emptyRes = true; } - + +#if 0 if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) { - readyConnection = ctx->readyConnection; + readyConnection = &ctx->connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); } +#else + connInfo.handle = ctx->connInfo.handle; + readyConnection = &connInfo; + + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); +#endif } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -841,7 +850,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - dropConnection = ctx->dropConnection; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -869,21 +877,11 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (readyConnection) { + if (TSDB_CODE_SUCCESS == code && readyConnection) { qwBuildAndSendReadyRsp(readyConnection, code); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); } - if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, code); - QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); - } - - if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, code); - QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); - } - if (code) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } @@ -893,17 +891,17 @@ _return: QW_RET(code); } - int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { int32_t code = 0; bool queryRsped = false; - bool needStop = false; struct SSubplan *plan = NULL; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; + QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -927,7 +925,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); queryRsped = true; @@ -945,7 +943,7 @@ _return: code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); if (!queryRsped) { - qwBuildAndSendQueryRsp(qwMsg->connection, code); + qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); } @@ -968,8 +966,9 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (ctx->phase == QW_PHASE_PRE_QUERY) { + ctx->connInfo.handle == qwMsg->connInfo.handle; + ctx->connInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); - ctx->readyConnection = qwMsg->connection; needRsp = false; QW_TASK_DLOG_E("ready msg will not rsp now"); goto _return; @@ -1007,7 +1006,7 @@ _return: } if (needRsp) { - qwBuildAndSendReadyRsp(qwMsg->connection, code); + qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); } @@ -1050,7 +1049,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } else { atomic_store_8(&ctx->queryContinue, 1); @@ -1067,7 +1066,7 @@ _return: QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwFreeFetchRsp(rsp); rsp = NULL; - qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); + qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code)); } @@ -1102,6 +1101,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { + atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle); + atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); @@ -1123,7 +1124,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8(&ctx->queryInQueue, 1); - QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection)); + QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); } } @@ -1143,7 +1144,7 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); } @@ -1180,8 +1181,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { // task not started } - if (!needRsp) { - ctx->dropConnection = qwMsg->connection; + if (!needRsp) { + ctx->connInfo.handle == qwMsg->connInfo.handle; + ctx->connInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } @@ -1205,7 +1207,7 @@ _return: } if (TSDB_CODE_SUCCESS != code || needRsp) { - QW_ERR_RET(qwBuildAndSendDropRsp(qwMsg->connection, code)); + QW_ERR_RET(qwBuildAndSendDropRsp(&qwMsg->connInfo, code)); QW_TASK_DLOG("drop msg rsped, code:%x", code); } @@ -1223,27 +1225,25 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); - atomic_store_ptr(&sch->hbConnection, qwMsg->connection); + atomic_store_ptr(&sch->hbConnection, qwMsg->connInfo); ++sch->hbSeqId; rsp.seqId = sch->hbSeqId; - QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, connection:%p", - sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connection); + QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", + sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); qwReleaseScheduler(QW_READ, mgmt); _return: - qwBuildAndSendHbRsp(qwMsg->connection, &rsp, code); + qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); QW_RET(code); } void qwProcessHbTimerEvent(void *param, void *tmrId) { - return; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param; SQWSchStatus *sch = NULL; int32_t taskNum = 0; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 98b917c525..b07ddb7196 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -26,6 +26,8 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { return TSDB_CODE_SUCCESS; } + + void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; @@ -44,8 +46,7 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { - SRpcMsg *pMsg = (SRpcMsg *)connection; +int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); @@ -54,8 +55,8 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_RSP, - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .pCont = msg, .contLen = contLen, .code = code, @@ -66,15 +67,14 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) { - SRpcMsg *pMsg = (SRpcMsg *)connection; +int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; SRpcMsg rpcRsp = { .msgType = TDMT_VND_RES_READY_RSP, - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -85,15 +85,15 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t code) { +int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .pCont = pRsp, .contLen = contLen, .code = code, @@ -104,9 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t cod return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { - SRpcMsg *pMsg = (SRpcMsg *)connection; - +int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -115,8 +113,8 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ SRpcMsg rpcRsp = { .msgType = TDMT_VND_FETCH_RSP, - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .pCont = pRsp, .contLen = sizeof(*pRsp) + dataLength, .code = code, @@ -127,14 +125,14 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { +int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; SRpcMsg rpcRsp = { .msgType = TDMT_VND_CANCEL_TASK_RSP, - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -144,15 +142,14 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) { - SRpcMsg *pMsg = (SRpcMsg *)connection; +int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); pRsp->code = code; SRpcMsg rpcRsp = { .msgType = TDMT_VND_DROP_TASK_RSP, - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -234,8 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) { - SRpcMsg *pMsg = (SRpcMsg *)connection; +int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); @@ -248,8 +244,8 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) { req->taskId = tId; SRpcMsg pNewMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, + .handle = pConn->handle, + .ahandle = pConn->ahandle, .msgType = TDMT_VND_QUERY_CONTINUE, .pCont = req, .contLen = sizeof(SQueryContinueReq), @@ -268,6 +264,35 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) { return TSDB_CODE_SUCCESS; } + +int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { + STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); + if (NULL == req) { + QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + req->header.vgId = mgmt->nodeId; + req->sId = sId; + req->queryId = qId; + req->taskId = tId; + req->refId = rId; + + SRpcMsg pMsg = { + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .msgType = TDMT_VND_DROP_TASK, + .pCont = req, + .contLen = sizeof(STaskDropReq), + .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + }; + + rpcRegisterBrokenLinkArg(&pMsg); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -294,7 +319,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t tId = msg->taskId; int64_t rId = msg->refId; - SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg}; + SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; char* sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql); @@ -326,7 +353,9 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t tId = msg->taskId; int64_t rId = 0; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; QW_SCH_TASK_DLOG("processCQuery start, node:%p", node); @@ -358,7 +387,9 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ uint64_t tId = msg->taskId; int64_t rId = 0; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; QW_SCH_TASK_DLOG("processReady start, node:%p", node); @@ -418,7 +449,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t tId = msg->taskId; int64_t rId = 0; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; QW_SCH_TASK_DLOG("processFetch start, node:%p", node); @@ -484,7 +517,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t tId = msg->taskId; int64_t rId = msg->refId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; QW_SCH_TASK_DLOG("processDrop start, node:%p", node); @@ -516,7 +551,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } uint64_t sId = req.sId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + qwMsg.connInfo.handle = pMsg->handle; + qwMsg.connInfo.ahandle = pMsg->ahandle; QW_SCH_DLOG("processHb start, node:%p", node); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 189124ef23..1f76457c13 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -70,6 +70,22 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } +void schFreeRpcCtx(SRpcCtx *pCtx) { + if (NULL == pCtx) { + return; + } + void *pIter = taosHashIterate(pCtx->args, NULL); + while (pIter) { + SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter; + + ctxVal->free(ctxVal->v); + + pIter = taosHashIterate(pCtx->args, pIter); + } + + taosHashCleanup(pCtx->args); +} + void schFreeTask(SSchTask* pTask) { if (pTask->candidateAddrs) { taosArrayDestroy(pTask->candidateAddrs); @@ -106,30 +122,41 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask); int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); + int32_t reqMsgType = msgType - 1; switch (msgType) { + case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp + if (lastMsgType != reqMsgType) { + SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); + } + + if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); + } + + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + return; + case TDMT_VND_RES_READY_RSP: + reqMsgType = TDMT_VND_QUERY; + break; case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: - case TDMT_VND_QUERY_RSP: - case TDMT_VND_RES_READY_RSP: case TDMT_VND_FETCH_RSP: - case TDMT_VND_DROP_TASK: - if (lastMsgType != (msgType - 1)) { - SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - - if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - break; default: SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus)); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + if (lastMsgType != reqMsgType) { + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; @@ -1006,7 +1033,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); + //SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); break; } @@ -1212,8 +1239,122 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { return TSDB_CODE_SUCCESS; } +void schFreeRpcCtxVal(void *arg) { + if (NULL == arg) { + return; + } + + SMsgSendInfo* pMsgSendInfo = arg; + tfree(pMsgSendInfo->param); + tfree(pMsgSendInfo); +} + +int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { + int32_t code = 0; + SSchCallbackParam *param = NULL; + SMsgSendInfo* pMsgSendInfo = NULL; -int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize) { + pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + if (NULL == pCtx->args) { + SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + param = calloc(1, sizeof(SSchCallbackParam)); + if (NULL == param) { + SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + int32_t msgType = TDMT_VND_RES_READY_RSP; + __async_send_cb_fn_t fp = NULL; + SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_RES_READY, &fp)); + + param->queryId = pJob->queryId; + param->refId = pJob->refId; + param->taskId = SCH_TASK_ID(pTask); + param->transport = pJob->transport; + + pMsgSendInfo->param = param; + pMsgSendInfo->fp = fp; + + SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; + if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { + SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + +_return: + + taosHashCleanup(pCtx->args); + tfree(param); + tfree(pMsgSendInfo); + + SCH_RET(code); +} + +int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { + int32_t code = 0; + SSchCallbackParam *param = NULL; + SMsgSendInfo* pMsgSendInfo = NULL; + + pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + if (NULL == pCtx->args) { + SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + param = calloc(1, sizeof(SSchCallbackParam)); + if (NULL == param) { + SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + int32_t msgType = TDMT_VND_QUERY_HEARTBEAT_RSP; + __async_send_cb_fn_t fp = NULL; + SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp)); + + param->queryId = pJob->queryId; + param->refId = pJob->refId; + param->taskId = SCH_TASK_ID(pTask); + param->transport = pJob->transport; + + pMsgSendInfo->param = param; + pMsgSendInfo->fp = fp; + + SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; + if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { + SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + +_return: + + taosHashCleanup(pCtx->args); + tfree(param); + tfree(pMsgSendInfo); + + SCH_RET(code); +} + + +int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) { int32_t code = 0; SSchTrans *trans = (SSchTrans *)transport; @@ -1237,7 +1378,6 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); param->transport = trans->transInst; - pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; @@ -1247,7 +1387,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* pMsgSendInfo->fp = fp; int64_t transporterId = 0; - code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); if (code) { SCH_ERR_JRET(code); } @@ -1267,6 +1407,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *msg = NULL; int32_t code = 0; bool isCandidateAddr = false; + bool persistHandle = false; + SRpcCtx rpcCtx = {0}; + if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; @@ -1289,8 +1432,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } case TDMT_VND_QUERY: { + SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); + uint32_t len = strlen(pJob->sql); - msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len; msg = calloc(1, msgSize); if (NULL == msg) { @@ -1310,7 +1454,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg, pJob->sql, len); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); - + + persistHandle = true; break; } @@ -1367,6 +1512,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } case TDMT_VND_QUERY_HEARTBEAT: { + SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx)); + SSchedulerHbReq req = {0}; req.sId = schMgmt.sId; req.header.vgId = addr->nodeId; @@ -1387,6 +1534,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + persistHandle = true; break; } default: @@ -1398,7 +1547,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask ? pTask->handle : NULL}; - SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize)); + SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); if (isCandidateAddr) { SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); @@ -1409,7 +1558,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, _return: SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); - + schFreeRpcCtx(&rpcCtx); tfree(msg); SCH_RET(code); }