From f7cb4b9b09a6a88add1135441f965d681b0ae45d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 29 Nov 2024 14:56:29 +0800 Subject: [PATCH 1/2] update interface --- include/common/tmsgcb.h | 4 +- include/libs/transport/trpc.h | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 4 +- source/libs/qworker/src/qwUtil.c | 77 +++++----- source/libs/qworker/src/qworker.c | 142 +++++++++--------- source/libs/scheduler/src/schRemote.c | 6 +- source/libs/scheduler/src/schUtil.c | 43 +++--- source/libs/transport/inc/transComm.h | 4 +- source/libs/transport/src/tmsgcb.c | 4 +- source/libs/transport/src/trans.c | 6 +- source/libs/transport/src/transCli.c | 53 ++++++- source/libs/transport/src/transSvr.c | 4 +- source/libs/transport/test/transUT.cpp | 6 +- source/libs/transport/test/transUT2.cpp | 4 +- 14 files changed, 204 insertions(+), 155 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 0c61aa5a51..6752287ed1 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -47,7 +47,7 @@ typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg); -typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); +typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type, int32_t status); typedef void (*ReportStartup)(const char* name, const char* desc); typedef struct { @@ -76,7 +76,7 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg); void tmsgSendRsp(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); -void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); +void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type, int32_t code); void tmsgReportStartup(const char* name, const char* desc); bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* 
clusterId, char* fqdn, uint16_t* port); void tmsgUpdateDnodeEpSet(SEpSet* epset); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index cfa3f44f7f..7b3fb5bb79 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -171,7 +171,7 @@ void *rpcReallocCont(void *ptr, int64_t contLen); int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); int32_t rpcSendResponse(const SRpcMsg *pMsg); int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg); -int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock +int32_t rpcReleaseHandle(void *handle, int8_t type, int32_t code); // just release conn to rpc instance, no close sock // These functions will not be called in the child process int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 5f396a520a..acd95d4b43 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -368,7 +368,9 @@ static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); } -static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { (void)rpcReleaseHandle(pHandle, type); } +static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) { + (void)rpcReleaseHandle(pHandle, type, status); +} static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND || diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index a9b4be0645..56ec8d8c03 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -286,12 
+286,12 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) { void qwFreeSinkHandle(SQWTaskCtx *ctx) { // Note: free/kill may in RC - void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle); + void *osinkHandle = atomic_load_ptr(&ctx->sinkHandle); if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) { QW_SINK_ENABLE_MEMPOOL(ctx); dsDestroyDataSinker(osinkHandle); QW_SINK_DISABLE_MEMPOOL(); - + qDebug("sink handle destroyed"); } } @@ -312,7 +312,7 @@ int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (ctx->ctrlConnInfo.handle) { - tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); + tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, 0); } ctx->ctrlConnInfo.handle = NULL; @@ -336,20 +336,19 @@ static void freeExplainExecItem(void *param) { taosMemoryFree(pInfo->verboseInfo); } - int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; qTaskInfo_t taskHandle = ctx->taskHandle; ctx->explainRsped = true; - + SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); if (NULL == execInfoList) { QW_ERR_JRET(terrno); } - + QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList)); - + if (ctx->localExec) { SExplainLocalRsp localRsp = {0}; localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); @@ -367,7 +366,7 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) { QW_ERR_JRET(terrno); } - + taosArrayDestroy(execInfoList); execInfoList = NULL; } else { @@ -387,8 +386,6 @@ _return: return code; } - - int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, cId, tId, eId); @@ -477,7 +474,6 @@ _return: QW_RET(code); } - int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = 
{0}; QW_SET_QTID(id, qId, cId, tId, eId); @@ -538,16 +534,17 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { } int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { - char dbFName[TSDB_DB_FNAME_LEN]; - char tbName[TSDB_TABLE_NAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; + char tbName[TSDB_TABLE_NAME_LEN]; STbVerInfo tbInfo; - int32_t i = 0; - int32_t code = TSDB_CODE_SUCCESS; - bool tbGet = false; + int32_t i = 0; + int32_t code = TSDB_CODE_SUCCESS; + bool tbGet = false; while (true) { tbGet = false; - code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, TSDB_DB_FNAME_LEN, tbName, TSDB_TABLE_NAME_LEN, &tbInfo.sversion, &tbInfo.tversion, i, &tbGet); + code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, TSDB_DB_FNAME_LEN, tbName, TSDB_TABLE_NAME_LEN, + &tbInfo.sversion, &tbInfo.tversion, i, &tbGet); if (TSDB_CODE_SUCCESS != code || !tbGet) { break; } @@ -564,11 +561,11 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { QW_ERR_RET(terrno); } } - + if (NULL == taosArrayPush(ctx->tbInfo, &tbInfo)) { QW_ERR_RET(terrno); } - + i++; } @@ -580,7 +577,7 @@ void qwCloseRef(void) { if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { taosCloseRef(gQwMgmt.qwRef); // ignore error gQwMgmt.qwRef = -1; - + taosHashCleanup(gQueryMgmt.pJobInfo); gQueryMgmt.pJobInfo = NULL; } @@ -601,7 +598,7 @@ void qwDestroyImpl(void *pMgmt) { if (taosTmrStop(mgmt->hbTimer)) { qTrace("stop qworker hb timer may failed"); } - + mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); @@ -615,7 +612,7 @@ void qwDestroyImpl(void *pMgmt) { void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, cId, tId, eId); sId = ctx->sId; - + qwFreeTaskCtx(QW_FPARAMS(), ctx); QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); @@ -727,33 +724,33 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { } } -void qwDestroyJobInfo(void* job) { +void qwDestroyJobInfo(void *job) { if (NULL == job) { return; } - 
SQWJobInfo* pJob = (SQWJobInfo*)job; + SQWJobInfo *pJob = (SQWJobInfo *)job; taosMemoryFreeClear(pJob->memInfo); taosHashCleanup(pJob->pSessions); pJob->pSessions = NULL; } -bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t code = TSDB_CODE_SUCCESS; - bool taskFreed = false; - + bool taskFreed = false; + QW_LOCK(QW_WRITE, &ctx->lock); - + QW_TASK_DLOG_E("start to force stop task"); - + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); - + return taskFreed; } - + if (QW_QUERY_RUNNING(ctx)) { code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); if (TSDB_CODE_SUCCESS != code) { @@ -774,14 +771,14 @@ bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { taskFreed = true; } } - + QW_UNLOCK(QW_WRITE, &ctx->lock); return taskFreed; } bool qwRetireTask(QW_FPARAMS_DEF) { - SQWTaskCtx *ctx = NULL; + SQWTaskCtx *ctx = NULL; int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx); if (TSDB_CODE_SUCCESS != code) { @@ -795,17 +792,18 @@ bool qwRetireTask(QW_FPARAMS_DEF) { return retired; } -bool qwRetireJob(SQWJobInfo* pJob) { +bool qwRetireJob(SQWJobInfo *pJob) { if (NULL == pJob) { return false; } - bool retired = true; - void* pIter = taosHashIterate(pJob->pSessions, NULL); + bool retired = true; + void *pIter = taosHashIterate(pJob->pSessions, NULL); while (pIter) { - SQWSessionInfo* pSession = (SQWSessionInfo*)pIter; + SQWSessionInfo *pSession = (SQWSessionInfo *)pIter; - if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId)) { + if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, + pSession->rId, pSession->eId)) { retired = false; } @@ -814,4 +812,3 @@ bool qwRetireJob(SQWJobInfo* pJob) { return retired; } - diff --git a/source/libs/qworker/src/qworker.c 
b/source/libs/qworker/src/qworker.c index 6d91eae4d3..4802e252f6 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,9 +18,8 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; -TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; -SQueryMgmt gQueryMgmt = {0}; - +TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; +SQueryMgmt gQueryMgmt = {0}; void qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, cId, tId, sId; @@ -53,7 +52,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re sch->hbBrokenTs = taosGetTimestampMs(); if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { - tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); + tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER, 0); sch->hbConnInfo.handle = NULL; sch->hbConnInfo.ahandle = NULL; @@ -71,8 +70,8 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; - int32_t code = TSDB_CODE_SUCCESS; - + int32_t code = TSDB_CODE_SUCCESS; + ctx->queryExecDone = true; if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { @@ -90,7 +89,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { _return: if ((!ctx->dynamicTask) && (!ctx->explain || ctx->explainRsped)) { - qwFreeTaskHandle(ctx); + qwFreeTaskHandle(ctx); } return code; @@ -131,7 +130,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (NULL == pResList) { QW_ERR_RET(terrno); } - + while (true) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); @@ -144,7 +143,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { taosEnableMemPoolUsage(ctx->memPoolSession); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); taosDisableMemPoolUsage(); - + if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -168,7 +167,7 @@ 
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_SINK_ENABLE_MEMPOOL(ctx); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); QW_SINK_DISABLE_MEMPOOL(); - + if (code) { QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -199,7 +198,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { QW_SINK_ENABLE_MEMPOOL(ctx); dsEndPut(sinkHandle, useconds); QW_SINK_DISABLE_MEMPOOL(); - + if (queryStop) { *queryStop = true; } @@ -233,9 +232,9 @@ _return: taosArrayDestroy(pResList); if (TSDB_CODE_SUCCESS != code) { - qwFreeTaskHandle(ctx); + qwFreeTaskHandle(ctx); } - + QW_RET(code); } @@ -251,7 +250,7 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) { int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; int32_t code = TSDB_CODE_SUCCESS; - + hbInfo->connInfo = sch->hbConnInfo; hbInfo->rsp.epId = sch->hbEpId; @@ -318,7 +317,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_SINK_ENABLE_MEMPOOL(ctx); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); QW_SINK_DISABLE_MEMPOOL(); - + if (len < 0) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -417,7 +416,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, _return: *rspMsg = pRsp; - + return code; } @@ -431,7 +430,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes QW_SINK_ENABLE_MEMPOOL(ctx); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); QW_SINK_DISABLE_MEMPOOL(); - + if (len <= 0 || len != sizeof(SDeleterRes)) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -532,7 +531,7 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg QW_SINK_ENABLE_MEMPOOL(ctx); dsReset(ctx->sinkHandle); 
QW_SINK_DISABLE_MEMPOOL(); - + qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); @@ -716,7 +715,8 @@ _return: } if (code) { - (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // already in error, ignore new error + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, + ctx->dynamicTask); // already in error, ignore new error } QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -747,11 +747,11 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->ctrlConnInfo = qwMsg->connInfo; ctx->sId = sId; ctx->phase = -1; - + if (NULL != gMemPoolHandle) { QW_ERR_JRET(qwInitSession(QW_FPARAMS(), ctx, &ctx->memPoolSession)); } - + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true)); @@ -787,7 +787,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { taosEnableMemPoolUsage(ctx->memPoolSession); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); taosDisableMemPoolUsage(); - + if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); @@ -795,9 +795,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { } taosEnableMemPoolUsage(ctx->memPoolSession); - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, + OPTR_EXEC_MODEL_BATCH); taosDisableMemPoolUsage(); - + if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); qDestroyTask(pTaskInfo); @@ -883,7 +884,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryContinue, 0); if (!ctx->dynamicTask) { - 
qwFreeSinkHandle(ctx); + qwFreeSinkHandle(ctx); } } @@ -915,11 +916,11 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->dataConnInfo; code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), - 0); + QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, + tstrerror(code), 0); } else { - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), - 0); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, + tstrerror(code), 0); } } @@ -1024,8 +1025,8 @@ _return: if (!rsped) { code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), - qwMsg->connInfo.handle, code, tstrerror(code), dataLen); + QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", + TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } else { QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen); @@ -1076,9 +1077,10 @@ _return: if (code) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, + ctx->dynamicTask); // task already failed, no more error handling } else { - tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0); } } @@ -1088,7 +1090,7 @@ 
_return: if (ctx) { if (qwMsg->connInfo.handle != ctx->ctrlConnInfo.handle) { - tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0); } qwReleaseTaskCtx(mgmt, ctx); @@ -1130,7 +1132,8 @@ _return: if (code) { if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling + (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, + ctx->dynamicTask); // task already failed, no more error handling } } @@ -1162,7 +1165,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { - tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); + tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER, 0); sch->hbConnInfo.handle = NULL; } @@ -1171,8 +1174,9 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_UNLOCK(QW_WRITE, &sch->hbConnLock); - QW_DLOG("hb connection updated, clientId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->clientId, - req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); + QW_DLOG("hb connection updated, clientId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", + req->clientId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, + qwMsg->connInfo.ahandle); qwReleaseScheduler(QW_READ, mgmt); @@ -1182,7 +1186,7 @@ _return: code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); if (code) { - tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); + tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0); qwMsg->connInfo.handle = NULL; } @@ -1223,7 +1227,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) { qError("reset qworker hb timer error, 
timer stoppped"); } - (void)qwRelease(refId); // ignore error + (void)qwRelease(refId); // ignore error return; } @@ -1237,7 +1241,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) { qError("reset qworker hb timer error, timer stoppped"); } - (void)qwRelease(refId); // ignore error + (void)qwRelease(refId); // ignore error return; } @@ -1279,7 +1283,7 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - (void)qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); // ignore error + (void)qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); // ignore error /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/ /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/ tFreeSSchedulerHbRsp(&rspList[j].rsp); @@ -1296,7 +1300,7 @@ _return: qError("reset qworker hb timer error, timer stoppped"); } - (void)qwRelease(refId); // ignore error + (void)qwRelease(refId); // ignore error } int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { @@ -1316,7 +1320,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { tsEnableRandErr = true; code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); tsEnableRandErr = false; - + if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); qDestroyTask(pTaskInfo); @@ -1354,7 +1358,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); if (1 == qwNum) { TAOS_MEMSET(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); @@ -1432,7 +1436,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, 
void **qWorkerMgmt, const S _return: if (mgmt->refId >= 0) { - (void)qwRelease(mgmt->refId); // ignore error + (void)qwRelease(mgmt->refId); // ignore error } else { taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->ctxHash); @@ -1533,7 +1537,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 if (NULL == rHandle.pMsgCb) { QW_ERR_JRET(terrno); } - + rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; rHandle.localExec = true; @@ -1604,7 +1608,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); if (!ctx->dynamicTask) { - qwFreeSinkHandle(ctx); + qwFreeSinkHandle(ctx); } } @@ -1622,54 +1626,56 @@ _return: QW_RET(code); } - void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { char id[sizeof(jobId) + sizeof(clientId) + 1] = {0}; QW_SET_QCID(id, jobId, clientId); - SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, id, sizeof(id)); + SQWJobInfo *pJob = (SQWJobInfo *)taosHashGet(gQueryMgmt.pJobInfo, id, sizeof(id)); if (NULL == pJob) { qError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId); return; } - if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, - jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && + 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, + errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); (void)qwRetireJob(pJob); } else { - qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), 
atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); + qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, + atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } } void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { - SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); - int32_t jobNum = 0; - int64_t retiredSize = 0; + SQWJobInfo *pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, NULL); + int32_t jobNum = 0; + int64_t retiredSize = 0; while (retiredSize < retireSize && NULL != pJob) { if (atomic_load_8(&pJob->retired)) { - pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob); continue; } - if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && + 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); - bool retired = qwRetireJob(pJob); + bool retired = qwRetireJob(pJob); if (retired) { - retiredSize += aSize; + retiredSize += aSize; } - + jobNum++; - qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64, - pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize); + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 + ", retireSize:%" PRId64, + pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize); } - pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob); } - qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, 
retireSize); + qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, + retireSize); } - - diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 70c63e60b3..c5d8b0fde5 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -509,7 +509,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->clientId, pParam->taskId, code); // called if drop task rsp received code - (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error + (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error if (pMsg->handle == NULL) { qError("sch handle is NULL, may be already released and mem lea"); @@ -535,7 +535,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; - (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error + (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error qDebug("handle %p is broken", pMsg->handle); @@ -565,7 +565,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { if (code) { qError("hb rsp error:%s", tstrerror(code)); - (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error + (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error SCH_ERR_JRET(code); } diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 722ec8849c..4e04b0210b 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -22,7 +22,7 @@ #include "tref.h" #include "trpc.h" -FORCE_INLINE int32_t schAcquireJob(int64_t refId, SSchJob** ppJob) { 
+FORCE_INLINE int32_t schAcquireJob(int64_t refId, SSchJob **ppJob) { qDebug("sch acquire jobId:0x%" PRIx64, refId); *ppJob = (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); if (NULL == *ppJob) { @@ -41,7 +41,7 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } -FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t* released) { +FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t *released) { if (0 == refId) { return TSDB_CODE_SUCCESS; } @@ -50,7 +50,7 @@ FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t* released) { return taosReleaseRefEx(schMgmt.jobRef, refId, released); } -int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) { +int32_t schDumpEpSet(SEpSet *pEpSet, char **ppRes) { *ppRes = NULL; if (NULL == pEpSet) { return TSDB_CODE_SUCCESS; @@ -89,7 +89,7 @@ char *schGetOpStr(SCH_OP_TYPE type) { } void schFreeHbTrans(SSchHbTrans *pTrans) { - (void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT); + (void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT, 0); schFreeRpcCtx(&pTrans->rpcCtx); } @@ -202,11 +202,12 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { - SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, + (int32_t)taosArrayGetSize(pTask->candidateAddrs)); return; } - SQueryNodeEpId epId = {0}; + SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; @@ -240,11 +241,12 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { - SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, 
(int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, + (int32_t)taosArrayGetSize(pTask->candidateAddrs)); return TSDB_CODE_SCH_INTERNAL_ERROR; } - SQueryNodeEpId epId = {0}; + SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; @@ -276,8 +278,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); - qDebug("hb connection updated, nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", - epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); + qDebug("hb connection updated, nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", epId->nodeId, epId->ep.fqdn, + epId->ep.port, trans->pTrans, trans->pHandle); return TSDB_CODE_SUCCESS; } @@ -323,7 +325,8 @@ uint64_t schGenUUID(void) { uint64_t pid = taosGetPId(); int32_t val = atomic_add_fetch_32(&requestSerialId, 1); - uint64_t id = ((uint64_t)((hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + uint64_t id = + ((uint64_t)((hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); return id; } #endif @@ -373,17 +376,17 @@ void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) *pTask = *task; } -int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum) { +int32_t schValidateSubplan(SSchJob *pJob, SSubplan *pSubplan, int32_t level, int32_t idx, int32_t taskNum) { if (NULL == pSubplan) { SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", idx, taskNum, level); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pSubplan)) { SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", level, nodeType(pSubplan)); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + if (pSubplan->subplanType < SUBPLAN_TYPE_MERGE || 
pSubplan->subplanType > SUBPLAN_TYPE_COMPUTE) { SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", pSubplan->subplanType, level, idx); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -396,15 +399,17 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int if (SCH_IS_DATA_BIND_PLAN(pSubplan)) { if (pSubplan->execNode.epSet.numOfEps <= 0) { - SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); + SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, + pSubplan->execNode.epSet.numOfEps); SCH_ERR_RET(TSDB_CODE_SCH_DATA_SRC_EP_MISS); } if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) { - SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); + SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, + pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } } - + if (NULL == pSubplan->pNode && pSubplan->subplanType != SUBPLAN_TYPE_MODIFY) { SCH_JOB_ELOG("empty plan root node, level:%d, subplan idx:%d, subplanType:%d", level, idx, pSubplan->subplanType); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -418,7 +423,7 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int return TSDB_CODE_SUCCESS; } -void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp) { +void schStopTaskDelayTimer(SSchJob *pJob, SSchTask *pTask, bool syncOp) { if (!taosTmrStopA(&pTask->delayTimer)) { if (syncOp) { while (!taosTmrIsStopped(&pTask->delayTimer)) { @@ -429,5 +434,3 @@ void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp) { } } } - - diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5c79b379ed..6a47d884fa 100644 
--- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -310,8 +310,8 @@ void transRefCliHandle(void* handle); int32_t transUnrefCliHandle(void* handle); int32_t transGetRefCount(void* handle); -int32_t transReleaseCliHandle(void* handle); -int32_t transReleaseSrvHandle(void* handle); +int32_t transReleaseCliHandle(void* handle, int32_t status); +int32_t transReleaseSrvHandle(void* handle, int32_t status); int32_t transSendRequest(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx); int32_t transSendRecv(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp); diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index 4c969003a9..8881bd5e7f 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -69,7 +69,9 @@ void tmsgSendRsp(SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); } -void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); } +void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type, int32_t status) { + (*defaultMsgCb.releaseHandleFp)(pHandle, type, status); +} void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index de129773a0..9042c0adae 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -23,7 +23,7 @@ void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL}; -int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; +int (*transReleaseHandle[])(void* handle, int32_t status) = {transReleaseSrvHandle, transReleaseCliHandle}; 
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { int32_t code = taosGetIpv4FromFqdn(localFqdn, ip); @@ -217,7 +217,9 @@ void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); } void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } -int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } +int32_t rpcReleaseHandle(void* handle, int8_t type, int32_t status) { + return (*transReleaseHandle[type])(handle, status); +} // client only int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 340cea1216..3ad8dcf271 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -555,9 +555,40 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe pResp->info.handle = (void*)qid; return 0; } + +int8_t cliMayNotifyUserOnRecvReleaseExcept(SCliConn* conn, STransMsgHead* pHead, SCliReq* pReq) { + int32_t code = 0; + if (pHead->code == 0 || pHead->msgType != TDMT_SCH_TASK_RELEASE) { + return 0; + } + // no ahandle, no need to notify user + if (pReq == NULL || pReq->ctx == NULL || pReq->ctx->ahandle == NULL) { + return 0; + } + + SCliThrd* pThrd = conn->hostThrd; + STransMsg resp = {.code = pHead->code}; + int64_t qId = taosHton64(pHead->qid); + STraceId* trace = &pHead->traceId; + int64_t seqNum = taosHton64(pHead->seqNum); + code = cliBuildExceptResp(pThrd, pReq, &resp); + if (code != 0) { + tGWarn("%s conn %p failed to build except resp for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId, + tstrerror(code)); + } + code = cliNotifyCb(conn, NULL, &resp); + if (code != 0) { + tGWarn("%s conn %p failed to notify user for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId, + 
tstrerror(code)); + } + + destroyReq(pReq); + return 1; +} int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; + int8_t notifyUser = 0; if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { int64_t qId = taosHton64(pHead->qid); STraceId* trace = &pHead->traceId; @@ -597,7 +628,11 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead removeReqFromSendQ(pReq); STraceId* trace = &pReq->msg.info.traceId; tGDebug("start to free msg %p", pReq); - destroyReqWrapper(pReq, pThrd); + + if (cliMayNotifyUserOnRecvReleaseExcept(conn, pHead, pReq)) { + } else { + destroyReqWrapper(pReq, pThrd); + } } taosMemoryFree(pHead); return 1; @@ -1259,7 +1294,8 @@ static void cliHandleException(SCliConn* conn) { if (conn->registered) { int8_t ref = transGetRefCount(conn); if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { - tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); + tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, + conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); uv_close((uv_handle_t*)conn->stream, cliDestroy); } } @@ -1785,9 +1821,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMs STrans* pInst = pThrd->pInst; - SReqCtx* pCtx = pReq ? pReq->ctx : NULL; - STransMsg resp = {0}; - // resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL); + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; pResp->msgType = pReq ? pReq->msg.msgType + 1 : 0; pResp->info.cliVer = pInst->compatibilityVer; pResp->info.ahandle = pCtx ? 
pCtx->ahandle : 0; @@ -3103,15 +3137,18 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle); return pThrd; } -int32_t transReleaseCliHandle(void* handle) { +int32_t transReleaseCliHandle(void* handle, int32_t status) { int32_t code = 0; SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { return TSDB_CODE_RPC_BROKEN_LINK; } - STransMsg tmsg = { - .msgType = TDMT_SCH_TASK_RELEASE, .info.handle = handle, .info.ahandle = (void*)0, .info.qId = (int64_t)handle}; + STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE, + .info.handle = handle, + .info.ahandle = (void*)0, + .info.qId = (int64_t)handle, + .code = status}; TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d02bfb8281..e2a90ac25b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1858,7 +1858,7 @@ void transUnrefSrvHandle(void* handle) { } } -int32_t transReleaseSrvHandle(void* handle) { +int32_t transReleaseSrvHandle(void* handle, int32_t status) { int32_t code = 0; SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; @@ -1871,7 +1871,7 @@ int32_t transReleaseSrvHandle(void* handle) { ASYNC_ERR_JRET(pThrd); STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE, - .code = 0, + .code = status, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId, diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index 8e396d59d7..a50ff47253 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -96,7 +96,7 @@ class Client { } void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { if (req->info.handle != NULL) { - rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); + rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT, 0); req->info.handle = NULL; } SendAndRecv(req, resp); @@ -191,7 
+191,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) rpcMsg.code = 0; rpcSendResponse(&rpcMsg); - rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); + rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER, 0); } static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { { @@ -366,7 +366,7 @@ TEST_F(TransEnv, cliPersistHandle) { //} handle = resp.info.handle; } - rpcReleaseHandle(handle, TAOS_CONN_CLIENT); + rpcReleaseHandle(handle, TAOS_CONN_CLIENT, 0); for (int i = 0; i < 10; i++) { SRpcMsg req = {0}; req.msgType = 1; diff --git a/source/libs/transport/test/transUT2.cpp b/source/libs/transport/test/transUT2.cpp index 54d23b1f64..d6b336b014 100644 --- a/source/libs/transport/test/transUT2.cpp +++ b/source/libs/transport/test/transUT2.cpp @@ -106,7 +106,7 @@ class Client { } void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { if (req->info.handle != NULL) { - rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); + rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT, 0); req->info.handle = NULL; } SendAndRecv(req, resp); @@ -201,7 +201,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) rpcMsg.code = 0; rpcSendResponse(&rpcMsg); - rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); + rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER, 0); } static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { // { From ba2a2c75cb5b9955c7311dad57d5f307b075f55d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 29 Nov 2024 15:09:24 +0800 Subject: [PATCH 2/2] update interface --- source/libs/transport/src/transCli.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3ad8dcf271..f26106cf5b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -570,7 +570,6 @@ int8_t cliMayNotifyUserOnRecvReleaseExcept(SCliConn* conn, STransMsgHead* pHead, STransMsg resp = 
{.code = pHead->code}; int64_t qId = taosHton64(pHead->qid); STraceId* trace = &pHead->traceId; - int64_t seqNum = taosHton64(pHead->seqNum); code = cliBuildExceptResp(pThrd, pReq, &resp); if (code != 0) { tGWarn("%s conn %p failed to build except resp for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,