update interface

This commit is contained in:
yihaoDeng 2024-11-29 14:56:29 +08:00
parent 90b60d9d01
commit f7cb4b9b09
14 changed files with 204 additions and 155 deletions

View File

@ -47,7 +47,7 @@ typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type, int32_t status);
typedef void (*ReportStartup)(const char* name, const char* desc); typedef void (*ReportStartup)(const char* name, const char* desc);
typedef struct { typedef struct {
@ -76,7 +76,7 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg); int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(SRpcMsg* pMsg); void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type, int32_t code);
void tmsgReportStartup(const char* name, const char* desc); void tmsgReportStartup(const char* name, const char* desc);
bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
void tmsgUpdateDnodeEpSet(SEpSet* epset); void tmsgUpdateDnodeEpSet(SEpSet* epset);

View File

@ -171,7 +171,7 @@ void *rpcReallocCont(void *ptr, int64_t contLen);
int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); int32_t rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
int32_t rpcSendResponse(const SRpcMsg *pMsg); int32_t rpcSendResponse(const SRpcMsg *pMsg);
int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg); int32_t rpcRegisterBrokenLinkArg(SRpcMsg *msg);
int32_t rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock int32_t rpcReleaseHandle(void *handle, int8_t type, int32_t code); // just release conn to rpc instance, no close sock
// These functions will not be called in the child process // These functions will not be called in the child process
int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int32_t rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);

View File

@ -368,7 +368,9 @@ static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); } static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { (void)rpcReleaseHandle(pHandle, type); } static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type, int32_t status) {
(void)rpcReleaseHandle(pHandle, type, status);
}
static bool rpcRfp(int32_t code, tmsg_t msgType) { static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND || if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||

View File

@ -312,7 +312,7 @@ int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (ctx->ctrlConnInfo.handle) { if (ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, 0);
} }
ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.handle = NULL;
@ -336,7 +336,6 @@ static void freeExplainExecItem(void *param) {
taosMemoryFree(pInfo->verboseInfo); taosMemoryFree(pInfo->verboseInfo);
} }
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
qTaskInfo_t taskHandle = ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
@ -387,8 +386,6 @@ _return:
return code; return code;
} }
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, cId, tId, eId); QW_SET_QTID(id, qId, cId, tId, eId);
@ -477,7 +474,6 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, cId, tId, eId); QW_SET_QTID(id, qId, cId, tId, eId);
@ -547,7 +543,8 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
while (true) { while (true) {
tbGet = false; tbGet = false;
code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, TSDB_DB_FNAME_LEN, tbName, TSDB_TABLE_NAME_LEN, &tbInfo.sversion, &tbInfo.tversion, i, &tbGet); code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, TSDB_DB_FNAME_LEN, tbName, TSDB_TABLE_NAME_LEN,
&tbInfo.sversion, &tbInfo.tversion, i, &tbGet);
if (TSDB_CODE_SUCCESS != code || !tbGet) { if (TSDB_CODE_SUCCESS != code || !tbGet) {
break; break;
} }
@ -805,7 +802,8 @@ bool qwRetireJob(SQWJobInfo* pJob) {
while (pIter) { while (pIter) {
SQWSessionInfo *pSession = (SQWSessionInfo *)pIter; SQWSessionInfo *pSession = (SQWSessionInfo *)pIter;
if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId)) { if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId,
pSession->rId, pSession->eId)) {
retired = false; retired = false;
} }
@ -814,4 +812,3 @@ bool qwRetireJob(SQWJobInfo* pJob) {
return retired; return retired;
} }

View File

@ -21,7 +21,6 @@ SQWorkerMgmt gQwMgmt = {
TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
SQueryMgmt gQueryMgmt = {0}; SQueryMgmt gQueryMgmt = {0};
void qwStopAllTasks(SQWorker *mgmt) { void qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, cId, tId, sId; uint64_t qId, cId, tId, sId;
int32_t eId; int32_t eId;
@ -53,7 +52,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
sch->hbBrokenTs = taosGetTimestampMs(); sch->hbBrokenTs = taosGetTimestampMs();
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER, 0);
sch->hbConnInfo.handle = NULL; sch->hbConnInfo.handle = NULL;
sch->hbConnInfo.ahandle = NULL; sch->hbConnInfo.ahandle = NULL;
@ -716,7 +715,8 @@ _return:
} }
if (code) { if (code) {
(void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // already in error, ignore new error (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL,
ctx->dynamicTask); // already in error, ignore new error
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
@ -795,7 +795,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
} }
taosEnableMemPoolUsage(ctx->memPoolSession); taosEnableMemPoolUsage(ctx->memPoolSession);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql,
OPTR_EXEC_MODEL_BATCH);
taosDisableMemPoolUsage(); taosDisableMemPoolUsage();
if (code) { if (code) {
@ -915,11 +916,11 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_ELOG("fetch rsp send fail, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
0); tstrerror(code), 0);
} else { } else {
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
0); tstrerror(code), 0);
} }
} }
@ -1024,8 +1025,8 @@ _return:
if (!rsped) { if (!rsped) {
code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); code = qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), QW_TASK_ELOG("fetch rsp send fail, msgType:%s, handle:%p, code:%x - %s, dataLen:%d",
qwMsg->connInfo.handle, code, tstrerror(code), dataLen); TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else { } else {
QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1),
qwMsg->connInfo.handle, code, tstrerror(code), dataLen); qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
@ -1076,9 +1077,10 @@ _return:
if (code) { if (code) {
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
(void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL,
ctx->dynamicTask); // task already failed, no more error handling
} else { } else {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0);
} }
} }
@ -1088,7 +1090,7 @@ _return:
if (ctx) { if (ctx) {
if (qwMsg->connInfo.handle != ctx->ctrlConnInfo.handle) { if (qwMsg->connInfo.handle != ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0);
} }
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
@ -1130,7 +1132,8 @@ _return:
if (code) { if (code) {
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
(void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); // task already failed, no more error handling (void)qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL,
ctx->dynamicTask); // task already failed, no more error handling
} }
} }
@ -1162,7 +1165,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER, 0);
sch->hbConnInfo.handle = NULL; sch->hbConnInfo.handle = NULL;
} }
@ -1171,8 +1174,9 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_UNLOCK(QW_WRITE, &sch->hbConnLock); QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
QW_DLOG("hb connection updated, clientId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->clientId, QW_DLOG("hb connection updated, clientId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); req->clientId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle,
qwMsg->connInfo.ahandle);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
@ -1182,7 +1186,7 @@ _return:
code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) { if (code) {
tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER, 0);
qwMsg->connInfo.handle = NULL; qwMsg->connInfo.handle = NULL;
} }
@ -1622,7 +1626,6 @@ _return:
QW_RET(code); QW_RET(code);
} }
void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
char id[sizeof(jobId) + sizeof(clientId) + 1] = {0}; char id[sizeof(jobId) + sizeof(clientId) + 1] = {0};
QW_SET_QCID(id, jobId, clientId); QW_SET_QCID(id, jobId, clientId);
@ -1633,13 +1636,15 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
return; return;
} }
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) &&
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId,
errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
(void)qwRetireJob(pJob); (void)qwRetireJob(pJob);
} else { } else {
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId,
atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
} }
} }
@ -1653,7 +1658,8 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
continue; continue;
} }
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) &&
0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
bool retired = qwRetireJob(pJob); bool retired = qwRetireJob(pJob);
if (retired) { if (retired) {
@ -1662,14 +1668,14 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
jobNum++; jobNum++;
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64, qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64
", retireSize:%" PRId64,
pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize); pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize);
} }
pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob); pJob = (SQWJobInfo *)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
} }
qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize); qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize,
retireSize);
} }

View File

@ -509,7 +509,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId,
pParam->clientId, pParam->taskId, code); pParam->clientId, pParam->taskId, code);
// called if drop task rsp received code // called if drop task rsp received code
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
if (pMsg->handle == NULL) { if (pMsg->handle == NULL) {
qError("sch handle is NULL, may be already released and mem lea"); qError("sch handle is NULL, may be already released and mem lea");
@ -535,7 +535,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
qDebug("handle %p is broken", pMsg->handle); qDebug("handle %p is broken", pMsg->handle);
@ -565,7 +565,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
if (code) { if (code) {
qError("hb rsp error:%s", tstrerror(code)); qError("hb rsp error:%s", tstrerror(code));
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }

View File

@ -89,7 +89,7 @@ char *schGetOpStr(SCH_OP_TYPE type) {
} }
void schFreeHbTrans(SSchHbTrans *pTrans) { void schFreeHbTrans(SSchHbTrans *pTrans) {
(void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT); (void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT, 0);
schFreeRpcCtx(&pTrans->rpcCtx); schFreeRpcCtx(&pTrans->rpcCtx);
} }
@ -202,7 +202,8 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) { if (NULL == addr) {
SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx,
(int32_t)taosArrayGetSize(pTask->candidateAddrs));
return; return;
} }
@ -240,7 +241,8 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) { if (NULL == addr) {
SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx,
(int32_t)taosArrayGetSize(pTask->candidateAddrs));
return TSDB_CODE_SCH_INTERNAL_ERROR; return TSDB_CODE_SCH_INTERNAL_ERROR;
} }
@ -276,8 +278,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qDebug("hb connection updated, nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", qDebug("hb connection updated, nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", epId->nodeId, epId->ep.fqdn,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -323,7 +325,8 @@ uint64_t schGenUUID(void) {
uint64_t pid = taosGetPId(); uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1); int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((uint64_t)((hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); uint64_t id =
((uint64_t)((hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id; return id;
} }
#endif #endif
@ -396,11 +399,13 @@ int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int
if (SCH_IS_DATA_BIND_PLAN(pSubplan)) { if (SCH_IS_DATA_BIND_PLAN(pSubplan)) {
if (pSubplan->execNode.epSet.numOfEps <= 0) { if (pSubplan->execNode.epSet.numOfEps <= 0) {
SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType,
pSubplan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_SCH_DATA_SRC_EP_MISS); SCH_ERR_RET(TSDB_CODE_SCH_DATA_SRC_EP_MISS);
} }
if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) { if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) {
SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse,
pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
} }
@ -429,5 +434,3 @@ void schStopTaskDelayTimer(SSchJob *pJob, SSchTask* pTask, bool syncOp) {
} }
} }
} }

View File

@ -310,8 +310,8 @@ void transRefCliHandle(void* handle);
int32_t transUnrefCliHandle(void* handle); int32_t transUnrefCliHandle(void* handle);
int32_t transGetRefCount(void* handle); int32_t transGetRefCount(void* handle);
int32_t transReleaseCliHandle(void* handle); int32_t transReleaseCliHandle(void* handle, int32_t status);
int32_t transReleaseSrvHandle(void* handle); int32_t transReleaseSrvHandle(void* handle, int32_t status);
int32_t transSendRequest(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx); int32_t transSendRequest(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx);
int32_t transSendRecv(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp); int32_t transSendRecv(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp);

View File

@ -69,7 +69,9 @@ void tmsgSendRsp(SRpcMsg* pMsg) {
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); } void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); }
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); } void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type, int32_t status) {
(*defaultMsgCb.releaseHandleFp)(pHandle, type, status);
}
void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); } void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); }

View File

@ -23,7 +23,7 @@ void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL}; void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL};
int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; int (*transReleaseHandle[])(void* handle, int32_t status) = {transReleaseSrvHandle, transReleaseCliHandle};
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
int32_t code = taosGetIpv4FromFqdn(localFqdn, ip); int32_t code = taosGetIpv4FromFqdn(localFqdn, ip);
@ -217,7 +217,9 @@ void rpcRefHandle(void* handle, int8_t type) { (*taosRefHandle[type])(handle); }
void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); }
int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } int32_t rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int32_t rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } int32_t rpcReleaseHandle(void* handle, int8_t type, int32_t status) {
return (*transReleaseHandle[type])(handle, status);
}
// client only // client only
int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { int32_t rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {

View File

@ -555,9 +555,40 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->info.handle = (void*)qid; pResp->info.handle = (void*)qid;
return 0; return 0;
} }
int8_t cliMayNotifyUserOnRecvReleaseExcept(SCliConn* conn, STransMsgHead* pHead, SCliReq* pReq) {
int32_t code = 0;
if (pHead->code == 0 || pHead->msgType != TDMT_SCH_TASK_RELEASE) {
return 0;
}
// no ahandle, no need to notify user
if (pReq == NULL || pReq->ctx == NULL || pReq->ctx->ahandle == NULL) {
return 0;
}
SCliThrd* pThrd = conn->hostThrd;
STransMsg resp = {.code = pHead->code};
int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId;
int64_t seqNum = taosHton64(pHead->seqNum);
code = cliBuildExceptResp(pThrd, pReq, &resp);
if (code != 0) {
tGWarn("%s conn %p failed to build except resp for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tstrerror(code));
}
code = cliNotifyCb(conn, NULL, &resp);
if (code != 0) {
tGWarn("%s conn %p failed to notify user for req:%" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, qId,
tstrerror(code));
}
destroyReq(pReq);
return 1;
}
int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead) { int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
int8_t notifyUser = 0;
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
int64_t qId = taosHton64(pHead->qid); int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId; STraceId* trace = &pHead->traceId;
@ -597,8 +628,12 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
removeReqFromSendQ(pReq); removeReqFromSendQ(pReq);
STraceId* trace = &pReq->msg.info.traceId; STraceId* trace = &pReq->msg.info.traceId;
tGDebug("start to free msg %p", pReq); tGDebug("start to free msg %p", pReq);
if (cliMayNotifyUserOnRecvReleaseExcept(conn, pHead, pReq)) {
} else {
destroyReqWrapper(pReq, pThrd); destroyReqWrapper(pReq, pThrd);
} }
}
taosMemoryFree(pHead); taosMemoryFree(pHead);
return 1; return 1;
} }
@ -1259,7 +1294,8 @@ static void cliHandleException(SCliConn* conn) {
if (conn->registered) { if (conn->registered) {
int8_t ref = transGetRefCount(conn); int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} }
} }
@ -1786,8 +1822,6 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMs
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq ? pReq->ctx : NULL; SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0};
// resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL);
pResp->msgType = pReq ? pReq->msg.msgType + 1 : 0; pResp->msgType = pReq ? pReq->msg.msgType + 1 : 0;
pResp->info.cliVer = pInst->compatibilityVer; pResp->info.cliVer = pInst->compatibilityVer;
pResp->info.ahandle = pCtx ? pCtx->ahandle : 0; pResp->info.ahandle = pCtx ? pCtx->ahandle : 0;
@ -3103,15 +3137,18 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle); SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle);
return pThrd; return pThrd;
} }
int32_t transReleaseCliHandle(void* handle) { int32_t transReleaseCliHandle(void* handle, int32_t status) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
if (pThrd == NULL) { if (pThrd == NULL) {
return TSDB_CODE_RPC_BROKEN_LINK; return TSDB_CODE_RPC_BROKEN_LINK;
} }
STransMsg tmsg = { STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE,
.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = handle, .info.ahandle = (void*)0, .info.qId = (int64_t)handle}; .info.handle = handle,
.info.ahandle = (void*)0,
.info.qId = (int64_t)handle,
code = status};
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());

View File

@ -1858,7 +1858,7 @@ void transUnrefSrvHandle(void* handle) {
} }
} }
int32_t transReleaseSrvHandle(void* handle) { int32_t transReleaseSrvHandle(void* handle, int32_t status) {
int32_t code = 0; int32_t code = 0;
SRpcHandleInfo* info = handle; SRpcHandleInfo* info = handle;
SExHandle* exh = info->handle; SExHandle* exh = info->handle;
@ -1871,7 +1871,7 @@ int32_t transReleaseSrvHandle(void* handle) {
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE, STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE,
.code = 0, .code = status,
.info.handle = exh, .info.handle = exh,
.info.ahandle = NULL, .info.ahandle = NULL,
.info.refId = refId, .info.refId = refId,

View File

@ -96,7 +96,7 @@ class Client {
} }
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
if (req->info.handle != NULL) { if (req->info.handle != NULL) {
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT, 0);
req->info.handle = NULL; req->info.handle = NULL;
} }
SendAndRecv(req, resp); SendAndRecv(req, resp);
@ -191,7 +191,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER, 0);
} }
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
{ {
@ -366,7 +366,7 @@ TEST_F(TransEnv, cliPersistHandle) {
//} //}
handle = resp.info.handle; handle = resp.info.handle;
} }
rpcReleaseHandle(handle, TAOS_CONN_CLIENT); rpcReleaseHandle(handle, TAOS_CONN_CLIENT, 0);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {0}; SRpcMsg req = {0};
req.msgType = 1; req.msgType = 1;

View File

@ -106,7 +106,7 @@ class Client {
} }
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
if (req->info.handle != NULL) { if (req->info.handle != NULL) {
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT, 0);
req->info.handle = NULL; req->info.handle = NULL;
} }
SendAndRecv(req, resp); SendAndRecv(req, resp);
@ -201,7 +201,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)
rpcMsg.code = 0; rpcMsg.code = 0;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER, 0);
} }
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
// { // {