diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b8d5c35dfb..41392ba27b 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -105,6 +105,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { pHdr->size = len; memcpy(pHdr->data, rowData, len); tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode)); + taosMemoryFree(rowData); return code; _err: diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a419a59996..1e6347c0ce 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -459,7 +459,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { } else { stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp, - tstrerror(TAOS_SYSTEM_ERROR(errno)), state); + tstrerror(TAOS_SYSTEM_ERROR(errno)), state); taosMkDir(state); } taosMemoryFree(chkp); @@ -813,6 +813,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* } int32_t nCf = taosArrayGetSize(pHandle); + if (nCf == 0) { + taosArrayDestroy(pHandle); + return nCf; + } rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < nCf; i++) { @@ -845,6 +849,9 @@ _ERROR: return code; } int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { + if (nCf == 0) { + return 0; + } int code = 0; char* err = NULL; @@ -910,7 +917,7 @@ int32_t streamBackendTriggerChkp(void* arg, char* dst) { stError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst); } else { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst, - taosGetTimestampMs() - st); + taosGetTimestampMs() - st); } } else { stError("stream backend:%p failed to flush db at:%s", pHandle, dst); @@ -979,15 +986,15 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); code = chkpPreFlushDb(pHandle->db, ppCf, nCf); - if (code == 0) { + if (code == 0 && nCf != 0) { code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); if (code != 0) { stError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir); } else { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, - taosGetTimestampMs() - st); + taosGetTimestampMs() - st); } - } else { + } else if (nCf != 0) { stError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } // release all ref to cfWrapper; @@ -1711,7 +1718,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { char* status[] = {"close", "drop"}; stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, - wrapper->idstr); + wrapper->idstr); wrapper->remove |= remove; // update by other pState taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId); } @@ -1783,35 +1790,36 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - stWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->rocksdb; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ - } \ - taosMemoryFree(ttlV); \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + stWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->rocksdb; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, \ + ttlVLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -1821,7 +1829,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - stWarn("streamState failed to get cf name: %s", funcname); \ + stWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ break; \ } \ @@ -1836,9 +1844,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ if (val == NULL || len == 0) { \ if (err == NULL) { \ - stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ } else { \ - stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ taosMemoryFreeClear(err); \ } \ code = -1; \ @@ -1846,11 +1854,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* p = NULL; \ int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ if (tlen <= 0) { \ - stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ - funcname); \ + stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ code = -1; \ } else { \ - stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ + stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ } \ taosMemoryFree(val); \ if (vLen != NULL) *vLen = tlen; \ @@ -1864,7 +1872,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ break; \ } \ @@ -1877,11 +1885,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ if (err != NULL) { \ - stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ taosMemoryFree(err); \ code = -1; \ } else { \ - stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ + stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ } \ } while (0); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 45c2743ecb..020f2de326 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -149,7 +149,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pReadyTaskSet == NULL) { - } pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); @@ -208,7 +207,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, - stage); + stage); return pMeta; _err: @@ -248,7 +247,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath, - tstrerror(terrno)); + tstrerror(terrno)); taosMemoryFree(defaultPath); taosMemoryFree(newPath); @@ -268,6 +267,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); streamBackendLoadCheckpointInfo(pMeta); + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); return 0; } @@ -379,10 +380,10 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId, - tstrerror(terrno)); + stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId, + tstrerror(terrno)); } else { - stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId); + stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId); } return code; @@ -393,7 +394,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa *pAdded = false; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -434,7 +435,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { int32_t num = 0; size_t size = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < size; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); if (p == NULL) { continue; @@ -451,7 +452,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - STaskId id = {.streamId = streamId, .taskId = taskId}; + STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { @@ -495,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); - STaskId id = {.streamId = streamId, .taskId = taskId}; + STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; @@ -512,7 +513,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t taosWUnLockLatch(&pMeta->lock); stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId, - streamGetTaskStatusStr(TASK_STATUS__DROPPING)); + streamGetTaskStatusStr(TASK_STATUS__DROPPING)); while (1) { taosRLockLatch(&pMeta->lock); @@ -650,7 +651,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { } int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { - TBC* pCur = NULL; + TBC* pCur = NULL; int32_t vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); @@ -683,8 +684,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); stError( - "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " - "manually", vgId, tsDataDir); + "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " + "stream " + "manually", + vgId, tsDataDir); return -1; } tDecoderClear(&decoder); @@ -756,7 +759,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, - pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); + pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); return 0; } @@ -803,7 +806,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { } static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { - if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter + if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter pInfo->tickCounter = 0; return true; } @@ -915,7 +918,7 @@ void metaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbCount += 1; stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, - pMeta->pHbInfo->hbCount); + pMeta->pHbInfo->hbCount); tmsgSendReq(&epset, &msg); } else { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); @@ -952,7 +955,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, - (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); taosWLockLatch(&pMeta->lock);