fix deadlock
This commit is contained in:
parent
a5e3f54ca2
commit
91d0fe5c8b
|
@ -1199,7 +1199,6 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||||
|
|
||||||
nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||||
if (nBytes >= cap) {
|
if (nBytes >= cap) {
|
||||||
taosMemoryFree(pChkpDir);
|
|
||||||
return TSDB_CODE_OUT_OF_RANGE;
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
if (!taosIsDir(pChkpDir)) {
|
if (!taosIsDir(pChkpDir)) {
|
||||||
|
@ -4294,9 +4293,13 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal,
|
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal,
|
||||||
int32_t* pVLen) {
|
int32_t* pVLen) {
|
||||||
stDebug("streamStateFillGetKVByCur_rocksdb");
|
stDebug("streamStateFillGetKVByCur_rocksdb");
|
||||||
|
=======
|
||||||
|
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
|
||||||
|
>>>>>>> origin/main
|
||||||
if (!pCur) {
|
if (!pCur) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,7 @@ int32_t metaRefMgtInit() {
|
||||||
void metaRefMgtCleanup() {
|
void metaRefMgtCleanup() {
|
||||||
void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
|
void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
int64_t* p = *(int64_t**) pIter;
|
int64_t* p = *(int64_t**)pIter;
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
|
pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
|
||||||
}
|
}
|
||||||
|
@ -118,14 +118,14 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
|
code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("vgId:%d failed to put into refId mgt, refId:%" PRId64" %p, code:%s", (int32_t)vgId, *rid, rid,
|
stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
} else { // not
|
} else { // not
|
||||||
// stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
|
// stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stFatal("try to add refId:%"PRId64" vgId:%d, %p that already added into mgt", *rid, (int32_t) vgId, rid);
|
stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&gMetaRefMgt.mutex);
|
streamMutexUnlock(&gMetaRefMgt.mutex);
|
||||||
|
@ -292,6 +292,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
||||||
void* p = taskDbAddRef(*ppBackend);
|
void* p = taskDbAddRef(*ppBackend);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
|
stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
|
||||||
|
streamMutexUnlock(&pMeta->backendMutex);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -669,7 +670,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
int64_t refId = pTask->id.refId;
|
int64_t refId = pTask->id.refId;
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id[1], refId);
|
stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
|
stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
|
||||||
|
@ -727,14 +728,14 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
|
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId);
|
stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||||
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||||
|
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
@ -745,7 +746,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
|
|
||||||
if ((code = streamMetaCommit(pMeta)) != 0) {
|
if ((code = streamMetaCommit(pMeta)) != 0) {
|
||||||
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||||
|
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
@ -783,7 +784,7 @@ int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_
|
||||||
|
|
||||||
SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
|
SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
stDebug("s-task:%x failed to acquire task refId:%"PRId64", may have been destoried", taskId, *pTaskRefId);
|
stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, *pTaskRefId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -946,11 +947,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
pTask->info.delaySchedParam = 0;
|
pTask->info.delaySchedParam = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int64_t refId = pTask->id.refId;
|
int64_t refId = pTask->id.refId;
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId);
|
stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1115,7 +1115,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
STaskId id = streamTaskGetTaskId(pTask);
|
STaskId id = streamTaskGetTaskId(pTask);
|
||||||
|
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
void* px = taosArrayPush(pRecycleList, &id);
|
void* px = taosArrayPush(pRecycleList, &id);
|
||||||
if (px == NULL) {
|
if (px == NULL) {
|
||||||
stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
|
stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
|
||||||
}
|
}
|
||||||
|
@ -1165,7 +1165,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId);
|
stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue