fix random_err crash

This commit is contained in:
yihaoDeng 2024-08-22 19:32:20 +08:00
parent 9201315594
commit 2982a8ae51
2 changed files with 79 additions and 48 deletions

View File

@ -696,6 +696,11 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
plist = taosHashGet(pool, key, klen); plist = taosHashGet(pool, key, klen);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
if (nList == NULL) {
doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pMsg = NULL;
return NULL;
}
QUEUE_INIT(&nList->msgQ); QUEUE_INIT(&nList->msgQ);
nList->numOfConn++; nList->numOfConn++;
@ -817,7 +822,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs(conn, false); cliDestroyConnMsgs(conn, false);
if (conn->list == NULL) { if (conn->list == NULL && conn->dstAddr != NULL) {
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
} }
@ -962,6 +967,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
int32_t code = 0; int32_t code = 0;
int8_t registed = 0;
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
if (conn == NULL) { if (conn == NULL) {
@ -981,8 +987,24 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
code = TSDB_CODE_THIRDPARTY_ERROR; code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_CHECK_GOTO(code, NULL, _failed); TAOS_CHECK_GOTO(code, NULL, _failed);
} }
registed = 1;
conn->stream->data = conn; conn->stream->data = conn;
conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
conn->broken = false;
TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed);
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
transRefCliHandle(conn);
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
@ -996,18 +1018,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
timer->data = conn; timer->data = conn;
conn->timer = timer; conn->timer = timer;
conn->connReq.data = conn;
transReqQueueInit(&conn->wreqQueue);
TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed);
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
conn->broken = false;
transRefCliHandle(conn);
(void)atomic_add_fetch_32(&pThrd->connCount, 1); (void)atomic_add_fetch_32(&pThrd->connCount, 1);
@ -1016,12 +1026,16 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
*pCliConn = conn; *pCliConn = conn;
return code; return code;
_failed: _failed:
if (conn) { if (registed == 1) {
taosMemoryFree(conn->stream); uv_close((uv_handle_t*)conn->stream, cliDestroy);
(void)transDestroyBuffer(&conn->readBuf); } else {
transQueueDestroy(&conn->cliMsgs); if (conn) {
taosMemoryFree(conn->stream);
(void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->cliMsgs);
}
taosMemoryFree(conn);
} }
taosMemoryFree(conn);
return code; return code;
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
@ -1032,7 +1046,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
conn->broken = true; conn->broken = true;
if (conn->list == NULL) { if (conn->list == NULL && conn->dstAddr) {
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
} }
@ -1287,19 +1301,19 @@ void cliSend(SCliConn* pConn) {
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { // if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; // uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) :
if (timer == NULL) { // NULL; if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); // timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
tDebug("no available timer, create a timer %p", timer); // tDebug("no available timer, create a timer %p", timer);
(void)uv_timer_init(pThrd->loop, timer); // (void)uv_timer_init(pThrd->loop, timer);
} // }
timer->data = pConn; // timer->data = pConn;
pConn->timer = timer; // pConn->timer = timer;
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType)); // tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
(void)uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); // (void)uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
} // }
if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
@ -1385,7 +1399,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (conn->dstAddr == NULL) { if (conn->dstAddr == NULL) {
tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn,
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(TSDB_CODE_OUT_OF_MEMORY));
cliHandleFastFail(conn, -1); uv_close((uv_handle_t*)conn->stream, cliDestroy);
return; return;
} }
@ -1727,8 +1741,6 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
// memset(pResp, 0, sizeof(STransMsg));
if (pResp->code == 0) { if (pResp->code == 0) {
pResp->code = TSDB_CODE_RPC_BROKEN_LINK; pResp->code = TSDB_CODE_RPC_BROKEN_LINK;
} }
@ -1869,6 +1881,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
(void)transQueuePush(&conn->cliMsgs, pMsg); (void)transQueuePush(&conn->cliMsgs, pMsg);
conn->dstAddr = taosStrdup(addr); conn->dstAddr = taosStrdup(addr);
if (conn->dstAddr == NULL) {
tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
cliHandleFastFail(conn, -1);
return;
}
uint32_t ipaddr; uint32_t ipaddr;
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);
@ -2564,16 +2582,25 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
return; return;
} }
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst));
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
(void)transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); SDelayTask* pTask = transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
if (pTask == NULL) {
taosMemoryFree(arg);
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
} }
FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
@ -2748,7 +2775,12 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
} }
pMsg->sent = 0; pMsg->sent = 0;
cliSchedMsgToNextNode(pMsg, pThrd); code = cliSchedMsgToNextNode(pMsg, pThrd);
if (code != 0) {
pResp->code = code;
tError("failed to sched msg to next node, reason:%s", tstrerror(code));
return false;
}
return true; return true;
} }
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {

View File

@ -1182,22 +1182,22 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
int32_t code = 0; int32_t code = 0;
SWorkThrd* pThrd = hThrd; SWorkThrd* pThrd = hThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int32_t lino;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
if (pConn == NULL) { if (pConn == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
} }
transReqQueueInit(&pConn->wreqQueue); transReqQueueInit(&pConn->wreqQueue);
QUEUE_INIT(&pConn->queue); QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end); TAOS_CHECK_GOTO(code, &lino, _end);
} }
if ((code = transInitBuffer(&pConn->readBuf)) != 0) { if ((code = transInitBuffer(&pConn->readBuf)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end); TAOS_CHECK_GOTO(code, &lino, _end);
} }
memset(&pConn->regArg, 0, sizeof(pConn->regArg)); memset(&pConn->regArg, 0, sizeof(pConn->regArg));
@ -1206,14 +1206,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
if (exh == NULL) { if (exh == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
} }
exh->handle = pConn; exh->handle = pConn;
exh->pThrd = pThrd; exh->pThrd = pThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
if (exh->refId < 0) { if (exh->refId < 0) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, &lino, _end);
} }
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
@ -1233,15 +1233,16 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
// init client handle // init client handle
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
if (pConn->pTcp == NULL) { if (pConn->pTcp == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
} }
code = uv_tcp_init(pThrd->loop, pConn->pTcp); code = uv_tcp_init(pThrd->loop, pConn->pTcp);
if (code != 0) { if (code != 0) {
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code)); tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _end);
} }
pConn->pTcp->data = pConn; pConn->pTcp->data = pConn;
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
return pConn; return pConn;
_end: _end:
@ -1252,7 +1253,7 @@ _end:
taosMemoryFree(pConn); taosMemoryFree(pConn);
pConn = NULL; pConn = NULL;
} }
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code)); tError("%s failed to create conn since %s, lino:%d" PRId64, transLabel(pTransInst), tstrerror(code), lino);
return NULL; return NULL;
} }
@ -1895,5 +1896,3 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
} }
return code; return code;
} }
int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }