enh(tsc): handle deadlock
This commit is contained in:
parent
6e561a6c78
commit
ab3f6619b9
|
@ -173,7 +173,8 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
|
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
|
||||||
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
|
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
|
||||||
pTscObj->connId = pRsp->query->connId;
|
pTscObj->connId = pRsp->query->connId;
|
||||||
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes, pTscObj->pAppInfo->totalDnodes);
|
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
|
||||||
|
pTscObj->pAppInfo->totalDnodes);
|
||||||
|
|
||||||
if (pRsp->query->killRid) {
|
if (pRsp->query->killRid) {
|
||||||
tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
|
tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
|
||||||
|
@ -297,7 +298,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
|
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
|
||||||
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
|
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes,
|
||||||
|
(*pInst)->totalDnodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rspNum) {
|
if (rspNum) {
|
||||||
|
@ -654,6 +656,8 @@ int32_t hbGatherAppInfo(void) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < sz; ++i) {
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||||
|
if (pAppHbMgr == NULL) continue;
|
||||||
|
|
||||||
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
|
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
|
||||||
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
|
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
|
||||||
if (NULL == pApp) {
|
if (NULL == pApp) {
|
||||||
|
@ -691,15 +695,20 @@ static void *hbThreadFunc(void *param) {
|
||||||
hbGatherAppInfo();
|
hbGatherAppInfo();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray *mgr = taosArrayInit(sz, sizeof(void *));
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||||
|
if (pAppHbMgr == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
||||||
if (connCnt == 0) {
|
if (connCnt == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
|
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
|
||||||
|
tFreeClientHbBatchReq(pReq);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
|
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
|
||||||
|
@ -726,7 +735,7 @@ static void *hbThreadFunc(void *param) {
|
||||||
pInfo->msgInfo.len = tlen;
|
pInfo->msgInfo.len = tlen;
|
||||||
pInfo->msgType = TDMT_MND_HEARTBEAT;
|
pInfo->msgType = TDMT_MND_HEARTBEAT;
|
||||||
pInfo->param = strdup(pAppHbMgr->key);
|
pInfo->param = strdup(pAppHbMgr->key);
|
||||||
pInfo->paramFreeFp = taosMemoryFree;
|
pInfo->paramFreeFp = taosMemoryFree;
|
||||||
pInfo->requestId = generateRequestId();
|
pInfo->requestId = generateRequestId();
|
||||||
pInfo->requestObjRefId = 0;
|
pInfo->requestObjRefId = 0;
|
||||||
|
|
||||||
|
@ -738,8 +747,12 @@ static void *hbThreadFunc(void *param) {
|
||||||
// hbClearReqInfo(pAppHbMgr);
|
// hbClearReqInfo(pAppHbMgr);
|
||||||
|
|
||||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||||
|
taosArrayPush(mgr, &pAppHbMgr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(clientHbMgr.appHbMgrs);
|
||||||
|
clientHbMgr.appHbMgrs = mgr;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&clientHbMgr.lock);
|
taosThreadMutexUnlock(&clientHbMgr.lock);
|
||||||
|
|
||||||
taosMsleep(HEARTBEAT_INTERVAL);
|
taosMsleep(HEARTBEAT_INTERVAL);
|
||||||
|
@ -831,7 +844,7 @@ void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
|
||||||
if (pItem == *pAppHbMgr) {
|
if (pItem == *pAppHbMgr) {
|
||||||
hbFreeAppHbMgr(*pAppHbMgr);
|
hbFreeAppHbMgr(*pAppHbMgr);
|
||||||
*pAppHbMgr = NULL;
|
*pAppHbMgr = NULL;
|
||||||
taosArrayRemove(clientHbMgr.appHbMgrs, i);
|
taosArraySet(clientHbMgr.appHbMgrs, i, NULL);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -856,7 +869,15 @@ int hbMgrInit() {
|
||||||
|
|
||||||
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
|
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
|
||||||
taosThreadMutexInit(&clientHbMgr.lock, NULL);
|
|
||||||
|
TdThreadMutexAttr attr = {0};
|
||||||
|
taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
|
||||||
|
|
||||||
|
int ret = taosThreadMutexAttrInit(&attr);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
taosThreadMutexInit(&clientHbMgr.lock, &attr);
|
||||||
|
taosThreadMutexAttrDestroy(&attr);
|
||||||
|
|
||||||
// init handle funcs
|
// init handle funcs
|
||||||
hbMgrInitHandle();
|
hbMgrInitHandle();
|
||||||
|
|
Loading…
Reference in New Issue