fix: fix rpc quit problem
This commit is contained in:
parent
697ffc524f
commit
1fc9eaadd0
|
@ -40,7 +40,7 @@ volatile int32_t tscInitRes = 0;
|
||||||
static int32_t registerRequest(SRequestObj *pRequest) {
|
static int32_t registerRequest(SRequestObj *pRequest) {
|
||||||
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
|
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
|
||||||
if (NULL == pTscObj) {
|
if (NULL == pTscObj) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
|
||||||
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||||
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
|
if (/*msgType == TDMT_VND_QUERY ||*/ msgType == TDMT_VND_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -133,11 +133,11 @@ void closeAllRequests(SHashObj *pRequests) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyAppInst(SAppInstInfo* pAppInfo) {
|
void destroyAppInst(SAppInstInfo *pAppInfo) {
|
||||||
tscDebug("destroy app inst mgr %p", pAppInfo);
|
tscDebug("destroy app inst mgr %p", pAppInfo);
|
||||||
|
|
||||||
taosThreadMutexLock(&appInfo.mutex);
|
taosThreadMutexLock(&appInfo.mutex);
|
||||||
|
|
||||||
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
|
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
|
||||||
taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
|
taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
|
||||||
|
|
||||||
|
@ -145,10 +145,10 @@ void destroyAppInst(SAppInstInfo* pAppInfo) {
|
||||||
|
|
||||||
taosMemoryFreeClear(pAppInfo->instKey);
|
taosMemoryFreeClear(pAppInfo->instKey);
|
||||||
closeTransporter(pAppInfo);
|
closeTransporter(pAppInfo);
|
||||||
|
|
||||||
taosThreadMutexLock(&pAppInfo->qnodeMutex);
|
taosThreadMutexLock(&pAppInfo->qnodeMutex);
|
||||||
taosArrayDestroy(pAppInfo->pQnodeList);
|
taosArrayDestroy(pAppInfo->pQnodeList);
|
||||||
taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
|
taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
|
||||||
|
|
||||||
taosMemoryFree(pAppInfo);
|
taosMemoryFree(pAppInfo);
|
||||||
}
|
}
|
||||||
|
@ -161,7 +161,7 @@ void destroyTscObj(void *pObj) {
|
||||||
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||||
closeAllRequests(pTscObj->pRequests);
|
closeAllRequests(pTscObj->pRequests);
|
||||||
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
|
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
|
||||||
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
|
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
|
||||||
pTscObj->pAppInfo->numOfConns);
|
pTscObj->pAppInfo->numOfConns);
|
||||||
|
|
||||||
if (0 == connNum) {
|
if (0 == connNum) {
|
||||||
|
|
|
@ -464,8 +464,7 @@ void* destroyConnPool(void* pool) {
|
||||||
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
|
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
|
||||||
while (connList != NULL) {
|
while (connList != NULL) {
|
||||||
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
while (!QUEUE_IS_EMPTY(&connList->conn)) {
|
||||||
queue* h = QUEUE_HEAD(&connList->conn);
|
queue* h = QUEUE_HEAD(&connList->conn);
|
||||||
// QUEUE_REMOVE(h);
|
|
||||||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||||
cliDestroyConn(c, true);
|
cliDestroyConn(c, true);
|
||||||
}
|
}
|
||||||
|
@ -613,7 +612,10 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void cliDestroy(uv_handle_t* handle) {
|
static void cliDestroy(uv_handle_t* handle) {
|
||||||
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) return;
|
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
transRemoveExHandle(conn->refId);
|
transRemoveExHandle(conn->refId);
|
||||||
taosMemoryFree(conn->ip);
|
taosMemoryFree(conn->ip);
|
||||||
|
|
Loading…
Reference in New Issue