Merge pull request #14771 from taosdata/enh/rpcSvrRefactor

enh: rpc svr refactor
This commit is contained in:
Yihao Deng 2022-07-11 19:05:30 +08:00 committed by GitHub
commit b6bd0b5198
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 23 deletions

View File

@ -810,11 +810,16 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param; SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code; pRequest->code = code;
if (pResult) {
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult)); memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) { TDMT_VND_CREATE_TABLE == pRequest->type) {
if (pResult) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows; pRequest->body.resInfo.numOfRows = pResult->numOfRows;
}
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
} }

View File

@ -472,6 +472,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
extern SSchDebug gSCHDebug; extern SSchDebug gSCHDebug;

View File

@ -763,6 +763,17 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
if (NULL == pReq || pReq->syncReq) {
return;
}
if (pReq->execFp) {
(*pReq->execFp)(NULL, pReq->cbParam, errCode);
} else if (pReq->fetchFp) {
(*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
}
}
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t op = 0; int32_t op = 0;
@ -801,17 +812,13 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) { int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = SCH_GET_JOB_STATUS(pJob);
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
switch (type) { switch (type) {
case SCH_OP_EXEC: case SCH_OP_EXEC:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
@ -822,11 +829,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
case SCH_OP_FETCH: case SCH_OP_FETCH:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.syncReq = pReq->syncReq; pJob->opStatus.syncReq = pReq->syncReq;
if (!SCH_JOB_NEED_FETCH(pJob)) { if (!SCH_JOB_NEED_FETCH(pJob)) {
@ -839,10 +851,6 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
break; break;
case SCH_OP_GET_STATUS: case SCH_OP_GET_STATUS:
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) { if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
@ -855,6 +863,11 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -77,6 +77,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
int32_t code = errCode; int32_t code = errCode;
if (NULL == pJob) { if (NULL == pJob) {
schDirectPostJobRes(pReq, errCode);
SCH_RET(code); SCH_RET(code);
} }

View File

@ -573,8 +573,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
if (nread < 0) { if (nread < 0) {
tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
T_REF_VAL_GET(conn));
conn->broken = true; conn->broken = true;
cliHandleExcept(conn); cliHandleExcept(conn);
} }
@ -650,7 +649,11 @@ static bool cliHandleNoResp(SCliConn* conn) {
return res; return res;
} }
static void cliSendCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(req);
if (pConn == NULL) {
return;
}
if (status == 0) { if (status == 0) {
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
@ -708,8 +711,8 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
pConn->writeReq.data = pConn; uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURN: _RETURN:
return; return;

View File

@ -265,8 +265,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn, pConn->refId); pConn->refId);
assert(transMsg.info.handle != NULL); assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
@ -331,7 +331,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
} }
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSvrConn* conn = req->data; SSvrConn* conn = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(req);
if (conn == NULL) return;
if (status == 0) { if (status == 0) {
tTrace("conn %p data already was written on stream", conn); tTrace("conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
@ -390,7 +393,6 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
} else { } else {
@ -433,7 +435,9 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData(smsg, &wb); uvPrepareSendData(smsg, &wb);
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
// impl // impl