fix(stream): fix deadlock.
This commit is contained in:
parent
8bdd43c305
commit
5ae8b68b17
|
@ -817,7 +817,7 @@ _OVER:
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -834,7 +834,10 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
||||||
|
|
||||||
{ // check the max checkpoint id from all vnodes.
|
{ // check the max checkpoint id from all vnodes.
|
||||||
int64_t maxCheckpointId = -1;
|
int64_t maxCheckpointId = -1;
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
if (lock) {
|
||||||
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||||
|
|
||||||
|
@ -852,7 +855,10 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
if (lock) {
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
}
|
||||||
|
|
||||||
if (maxCheckpointId > maxChkptId) {
|
if (maxCheckpointId > maxChkptId) {
|
||||||
mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
|
mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
|
||||||
maxCheckpointId);
|
maxCheckpointId);
|
||||||
|
@ -872,7 +878,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||||
pMsg->checkpointId = mndStreamGenChkpId(pMnode);
|
pMsg->checkpointId = mndStreamGenChkptId(pMnode, true);
|
||||||
|
|
||||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||||
|
@ -2329,7 +2335,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
|
|
||||||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||||
if (total == numOfTasks) { // all tasks has send the reqs
|
if (total == numOfTasks) { // all tasks has send the reqs
|
||||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
|
||||||
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
||||||
|
|
||||||
if (pStream != NULL) { // TODO:handle error
|
if (pStream != NULL) { // TODO:handle error
|
||||||
|
|
Loading…
Reference in New Issue