fix(stream): fix dead lock caused by refactor.
This commit is contained in:
parent
aefb9d275e
commit
6539760c64
|
@ -402,6 +402,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,6 +446,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
||||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
||||||
if (pReadyInfo == NULL) {
|
if (pReadyInfo == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,6 +461,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
||||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
||||||
if (pReadyInfo == NULL) {
|
if (pReadyInfo == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -843,7 +846,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||||
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
||||||
if (pNotSendList == NULL) {
|
if (pNotSendList == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
stError("s-task:%s quit tmr function due to out of memory", id);
|
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
||||||
streamMutexUnlock(&pActiveInfo->lock);
|
streamMutexUnlock(&pActiveInfo->lock);
|
||||||
|
|
||||||
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
||||||
|
@ -956,13 +959,14 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
|
||||||
|
|
||||||
streamMutexLock(&pInfo->lock);
|
streamMutexLock(&pInfo->lock);
|
||||||
if (!pInfo->dispatchTrigger) {
|
if (!pInfo->dispatchTrigger) {
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||||
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||||
if (pSendInfo == NULL) {
|
if (pSendInfo == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -982,11 +986,11 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
|
||||||
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
|
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(0);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1043,6 +1047,7 @@ int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||||
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -919,8 +919,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
|
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
|
||||||
|
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId,
|
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId,
|
||||||
&msg);
|
pInfo->checkpointId, &msg);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
|
code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue