fix(stream): fix dead lock.
This commit is contained in:
parent
2dade996bb
commit
8c998673ad
|
@ -794,7 +794,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
|
|||
int64_t id = *(int64_t*)pIter;
|
||||
|
||||
SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
|
||||
if (wrapper == NULL) continue;
|
||||
if (wrapper == NULL) {
|
||||
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosThreadRwlockRdlock(&wrapper->rwLock);
|
||||
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
||||
|
@ -967,6 +970,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
|||
|
||||
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
||||
if (pHandle == NULL || pHandle->db == NULL) {
|
||||
stError("failed to acquire state-backend handle");
|
||||
goto _ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -321,7 +321,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
||||
|
||||
if (remain == 0) { // all tasks are ready
|
||||
stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
||||
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
|
||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
||||
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
||||
stInfo(
|
||||
|
|
Loading…
Reference in New Issue