diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 534f7d0505..5b42716c03 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -94,12 +94,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) #define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029) - - - - - - +#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A) //common & util #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) // diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4e7515b12c..04a90d7765 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -212,7 +212,7 @@ static FORCE_INLINE void cliAllocRecvBufferCb(uv_handle_t* handle, size_t sugges // callback after recv nbytes from socket static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after send data to socket -static void cliSendCb(uv_write_t* req, int status); +// static void cliSendCb(uv_write_t* req, int status); // callback after conn to server static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); @@ -620,16 +620,16 @@ void cliConnTimeout(uv_timer_t* handle) { tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, UV_ECANCELED); -} -void cliReadTimeoutCb(uv_timer_t* handle) { - // set up timeout cb - SCliConn* conn = handle->data; - tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); - (void)uv_read_stop(conn->stream); - cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); + // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + // cliHandleFastFail(conn, UV_ECANCELED); } +// void cliReadTimeoutCb(uv_timer_t* handle) { +// // set up timeout cb +// SCliConn* conn = handle->data; +// tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); +// (void)uv_read_stop(conn->stream); +// cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); +// } void* createConnPool(int size) { // thread local, no lock @@ -1070,45 +1070,6 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } -// static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { -// if (transQueuePush(&conn->reqsToSend, pReq) != 0) { -// return TSDB_CODE_OUT_OF_MEMORY; -// } -// return 0; -// } - -// static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { -// // do nothing -// SCliReq* pTail = transQueuePop(&conn->reqsToSend); -// if (pTail == NULL) { -// return TSDB_CODE_INVALID_PARA; -// } -// if (pReq != NULL) { -// *pReq = pTail; -// } -// return 0; -// } -// static int32_t cliPutQReqToTable(SCliConn* pConn, SCliReq* pReq) { -// int32_t code = 0; -// if (pReq->msg.info.handle == 0) { -// return 0; -// } - -// queue q; -// QUEUE_INIT(&q); - -// queue* p = taosHashGet(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t)); -// if (p == NULL) { -// QUEUE_PUSH(&q, &pReq->qlist); -// code = taosHashPut(pConn->pQueryTable, (void*)pReq->msg.info.handle, sizeof(int64_t), &q, sizeof(queue)); -// if (code != 0) { -// return code; -// } -// } else { -// QUEUE_PUSH(p, &pReq->qlist); -// } -// return 0; -// } static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { int32_t code = 0; SCliConn* pConn = NULL; @@ -1291,37 +1252,44 @@ static void cliDestroy(uv_handle_t* handle) { taosMemoryFree(conn); } +bool filterAllReq(void* e, void* arg) { return 1; } + static void cliHandleBatch_shareConnExcept(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; - while (!transQueueEmpty(&conn->reqsToSend)) { - queue* el = transQueuePop(&conn->reqsToSend); + + queue set; + transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1); + transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1); + + while (!QUEUE_IS_EMPTY(&set)) { + queue* el = QUEUE_HEAD(&set); + QUEUE_REMOVE(el); + SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - // ASSERT(pReq->type != Release); - // ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); - - SReqCtx* pCtx = pReq ? pReq->ctx : NULL; + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; STransMsg resp = {0}; - resp.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; + resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL); resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; - resp.info.ahandle = NULL; resp.info.cliVer = pInst->compatibilityVer; - resp.info.ahandle = pCtx->ahandle; + resp.info.ahandle = pCtx ? pCtx->ahandle : 0; + // not + if (pCtx == NULL) { + destroyReq(pReq); + continue; + } pReq->seq = 0; code = cliNotifyCb(conn, pReq, &resp); - if (code != 0) { + if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { continue; } else { // already notify user destroyReq(pReq); } } - - if (TRANS_CONN_REF_GET(conn) > 1) transUnrefCliHandle(conn); - transUnrefCliHandle(conn); } bool fileToRmReq(void* h, void* arg) { @@ -1611,6 +1579,23 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) { return 0; }; + +static int32_t cliBuildExeceptMsg(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + memset(pResp, 0, sizeof(STransMsg)); + STransMsg resp = {0}; + resp.contLen = 0; + resp.pCont = NULL; + resp.msgType = pReq->msg.msgType + 1; + resp.info.ahandle = pReq->ctx->ahandle; + resp.info.traceId = pReq->msg.info.traceId; + resp.info.hasEpSet = false; + resp.info.cliVer = pInst->compatibilityVer; + return 0; +} + +bool filteGetAll(void* q, void* arg) { return true; } void cliConnCb(uv_connect_t* req, int status) { SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; @@ -1627,16 +1612,16 @@ void cliConnCb(uv_connect_t* req, int status) { if (status != 0) { tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, uv_strerror(status)); - // handle err - // 1. update statis - // 2. notifyCb or retry - // 3. clear conn and - // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); - // if (timeout == false) { - // cliHandleFastFail(pConn, status); - // } else if (timeout == true) { - // // already deal by timeout + // queue set; + // transQueueRemoveByFilter(&pConn->reqsToSend, filteGetAll, NULL, &set, -1); + // transQueueRemoveByFilter(&pConn->reqsSentOut, filteGetAll, NULL, &set, -1); + // while (!QUEUE_IS_EMPTY(&set)) { + // queue* el = QUEUE_HEAD(&set); + // SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + // cliBuildExeceptMsg(pConn, pReq, &pReq->msg); + // destroyReq(pReq); // } + cliHandleExcept(pConn, status); return; } pConn->connnected = 1; @@ -1894,9 +1879,12 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { } SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); + if (pState == NULL) { + if (pReq->ctx == NULL || pReq->ctx->ahandle == 0) { + return TSDB_CODE_RPC_STATE_DROPED; + } tDebug("failed to get statue, qid:%ld", qid); - // ASSERT(0); return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } else { *pConn = pState->conn; @@ -1939,6 +1927,11 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { code = cliMayGetStateByQid(pThrd, pReq, &pConn); if (code == 0) { (void)clConnMayUpdateReqCtx(pConn, pReq); + } else if (code == TSDB_CODE_RPC_STATE_DROPED) { + STraceId* trace = &pReq->msg.info.traceId; + tWarn("%s failed to get statue, qid:%ld", pInst->label, pReq->msg.info.qId); + destroyReq(pReq); + return; } if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { @@ -2217,6 +2210,7 @@ static FORCE_INLINE void destroyReq(void* arg) { if (pReq == NULL) { return; } + tDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); if (pReq->ctx) destroyReqCtx(pReq->ctx); @@ -2802,7 +2796,6 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); transFreeMsg(pResp->pCont); - // transUnrefCliHandle(pConn); } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || @@ -2810,16 +2803,13 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); transFreeMsg(pResp->pCont); - // addConnToPool(pThrd->pool, pConn); } else if (code == TSDB_CODE_SYN_RESTORING) { tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); - // addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); } else { tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); - // addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); } if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8cec54ee68..78cee5bb77 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -61,6 +61,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already qu TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")