fix unit test
This commit is contained in:
parent
7d81d4f18e
commit
51f3ff3207
|
@ -454,6 +454,7 @@ void cliResetConnTimer(SCliConn* conn) {
|
||||||
conn->timer = NULL;
|
conn->timer = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
|
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
|
||||||
|
|
||||||
SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) {
|
SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) {
|
||||||
|
@ -479,49 +480,6 @@ bool cliShouldAddConnToPool(SCliConn* conn) {
|
||||||
|
|
||||||
return empty;
|
return empty;
|
||||||
}
|
}
|
||||||
void cliHandleResp_shareConn(SCliConn* conn) {
|
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
|
||||||
STrans* pInst = pThrd->pInst;
|
|
||||||
// cliResetConnTimer(conn);
|
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
|
||||||
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 1);
|
|
||||||
|
|
||||||
if (msgLen <= 0) {
|
|
||||||
taosMemoryFree(pHead);
|
|
||||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
|
||||||
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
|
|
||||||
}
|
|
||||||
pHead->code = htonl(pHead->code);
|
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
|
||||||
|
|
||||||
STransMsg resp = {0};
|
|
||||||
resp.contLen = transContLenFromMsg(pHead->msgLen);
|
|
||||||
resp.pCont = transContFromHead((char*)pHead);
|
|
||||||
resp.code = pHead->code;
|
|
||||||
resp.msgType = pHead->msgType;
|
|
||||||
resp.info.ahandle = NULL;
|
|
||||||
resp.info.traceId = pHead->traceId;
|
|
||||||
resp.info.hasEpSet = pHead->hasEpSet;
|
|
||||||
resp.info.cliVer = htonl(pHead->compatibilityVer);
|
|
||||||
|
|
||||||
SCliReq* pReq = cliFindReqBySeq(conn, pHead->seqNum);
|
|
||||||
pReq->seq = 0;
|
|
||||||
|
|
||||||
SReqCtx* pCtx = pReq->ctx;
|
|
||||||
resp.info.ahandle = pCtx ? pCtx->ahandle : NULL;
|
|
||||||
STraceId* trace = &resp.info.traceId;
|
|
||||||
|
|
||||||
int32_t ret = cliNotifyCb(conn, pReq, &resp);
|
|
||||||
if (ret != 0) {
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
destroyReq(pReq);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
|
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -536,27 +494,36 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
|
||||||
return TSDB_CODE_OUT_OF_RANGE;
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cliMayRecycleConn(SCliConn* conn) {
|
int8_t cliMayRecycleConn(SCliConn* conn) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
if (transQueueSize(&conn->reqs) == 0) {
|
if (transQueueSize(&conn->reqs) == 0) {
|
||||||
addConnToPool(pThrd->pool, conn);
|
addConnToPool(pThrd->pool, conn);
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) {
|
||||||
|
pResp->contLen = transContLenFromMsg(pHead->msgLen);
|
||||||
|
pResp->pCont = transContFromHead((char*)pHead);
|
||||||
|
pResp->code = pReq->msg.code;
|
||||||
|
pResp->msgType = pReq->msg.msgType;
|
||||||
|
pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL;
|
||||||
|
pResp->info.traceId = pReq->msg.info.traceId;
|
||||||
|
pResp->info.hasEpSet = pReq->msg.info.hasEpSet;
|
||||||
|
pResp->info.cliVer = pReq->msg.info.cliVer;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
void cliHandleResp2(SCliConn* conn) {
|
void cliHandleResp2(SCliConn* conn) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
cliResetConnTimer(conn);
|
cliResetConnTimer(conn);
|
||||||
if (pInst->shareConn) {
|
|
||||||
return cliHandleResp_shareConn(conn);
|
|
||||||
}
|
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
|
||||||
|
|
||||||
// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
|
// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
|
||||||
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0);
|
STransMsgHead* pHead = NULL;
|
||||||
|
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0);
|
||||||
if (msgLen < 0) {
|
if (msgLen < 0) {
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
@ -588,38 +555,31 @@ void cliHandleResp2(SCliConn* conn) {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
STransMsg resp = {0};
|
STransMsg resp = {0};
|
||||||
resp.contLen = transContLenFromMsg(pHead->msgLen);
|
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
||||||
resp.pCont = transContFromHead((char*)pHead);
|
|
||||||
resp.code = pHead->code;
|
|
||||||
resp.msgType = pHead->msgType;
|
|
||||||
resp.info.ahandle = NULL;
|
|
||||||
resp.info.traceId = pHead->traceId;
|
|
||||||
resp.info.hasEpSet = pHead->hasEpSet;
|
|
||||||
resp.info.cliVer = htonl(pHead->compatibilityVer);
|
|
||||||
resp.info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL;
|
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
|
tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cliNotifyCb(conn, pReq, &resp);
|
code = cliNotifyCb(conn, pReq, &resp);
|
||||||
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
|
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
|
||||||
STraceId* trace = &resp.info.traceId;
|
STraceId* trace = &resp.info.traceId;
|
||||||
tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn));
|
tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn));
|
||||||
// retry, notify
|
// retry, notify
|
||||||
} else {
|
} else {
|
||||||
|
destroyReq(pReq);
|
||||||
}
|
}
|
||||||
(void)cliMayRecycleConn(conn);
|
if (cliMayRecycleConn(conn)) {
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliHandleResp(SCliConn* conn) {
|
void cliHandleResp(SCliConn* conn) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
cliResetConnTimer(conn);
|
cliResetConnTimer(conn);
|
||||||
// if (pInst->shareConn) {
|
|
||||||
// return cliHandleResp_shareConn(conn);
|
|
||||||
// }
|
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
|
|
||||||
|
@ -1686,7 +1646,6 @@ int32_t cliSend(SCliConn* pConn) {
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
SCliReq* pCliReq = NULL;
|
SCliReq* pCliReq = NULL;
|
||||||
int32_t code = cliConnFindToSendMsg(pConn, &pCliReq);
|
int32_t code = cliConnFindToSendMsg(pConn, &pCliReq);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1699,7 +1658,6 @@ int32_t cliSend(SCliConn* pConn) {
|
||||||
if (pReq->pCont == NULL) {
|
if (pReq->pCont == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("malloc memory: %p", pReq->pCont);
|
tDebug("malloc memory: %p", pReq->pCont);
|
||||||
pReq->contLen = 0;
|
pReq->contLen = 0;
|
||||||
}
|
}
|
||||||
|
@ -1719,12 +1677,10 @@ int32_t cliSend(SCliConn* pConn) {
|
||||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||||
pHead->version = TRANS_VER;
|
pHead->version = TRANS_VER;
|
||||||
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
|
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
|
||||||
|
pHead->seqNum = pConn->seq++;
|
||||||
}
|
}
|
||||||
pHead->timestamp = taosHton64(taosGetTimestampUs());
|
|
||||||
|
|
||||||
if (pHead->persist == 1) {
|
pHead->timestamp = taosHton64(taosGetTimestampUs());
|
||||||
CONN_SET_PERSIST_BY_APP(pConn);
|
|
||||||
}
|
|
||||||
|
|
||||||
STraceId* trace = &pReq->info.traceId;
|
STraceId* trace = &pReq->info.traceId;
|
||||||
|
|
||||||
|
@ -1741,24 +1697,22 @@ int32_t cliSend(SCliConn* pConn) {
|
||||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
|
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
uv_write_t* aReq = transReqQueuePush(&pConn->wreqQueue);
|
||||||
if (req == NULL) {
|
if (aReq == NULL) {
|
||||||
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
||||||
cliHandleExcept(pConn, -1);
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCliReq->sent = 1;
|
pCliReq->sent = 1;
|
||||||
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
int status = uv_write(aReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
|
||||||
uv_err_name(status));
|
uv_err_name(status));
|
||||||
cliHandleExcept(pConn, -1);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _exception);
|
||||||
return TSDB_CODE_THIRDPARTY_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||||
|
_exception:
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cliDestroyBatch(SCliBatch* pBatch) {
|
static void cliDestroyBatch(SCliBatch* pBatch) {
|
||||||
|
|
Loading…
Reference in New Issue