Merge pull request #25915 from taosdata/fix/TD-30220

Fix/TD-30220
This commit is contained in:
Hongze Cheng 2024-05-24 08:59:01 +08:00 committed by GitHub
commit a193b589a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 13 additions and 8 deletions

View File

@ -1155,10 +1155,13 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
taskDbAddRef(pTaskDb);
int64_t chkpId = pTaskDb->chkpId;
taskDbRefChkp(pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, chkpId);
taskDbRemoveRef(pTaskDb);
if (code != 0) {
taskDbUnRefChkp(pTaskDb, chkpId);
}
taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
taskDbRemoveRef(pTaskDb);
SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
@ -1182,14 +1185,15 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL) {
STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL || *pTaskDb == NULL) {
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
memset(buf, 0, sizeof(buf));
continue;
}
memset(buf, 0, sizeof(buf));
taskDbUnRefChkp(pTaskDb, pSnap->chkpId);
taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return 0;
@ -1989,7 +1993,8 @@ void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) {
int32_t size = taosArrayGetSize(pTaskDb->chkpInUse);
for (int i = 0; i < size; i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
if (*p == chkp) {
taosArrayRemove(pTaskDb->chkpInUse, i);

View File

@ -144,7 +144,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
@ -333,7 +333,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(handle->pDbSnapSet);
}
streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);