From 47afcbb38b9ae85638f843c57146f05b8d7415b6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 30 Jul 2024 10:08:06 +0800 Subject: [PATCH] refactor rocksdb backend --- source/libs/stream/src/streamBackendRocksdb.c | 222 ++++++++++-------- 1 file changed, 120 insertions(+), 102 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 09f6573052..4cab4387a9 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -424,7 +424,7 @@ void cleanDir(const char* pPath, const char* id) { if (taosIsDir(pPath)) { taosRemoveDir(pPath); - taosMkDir(pPath); + (void)taosMkDir(pPath); stInfo("%s clear dir:%s, succ", id, pPath); } } @@ -531,7 +531,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId _EXIT: if (code != 0) { if (rename) { - taosRenameFile(defaultTmp, defaultPath); + (void)taosRenameFile(defaultTmp, defaultPath); } } @@ -652,13 +652,13 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); - taosCloseDir(&pDir); + (void)taosCloseDir(&pDir); return code; _ERROR: taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); - taosCloseDir(&pDir); + (void)taosCloseDir(&pDir); return code; } @@ -820,8 +820,8 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); - taosThreadMutexInit(&pHandle->mutex, NULL); - taosThreadMutexInit(&pHandle->cfMutex, NULL); + (void)taosThreadMutexInit(&pHandle->mutex, NULL); + (void)taosThreadMutexInit(&pHandle->cfMutex, NULL); pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); @@ -890,7 +890,7 @@ _EXIT: streamMutexDestroy(&pHandle->mutex); streamMutexDestroy(&pHandle->cfMutex); taosHashCleanup(pHandle->cfInst); - tdListFree(pHandle->list); + (void)tdListFree(pHandle->list); taosMemoryFree(pHandle); stDebug("failed to init stream backend at %s", backendPath); taosMemoryFree(backendPath); @@ -922,7 +922,7 @@ void streamBackendCleanup(void* arg) { head = tdListPopHead(pHandle->list); } - tdListFree(pHandle->list); + (void)tdListFree(pHandle->list); streamMutexDestroy(&pHandle->mutex); streamMutexDestroy(&pHandle->cfMutex); @@ -933,11 +933,11 @@ void streamBackendCleanup(void* arg) { void streamBackendHandleCleanup(void* arg) { SBackendCfWrapper* wrapper = arg; bool remove = wrapper->remove; - taosThreadRwlockWrlock(&wrapper->rwLock); + (void)taosThreadRwlockWrlock(&wrapper->rwLock); stDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); if (wrapper->rocksdb == NULL) { - taosThreadRwlockUnlock(&wrapper->rwLock); + (void)taosThreadRwlockUnlock(&wrapper->rwLock); return; } @@ -988,9 +988,9 @@ void streamBackendHandleCleanup(void* arg) { wrapper->readOpts = NULL; taosMemoryFreeClear(wrapper->cfOpts); taosMemoryFreeClear(wrapper->param); - taosThreadRwlockUnlock(&wrapper->rwLock); + (void)taosThreadRwlockUnlock(&wrapper->rwLock); - taosThreadRwlockDestroy(&wrapper->rwLock); + (void)taosThreadRwlockDestroy(&wrapper->rwLock); wrapper->rocksdb = NULL; // taosReleaseRef(streamBackendId, wrapper->backendId); @@ -1083,12 +1083,20 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { STaskDbWrapper* pBackend = arg; - taosThreadRwlockWrlock(&pBackend->chkpDirLock); + (void)taosThreadRwlockWrlock(&pBackend->chkpDirLock); - taosArrayPush(pBackend->chkpSaved, &chkpId); + (void)taosArrayPush(pBackend->chkpSaved, &chkpId); SArray* chkpDel = taosArrayInit(8, sizeof(int64_t)); + if (chkpDel == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SArray* chkpDup = taosArrayInit(8, sizeof(int64_t)); + if (chkpDup == NULL) { + taosArrayDestroy(chkpDel); + return TSDB_CODE_OUT_OF_MEMORY; + } int64_t firsId = 0; if (taosArrayGetSize(pBackend->chkpInUse) >= 1) { @@ -1097,9 +1105,9 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); if (id >= firsId) { - taosArrayPush(chkpDup, &id); + (void)taosArrayPush(chkpDup, &id); } else { - taosArrayPush(chkpDel, &id); + (void)taosArrayPush(chkpDel, &id); } } } else { @@ -1108,11 +1116,11 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { for (int i = 0; i < dsz; i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); - taosArrayPush(chkpDel, &id); + (void)taosArrayPush(chkpDel, &id); } for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); - taosArrayPush(chkpDup, &id); + (void)taosArrayPush(chkpDup, &id); } } taosArrayDestroy(pBackend->chkpSaved); @@ -1263,7 +1271,7 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId); if (ret == 1) { - taosArrayPush(pBackend->chkpSaved, &checkpointId); + (void)taosArrayPush(pBackend->chkpSaved, &checkpointId); } } else { continue; @@ -1272,7 +1280,7 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { taosArraySort(pBackend->chkpSaved, chkpIdComp); taosMemoryFree(pChkpDir); - taosCloseDir(&pDir); + (void)taosCloseDir(&pDir); return 0; } @@ -1281,7 +1289,7 @@ int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_ha for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (pBackend->pCf[i]) { rocksdb_column_family_handle_t* p = pBackend->pCf[i]; - taosArrayPush(pHandle, &p); + (void)taosArrayPush(pHandle, &p); } } int32_t nCf = taosArrayGetSize(pHandle); @@ -1613,7 +1621,7 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { code = 0; _EXIT: - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); taosMemoryFree(pDst); return code; } @@ -1671,7 +1679,7 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) { goto _EXIT; } - atomic_store_64(&pTaskDb->dataWritten, 0); + (void)atomic_store_64(&pTaskDb->dataWritten, 0); pTaskDb->chkpId = chkpId; _EXIT: @@ -1679,13 +1687,13 @@ _EXIT: // clear checkpoint dir if failed if (code != 0 && pChkpDir != NULL) { if (taosDirExist(pChkpIdDir)) { - taosRemoveDir(pChkpIdDir); + (void)taosRemoveDir(pChkpIdDir); } } taosMemoryFree(pChkpIdDir); taosMemoryFree(pChkpDir); - taosReleaseRef(taskDbWrapperId, refId); + (void)taosReleaseRef(taskDbWrapperId, refId); taosMemoryFree(ppCf); return code; } @@ -1758,7 +1766,7 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, } int streamStateValueIsStale(char* v) { int64_t ts = 0; - taosDecodeFixedI64(v, &ts); + (void)taosDecodeFixedI64(v, &ts); return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0; } int iterValueIsStale(rocksdb_iterator_t* iter) { @@ -1801,8 +1809,8 @@ int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, p1 = taosDecodeFixedI64(p1, &key1.key.ts); p2 = taosDecodeFixedI64(p2, &key2.key.ts); - taosDecodeFixedI64(p1, &key1.opNum); - taosDecodeFixedI64(p2, &key2.opNum); + (void)taosDecodeFixedI64(p1, &key1.opNum); + (void)taosDecodeFixedI64(p2, &key2.opNum); return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2)); } @@ -1998,8 +2006,8 @@ int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, s char* p1 = (char*)aBuf; char* p2 = (char*)bBuf; - taosDecodeFixedI64(p1, &w1); - taosDecodeFixedI64(p2, &w2); + (void)taosDecodeFixedI64(p1, &w1); + (void)taosDecodeFixedI64(p2, &w2); if (w1 == w2) { return 0; } else { @@ -2320,7 +2328,7 @@ void taskDbRemoveRef(void* pTaskDb) { } STaskDbWrapper* pBackend = pTaskDb; - taosReleaseRef(taskDbWrapperId, pBackend->refId); + (void)taosReleaseRef(taskDbWrapperId, pBackend->refId); } void taskDbInitOpt(STaskDbWrapper* pTaskDb) { @@ -2386,22 +2394,22 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { pTaskDb->chkpId = -1; pTaskDb->chkpCap = 4; pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); - taskDbLoadChkpInfo(pTaskDb); + (void)taskDbLoadChkpInfo(pTaskDb); pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t)); - taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL); + (void)taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL); } void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { - taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); - taosArrayPush(pTaskDb->chkpInUse, &chkp); + (void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); + (void)taosArrayPush(pTaskDb->chkpInUse, &chkp); taosArraySort(pTaskDb->chkpInUse, chkpIdComp); - taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); + (void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); } void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { - taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); + (void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); int32_t size = taosArrayGetSize(pTaskDb->chkpInUse); for (int i = 0; i < size; i++) { int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i); @@ -2410,13 +2418,13 @@ void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { break; } } - taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); + (void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); } void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { taosArrayDestroy(pTaskDb->chkpSaved); taosArrayDestroy(pTaskDb->chkpInUse); - taosThreadRwlockDestroy(&pTaskDb->chkpDirLock); + (void)taosThreadRwlockDestroy(&pTaskDb->chkpDirLock); } int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) { @@ -2462,9 +2470,9 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { STaskDbWrapper* p = pTaskDb; - streamMutexLock(&p->mutex); + (void)streamMutexLock(&p->mutex); p->chkpId = chkpId; - streamMutexUnlock(&p->mutex); + (void)streamMutexUnlock(&p->mutex); } STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { @@ -2476,7 +2484,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { pTaskDb->idstr = key ? taosStrdup(key) : NULL; pTaskDb->path = statePath ? taosStrdup(statePath) : NULL; - taosThreadMutexInit(&pTaskDb->mutex, NULL); + (void)taosThreadMutexInit(&pTaskDb->mutex, NULL); taskDbInitChkpOpt(pTaskDb); taskDbInitOpt(pTaskDb); @@ -2650,7 +2658,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { - taosReleaseRef(taskDbWrapperId, refId); + (void)taosReleaseRef(taskDbWrapperId, refId); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2658,7 +2666,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); if (nBytes <= 0 || nBytes >= cap) { taosMemoryFree(buf); - taosReleaseRef(taskDbWrapperId, refId); + (void)taosReleaseRef(taskDbWrapperId, refId); return TSDB_CODE_OUT_OF_RANGE; } @@ -2669,7 +2677,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char taosMemoryFree(buf); } - taosReleaseRef(taskDbWrapperId, refId); + (void)taosReleaseRef(taskDbWrapperId, refId); return code; } @@ -2906,7 +2914,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->dbOpt = handle->dbOpt; rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); - taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); + (void)taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; } @@ -3146,9 +3154,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - atomic_add_fetch_64(&wrapper->dataWritten, 1); \ + (void)atomic_add_fetch_64(&wrapper->dataWritten, 1); \ char toString[128] = {0}; \ - if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) (void)(ginitDict[i].toStrFunc((void*)key, toString)); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ @@ -3180,7 +3188,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ char toString[128] = {0}; \ - if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) (void)(ginitDict[i].toStrFunc((void*)key, toString)); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->db; \ @@ -3223,9 +3231,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - atomic_add_fetch_64(&wrapper->dataWritten, 1); \ + (void)atomic_add_fetch_64(&wrapper->dataWritten, 1); \ char toString[128] = {0}; \ - if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) (void)(ginitDict[i].toStrFunc((void*)key, toString)); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->db; \ @@ -3264,7 +3272,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { stDebug("streamStateClear_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - atomic_add_fetch_64(&wrapper->dataWritten, 1); + (void)atomic_add_fetch_64(&wrapper->dataWritten, 1); char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; @@ -3280,8 +3288,8 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { if (err != NULL) { char toStringStart[128] = {0}; char toStringEnd[128] = {0}; - stateKeyToString(&sKey, toStringStart); - stateKeyToString(&eKey, toStringEnd); + (void)stateKeyToString(&sKey, toStringStart); + (void)stateKeyToString(&eKey, toStringEnd); stWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); taosMemoryFree(err); @@ -3298,15 +3306,21 @@ void streamStateCurNext_rocksdb(SStreamStateCur* pCur) { } } int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { + int code = 0; stDebug("streamStateGetFirst_rocksdb"); SWinKey tmp = {.ts = 0, .groupId = 0}; - streamStatePut_rocksdb(pState, &tmp, NULL, 0); + code = streamStatePut_rocksdb(pState, &tmp, NULL, 0); + if (code != 0) { + return code; + } SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + if (code != 0) { + return code; + } streamStateFreeCur(pCur); - streamStateDel_rocksdb(pState, &tmp); - return code; + return streamStateDel_rocksdb(pState, &tmp); } int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { @@ -3335,6 +3349,9 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke return 0; } *pVal = taosMemoryMalloc(size); + if (*pVal == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memset(*pVal, 0, size); return 0; } @@ -3351,7 +3368,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { size_t tlen; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - stateKeyDecode((void*)pKtmp, keyStr); + (void)stateKeyDecode((void*)pKtmp, keyStr); if (pKtmp->opNum != pCur->number) { return -1; } @@ -3404,7 +3421,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin SStateKey curKey; size_t kLen; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); + (void)stateKeyDecode((void*)&curKey, keyStr); if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { return pCur; } @@ -3426,7 +3443,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { { char tbuf[256] = {0}; - stateKeyToString((void*)&maxStateKey, tbuf); + (void)stateKeyToString((void*)&maxStateKey, tbuf); stDebug("seek to last:%s", tbuf); } @@ -3476,7 +3493,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* SStateKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); + (void)stateKeyDecode((void*)&curKey, keyStr); if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { pCur->number = pState->number; @@ -3624,7 +3641,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); + (void)stateSessionKeyDecode(&curKey, (char*)iKey); if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; rocksdb_iter_prev(pCur->iter); @@ -3661,7 +3678,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); + (void)stateSessionKeyDecode(&curKey, (char*)iKey); if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; rocksdb_iter_next(pCur->iter); @@ -3701,7 +3718,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); + (void)stateSessionKeyDecode(&curKey, (char*)iKey); if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; rocksdb_iter_next(pCur->iter); @@ -3741,7 +3758,7 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); + (void)stateSessionKeyDecode(&curKey, (char*)iKey); if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) return pCur; rocksdb_iter_prev(pCur->iter); @@ -3764,7 +3781,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return -1; } const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); - stateSessionKeyDecode((void*)&ktmp, (char*)curKey); + (void)stateSessionKeyDecode((void*)&ktmp, (char*)curKey); if (pVal != NULL) *pVal = NULL; if (pVLen != NULL) *pVLen = 0; @@ -3843,7 +3860,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK size_t kLen; SWinKey curKey; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - winKeyDecode((void*)&curKey, keyStr); + (void)winKeyDecode((void*)&curKey, keyStr); if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { return pCur; } @@ -3863,7 +3880,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, } size_t klen, vlen; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen); - winKeyDecode(&winKey, keyStr); + (void)winKeyDecode(&winKey, keyStr); const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal); @@ -3904,7 +3921,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - winKeyDecode((void*)&curKey, keyStr); + (void)winKeyDecode((void*)&curKey, keyStr); if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) { return pCur; } @@ -3941,7 +3958,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - winKeyDecode((void*)&curKey, keyStr); + (void)winKeyDecode((void*)&curKey, keyStr); if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) { return pCur; } @@ -4024,11 +4041,12 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe int32_t valSize = *pVLen; void* tmp = taosMemoryMalloc(valSize); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); - if (pCur == NULL) { - } - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { @@ -4214,7 +4232,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { int64_t checkPoint = 0; if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - taosArrayPush(result, &checkPoint); + (void)taosArrayPush(result, &checkPoint); } } else { break; @@ -4306,7 +4324,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb { char tbuf[256] = {0}; - ginitDict[i].toStrFunc((void*)key, tbuf); + (void)(ginitDict[i].toStrFunc((void*)key, tbuf)); stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen); } return 0; @@ -4321,7 +4339,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - atomic_add_fetch_64(&wrapper->dataWritten, 1); + (void)atomic_add_fetch_64(&wrapper->dataWritten, 1); rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); @@ -4429,8 +4447,8 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { if (fname == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - strncpy(fname, name, len); - taosArrayPush(diff, &fname); + (void)strncpy(fname, name, len); + (void)taosArrayPush(diff, &fname); } pIter = taosHashIterate(p2, pIter); } @@ -4506,7 +4524,7 @@ void dbChkpDebugInfo(SDbChkp* pDb) { int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { int32_t code = 0; int32_t nBytes; - taosThreadRwlockWrlock(&p->rwLock); + (void)taosThreadRwlockWrlock(&p->rwLock); p->preCkptId = p->curChkpId; p->curChkpId = chkpId; @@ -4524,7 +4542,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { nBytes = snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); if (nBytes <= 0 || nBytes >= p->len) { - taosThreadRwlockUnlock(&p->rwLock); + (void)taosThreadRwlockUnlock(&p->rwLock); return TSDB_CODE_OUT_OF_RANGE; } @@ -4534,7 +4552,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { TdDirPtr pDir = taosOpenDir(p->buf); if (pDir == NULL) { - taosThreadRwlockUnlock(&p->rwLock); + (void)taosThreadRwlockUnlock(&p->rwLock); return TAOS_SYSTEM_ERROR(errno); } @@ -4570,9 +4588,9 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { continue; } } - taosCloseDir(&pDir); + (void)taosCloseDir(&pDir); if (code != 0) { - taosThreadRwlockUnlock(&p->rwLock); + (void)taosThreadRwlockUnlock(&p->rwLock); return code; } @@ -4584,12 +4602,12 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { if (name != NULL && !isBkdDataMeta(name, len)) { char* fname = taosMemoryCalloc(1, len + 1); if (fname == NULL) { - taosThreadRwlockUnlock(&p->rwLock); + (void)taosThreadRwlockUnlock(&p->rwLock); return TSDB_CODE_OUT_OF_MEMORY; } - strncpy(fname, name, len); - taosArrayPush(p->pAdd, &fname); + (void)strncpy(fname, name, len); + (void)taosArrayPush(p->pAdd, &fname); } pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); } @@ -4621,7 +4639,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { p->idx = 1 - p->idx; - taosThreadRwlockUnlock(&p->rwLock); + (void)taosThreadRwlockUnlock(&p->rwLock); return code; } @@ -4679,7 +4697,7 @@ int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) { } p->update = 0; - taosThreadRwlockInit(&p->rwLock, NULL); + (void)taosThreadRwlockInit(&p->rwLock, NULL); SArray* list = NULL; code = dbChkpGetDelta(p, initChkpId, list); @@ -4793,7 +4811,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } - taosArrayPush(list, &p); + (void)taosArrayPush(list, &p); } // copy current file to dst dir @@ -4859,7 +4877,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { if (nBytes <= 0 || nBytes >= sizeof(content)) { code = TSDB_CODE_OUT_OF_RANGE; stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir); - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); goto _ERROR; } @@ -4867,10 +4885,10 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { if (nBytes != strlen(content)) { code = TAOS_SYSTEM_ERROR(errno); stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code)); - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); goto _ERROR; } - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); // clear delta data buf taosArrayClearP(p->pAdd, taosMemoryFree); @@ -4879,7 +4897,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { _ERROR: taosMemoryFree(buffer); - taosThreadRwlockUnlock(&p->rwLock); + (void)taosThreadRwlockUnlock(&p->rwLock); return code; } @@ -4925,7 +4943,7 @@ void bkdMgtDestroy(SBkdMgt* bm) { pIter = taosHashIterate(bm->pDbChkpTbl, pIter); } - taosThreadRwlockDestroy(&bm->rwLock); + (void)taosThreadRwlockDestroy(&bm->rwLock); taosMemoryFree(bm->path); taosHashCleanup(bm->pDbChkpTbl); @@ -4933,7 +4951,7 @@ void bkdMgtDestroy(SBkdMgt* bm) { } int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) { int32_t code = 0; - taosThreadRwlockWrlock(&bm->rwLock); + (void)taosThreadRwlockWrlock(&bm->rwLock); SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL; @@ -4941,14 +4959,14 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t cap = strlen(bm->path) + 64; char* path = taosMemoryCalloc(1, cap); if (path == NULL) { - taosThreadRwlockUnlock(&bm->rwLock); + (void)taosThreadRwlockUnlock(&bm->rwLock); return TSDB_CODE_OUT_OF_MEMORY; } int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId); if (nBytes <= 0 || nBytes >= cap) { taosMemoryFree(path); - taosThreadRwlockUnlock(&bm->rwLock); + (void)taosThreadRwlockUnlock(&bm->rwLock); code = TSDB_CODE_OUT_OF_RANGE; return code; } @@ -4957,20 +4975,20 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, code = dbChkpCreate(path, chkpId, &p); if (code != 0) { taosMemoryFree(path); - taosThreadRwlockUnlock(&bm->rwLock); + (void)taosThreadRwlockUnlock(&bm->rwLock); return code; } if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) { dbChkpDestroy(p); - taosThreadRwlockUnlock(&bm->rwLock); + (void)taosThreadRwlockUnlock(&bm->rwLock); code = terrno; return code; } pChkp = p; code = dbChkpDumpTo(pChkp, dname, list); - taosThreadRwlockUnlock(&bm->rwLock); + (void)taosThreadRwlockUnlock(&bm->rwLock); return code; } else { code = dbChkpGetDelta(pChkp, chkpId, NULL); @@ -4979,7 +4997,7 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, } } - taosThreadRwlockUnlock(&bm->rwLock); + (void)taosThreadRwlockUnlock(&bm->rwLock); return code; }