diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index edf6e7ce96..f56a3c4852 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1199,7 +1199,6 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints"); if (nBytes >= cap) { - taosMemoryFree(pChkpDir); return TSDB_CODE_OUT_OF_RANGE; } if (!taosIsDir(pChkpDir)) { @@ -4294,9 +4293,13 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr } } +<<<<<<< HEAD int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) { stDebug("streamStateFillGetKVByCur_rocksdb"); +======= +int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) { +>>>>>>> origin/main if (!pCur) { return -1; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 23a98ef3ae..8e8e300ee6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -99,7 +99,7 @@ int32_t metaRefMgtInit() { void metaRefMgtCleanup() { void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL); while (pIter) { - int64_t* p = *(int64_t**) pIter; + int64_t* p = *(int64_t**)pIter; taosMemoryFree(p); pIter = taosHashIterate(gMetaRefMgt.pTable, pIter); } @@ -118,14 +118,14 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { if (p == NULL) { code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*)); if (code) { - stError("vgId:%d failed to put into refId mgt, refId:%" PRId64" %p, code:%s", (int32_t)vgId, *rid, rid, + stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid, tstrerror(code)); return code; - } else { // not -// stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid); + } else { // not + // stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid); } } else { - stFatal("try to add refId:%"PRId64" vgId:%d, %p that already added into mgt", *rid, (int32_t) vgId, rid); + stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid); } streamMutexUnlock(&gMetaRefMgt.mutex); @@ -292,6 +292,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) void* p = taskDbAddRef(*ppBackend); if (p == NULL) { stError("s-task:0x%x failed to ref backend", pTask->id.taskId); + streamMutexUnlock(&pMeta->backendMutex); return TSDB_CODE_FAILED; } @@ -669,7 +670,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { int64_t refId = pTask->id.refId; int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId); if (ret != 0) { - stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id[1], refId); + stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId); } } else { stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId); @@ -727,14 +728,14 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret != 0) { - stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId); + stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId); } return code; } if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); - void* pUnused = taosArrayPop(pMeta->pTaskList); + void* pUnused = taosArrayPop(pMeta->pTaskList); int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { @@ -745,7 +746,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa if ((code = streamMetaCommit(pMeta)) != 0) { int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); - void* pUnused = taosArrayPop(pMeta->pTaskList); + void* pUnused = taosArrayPop(pMeta->pTaskList); int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { @@ -783,7 +784,7 @@ int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_ SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId); if (p == NULL) { - stDebug("s-task:%x failed to acquire task refId:%"PRId64", may have been destoried", taskId, *pTaskRefId); + stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, *pTaskRefId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } @@ -946,11 +947,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask->info.delaySchedParam = 0; } - int64_t refId = pTask->id.refId; int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret != 0) { - stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId); + stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId); } streamMetaReleaseTask(pMeta, pTask); @@ -1115,7 +1115,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = streamTaskGetTaskId(pTask); tFreeStreamTask(pTask); - void* px = taosArrayPush(pRecycleList, &id); + void* px = taosArrayPush(pRecycleList, &id); if (px == NULL) { stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId); } @@ -1165,7 +1165,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId); + stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId); if (pTask->info.fillHistory == 0) { int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); }