opt parameter
This commit is contained in:
parent
8823358ca7
commit
2f487130ce
|
@ -465,7 +465,9 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
||||||
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
|
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
|
||||||
int64_t qId = taosHton64(pHead->qid);
|
int64_t qId = taosHton64(pHead->qid);
|
||||||
STraceId* trace = &pHead->traceId;
|
STraceId* trace = &pHead->traceId;
|
||||||
tGDebug("%s conn %p receive release req, qid:%ld", CONN_GET_INST_LABEL(conn), conn, qId);
|
int32_t seqNum = htonl(pHead->seqNum);
|
||||||
|
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%ld", CONN_GET_INST_LABEL(conn),
|
||||||
|
conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, qId);
|
||||||
|
|
||||||
STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId));
|
STransCtx* p = taosHashGet(conn->pQTable, &qId, sizeof(qId));
|
||||||
transCtxCleanup(p);
|
transCtxCleanup(p);
|
||||||
|
@ -569,8 +571,8 @@ void cliHandleResp2(SCliConn* conn) {
|
||||||
qId, tstrerror(code));
|
qId, tstrerror(code));
|
||||||
}
|
}
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
|
tDebug("%s conn %p recv unexpected packet, seqNum:%d, qId:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
|
||||||
tstrerror(code));
|
qId, tstrerror(code));
|
||||||
// TODO: notify cb
|
// TODO: notify cb
|
||||||
if (cliMayRecycleConn(conn)) {
|
if (cliMayRecycleConn(conn)) {
|
||||||
return;
|
return;
|
||||||
|
@ -3433,7 +3435,7 @@ int32_t transAllocHandle(int64_t* refId) {
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
taosInitRWLatch(&exh->latch);
|
taosInitRWLatch(&exh->latch);
|
||||||
tDebug("alloc qid:%ld", exh->refId);
|
tDebug("trans alloc qid:%ld", exh->refId);
|
||||||
*refId = exh->refId;
|
*refId = exh->refId;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3459,7 +3461,7 @@ int32_t transFreeConnById(void* pInstRef, int64_t transpointId) {
|
||||||
}
|
}
|
||||||
pCli->type = Normal;
|
pCli->type = Normal;
|
||||||
|
|
||||||
tDebug("release conn id %" PRId64 "", transpointId);
|
tDebug("%s release conn id %" PRId64 "", pInst->label, transpointId);
|
||||||
|
|
||||||
STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId};
|
STransMsg msg = {.msgType = TDMT_SCH_TASK_RELEASE, .info.handle = (void*)transpointId};
|
||||||
msg.info.qId = transpointId;
|
msg.info.qId = transpointId;
|
||||||
|
@ -3513,14 +3515,6 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
code = transHeapGet(pHeap, &pConn);
|
code = transHeapGet(pHeap, &pConn);
|
||||||
// if (pConn && taosHashGetSifze(pConn->pQTable) > 0) {
|
|
||||||
// tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap,
|
|
||||||
// pConn->reqRefCnt); return NULL;
|
|
||||||
// } /*else {
|
|
||||||
// // tDebug("failed to get conn from heap cache for key:%s", key);
|
|
||||||
// // return NULL;
|
|
||||||
// }*/
|
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDebug("failed to get conn from heap cache for key:%s", key);
|
tDebug("failed to get conn from heap cache for key:%s", key);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -167,9 +167,6 @@ static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
||||||
static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg);
|
static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg);
|
||||||
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||||
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||||
// static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
|
||||||
|
|
||||||
static int32_t reallocConnRef(SSvrConn* conn);
|
|
||||||
|
|
||||||
int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; }
|
int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; }
|
||||||
|
|
||||||
|
@ -1309,43 +1306,26 @@ static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
|
|
||||||
// if (conn->regArg.init == 1) {
|
void uvConnDestroyAllState(SSvrConn* p) {
|
||||||
// transFreeMsg(conn->regArg.msg.pCont);
|
STrans* pInst = p->pInst;
|
||||||
// conn->regArg.init = 0;
|
SHashObj* pQTable = p->pQTable;
|
||||||
// }
|
if (pQTable == NULL) return;
|
||||||
// }
|
|
||||||
static int32_t reallocConnRef(SSvrConn* conn) {
|
void* pIter = taosHashIterate(pQTable, NULL);
|
||||||
if (conn->refId > 0) {
|
while (pIter) {
|
||||||
(void)transReleaseExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId);
|
SSvrRegArg* arg = pIter;
|
||||||
(void)transRemoveExHandle(uvGetConnRefOfThrd(conn->hostThrd), conn->refId);
|
int64_t* qid = taosHashGetKey(pIter, NULL);
|
||||||
}
|
(pInst->cfp)(pInst->parent, &(arg->msg), NULL);
|
||||||
// avoid app continue to send msg on invalid handle
|
tTrace("conn %p broken, notify server app, qid%ld", p, *qid);
|
||||||
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
pIter = taosHashIterate(pQTable, pIter);
|
||||||
if (exh == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
exh->handle = conn;
|
taosHashCleanup(pQTable);
|
||||||
exh->pThrd = conn->hostThrd;
|
pQTable = NULL;
|
||||||
exh->refId = transAddExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh);
|
return;
|
||||||
if (exh->refId < 0) {
|
|
||||||
taosMemoryFree(exh);
|
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
|
||||||
SExHandle* pSelf = transAcquireExHandle(uvGetConnRefOfThrd(conn->hostThrd), exh->refId);
|
|
||||||
if (pSelf != exh) {
|
|
||||||
tError("conn %p failed to acquire handle", conn);
|
|
||||||
taosMemoryFree(exh);
|
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
|
||||||
}
|
|
||||||
|
|
||||||
conn->refId = exh->refId;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
static void uvDestroyConn(uv_handle_t* handle) {
|
static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSvrConn* conn = handle->data;
|
SSvrConn* conn = handle->data;
|
||||||
|
|
||||||
|
@ -1360,18 +1340,15 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
STrans* pInst = thrd->pInst;
|
STrans* pInst = thrd->pInst;
|
||||||
tDebug("%s conn %p destroy", transLabel(pInst), conn);
|
tDebug("%s conn %p destroy", transLabel(pInst), conn);
|
||||||
|
|
||||||
// for (int i = 0; i < transQueueSize(&conn->resps); i++) {
|
|
||||||
// SSvrRespMsg* msg = transQueueGet(&conn->resps, i);
|
|
||||||
// destroySmsg(msg);
|
|
||||||
// }
|
|
||||||
transQueueDestroy(&conn->resps);
|
transQueueDestroy(&conn->resps);
|
||||||
transReqQueueClear(&conn->wreqQueue);
|
transReqQueueClear(&conn->wreqQueue);
|
||||||
|
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
|
|
||||||
taosHashCleanup(conn->pQTable);
|
|
||||||
taosMemoryFree(conn->pTcp);
|
taosMemoryFree(conn->pTcp);
|
||||||
// destroyConnRegArg(conn);
|
|
||||||
|
uvConnDestroyAllState(conn);
|
||||||
|
|
||||||
(void)transDestroyBuffer(&conn->readBuf);
|
(void)transDestroyBuffer(&conn->readBuf);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue