fix backen crash

This commit is contained in:
Yihao Deng 2024-05-23 12:02:01 +00:00
parent cb6e03d985
commit a021b62c04
2 changed files with 13 additions and 8 deletions

View File

@ -1155,10 +1155,13 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
taskDbAddRef(pTaskDb); taskDbAddRef(pTaskDb);
int64_t chkpId = pTaskDb->chkpId; int64_t chkpId = pTaskDb->chkpId;
taskDbRefChkp(pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, chkpId); code = taskDbDoCheckpoint(pTaskDb, chkpId);
taskDbRemoveRef(pTaskDb); if (code != 0) {
taskDbUnRefChkp(pTaskDb, chkpId);
}
taskDbRefChkp(pTaskDb, pTaskDb->chkpId); taskDbRemoveRef(pTaskDb);
SStreamTask* pTask = pTaskDb->pTask; SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId, SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
@ -1182,14 +1185,15 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) { for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i); SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId); sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL) { if (pTaskDb == NULL || *pTaskDb == NULL) {
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId); stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
memset(buf, 0, sizeof(buf));
continue; continue;
} }
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
taskDbUnRefChkp(pTaskDb, pSnap->chkpId); taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
} }
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
return 0; return 0;
@ -1989,7 +1993,8 @@ void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) { int32_t size = taosArrayGetSize(pTaskDb->chkpInUse);
for (int i = 0; i < size; i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i); int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
if (*p == chkp) { if (*p == chkp) {
taosArrayRemove(pTaskDb->chkpInUse, i); taosArrayRemove(pTaskDb->chkpInUse, i);

View File

@ -144,7 +144,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {
@ -333,7 +333,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
} }
taosArrayDestroy(handle->pDbSnapSet); taosArrayDestroy(handle->pDbSnapSet);
} }
streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet); streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) { if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) { for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i); SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);