Merge pull request #26862 from taosdata/fix/refactorRocksdbBackend

refactor rocksdb backend
This commit is contained in:
Hongze Cheng 2024-07-30 11:40:54 +08:00 committed by GitHub
commit 73e0fb626e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 120 additions and 102 deletions

View File

@ -424,7 +424,7 @@ void cleanDir(const char* pPath, const char* id) {
if (taosIsDir(pPath)) {
taosRemoveDir(pPath);
taosMkDir(pPath);
(void)taosMkDir(pPath);
stInfo("%s clear dir:%s, succ", id, pPath);
}
}
@ -531,7 +531,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
_EXIT:
if (code != 0) {
if (rename) {
taosRenameFile(defaultTmp, defaultPath);
(void)taosRenameFile(defaultTmp, defaultPath);
}
}
@ -652,13 +652,13 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
(void)taosCloseDir(&pDir);
return code;
_ERROR:
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
(void)taosCloseDir(&pDir);
return code;
}
@ -820,8 +820,8 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL);
taosThreadMutexInit(&pHandle->cfMutex, NULL);
(void)taosThreadMutexInit(&pHandle->mutex, NULL);
(void)taosThreadMutexInit(&pHandle->cfMutex, NULL);
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
@ -890,7 +890,7 @@ _EXIT:
streamMutexDestroy(&pHandle->mutex);
streamMutexDestroy(&pHandle->cfMutex);
taosHashCleanup(pHandle->cfInst);
tdListFree(pHandle->list);
(void)tdListFree(pHandle->list);
taosMemoryFree(pHandle);
stDebug("failed to init stream backend at %s", backendPath);
taosMemoryFree(backendPath);
@ -922,7 +922,7 @@ void streamBackendCleanup(void* arg) {
head = tdListPopHead(pHandle->list);
}
tdListFree(pHandle->list);
(void)tdListFree(pHandle->list);
streamMutexDestroy(&pHandle->mutex);
streamMutexDestroy(&pHandle->cfMutex);
@ -933,11 +933,11 @@ void streamBackendCleanup(void* arg) {
void streamBackendHandleCleanup(void* arg) {
SBackendCfWrapper* wrapper = arg;
bool remove = wrapper->remove;
taosThreadRwlockWrlock(&wrapper->rwLock);
(void)taosThreadRwlockWrlock(&wrapper->rwLock);
stDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
if (wrapper->rocksdb == NULL) {
taosThreadRwlockUnlock(&wrapper->rwLock);
(void)taosThreadRwlockUnlock(&wrapper->rwLock);
return;
}
@ -988,9 +988,9 @@ void streamBackendHandleCleanup(void* arg) {
wrapper->readOpts = NULL;
taosMemoryFreeClear(wrapper->cfOpts);
taosMemoryFreeClear(wrapper->param);
taosThreadRwlockUnlock(&wrapper->rwLock);
(void)taosThreadRwlockUnlock(&wrapper->rwLock);
taosThreadRwlockDestroy(&wrapper->rwLock);
(void)taosThreadRwlockDestroy(&wrapper->rwLock);
wrapper->rocksdb = NULL;
// taosReleaseRef(streamBackendId, wrapper->backendId);
@ -1083,12 +1083,20 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
STaskDbWrapper* pBackend = arg;
taosThreadRwlockWrlock(&pBackend->chkpDirLock);
(void)taosThreadRwlockWrlock(&pBackend->chkpDirLock);
taosArrayPush(pBackend->chkpSaved, &chkpId);
(void)taosArrayPush(pBackend->chkpSaved, &chkpId);
SArray* chkpDel = taosArrayInit(8, sizeof(int64_t));
if (chkpDel == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SArray* chkpDup = taosArrayInit(8, sizeof(int64_t));
if (chkpDup == NULL) {
taosArrayDestroy(chkpDel);
return TSDB_CODE_OUT_OF_MEMORY;
}
int64_t firsId = 0;
if (taosArrayGetSize(pBackend->chkpInUse) >= 1) {
@ -1097,9 +1105,9 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
if (id >= firsId) {
taosArrayPush(chkpDup, &id);
(void)taosArrayPush(chkpDup, &id);
} else {
taosArrayPush(chkpDel, &id);
(void)taosArrayPush(chkpDel, &id);
}
}
} else {
@ -1108,11 +1116,11 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
for (int i = 0; i < dsz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
taosArrayPush(chkpDel, &id);
(void)taosArrayPush(chkpDel, &id);
}
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
taosArrayPush(chkpDup, &id);
(void)taosArrayPush(chkpDup, &id);
}
}
taosArrayDestroy(pBackend->chkpSaved);
@ -1263,7 +1271,7 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
if (ret == 1) {
taosArrayPush(pBackend->chkpSaved, &checkpointId);
(void)taosArrayPush(pBackend->chkpSaved, &checkpointId);
}
} else {
continue;
@ -1272,7 +1280,7 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
taosArraySort(pBackend->chkpSaved, chkpIdComp);
taosMemoryFree(pChkpDir);
taosCloseDir(&pDir);
(void)taosCloseDir(&pDir);
return 0;
}
@ -1281,7 +1289,7 @@ int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_ha
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (pBackend->pCf[i]) {
rocksdb_column_family_handle_t* p = pBackend->pCf[i];
taosArrayPush(pHandle, &p);
(void)taosArrayPush(pHandle, &p);
}
}
int32_t nCf = taosArrayGetSize(pHandle);
@ -1613,7 +1621,7 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
code = 0;
_EXIT:
taosCloseFile(&pFile);
(void)taosCloseFile(&pFile);
taosMemoryFree(pDst);
return code;
}
@ -1671,7 +1679,7 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) {
goto _EXIT;
}
atomic_store_64(&pTaskDb->dataWritten, 0);
(void)atomic_store_64(&pTaskDb->dataWritten, 0);
pTaskDb->chkpId = chkpId;
_EXIT:
@ -1679,13 +1687,13 @@ _EXIT:
// clear checkpoint dir if failed
if (code != 0 && pChkpDir != NULL) {
if (taosDirExist(pChkpIdDir)) {
taosRemoveDir(pChkpIdDir);
(void)taosRemoveDir(pChkpIdDir);
}
}
taosMemoryFree(pChkpIdDir);
taosMemoryFree(pChkpDir);
taosReleaseRef(taskDbWrapperId, refId);
(void)taosReleaseRef(taskDbWrapperId, refId);
taosMemoryFree(ppCf);
return code;
}
@ -1758,7 +1766,7 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
}
int streamStateValueIsStale(char* v) {
int64_t ts = 0;
taosDecodeFixedI64(v, &ts);
(void)taosDecodeFixedI64(v, &ts);
return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0;
}
int iterValueIsStale(rocksdb_iterator_t* iter) {
@ -1801,8 +1809,8 @@ int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
p1 = taosDecodeFixedI64(p1, &key1.key.ts);
p2 = taosDecodeFixedI64(p2, &key2.key.ts);
taosDecodeFixedI64(p1, &key1.opNum);
taosDecodeFixedI64(p2, &key2.opNum);
(void)taosDecodeFixedI64(p1, &key1.opNum);
(void)taosDecodeFixedI64(p2, &key2.opNum);
return stateKeyCmpr(&key1, sizeof(key1), &key2, sizeof(key2));
}
@ -1998,8 +2006,8 @@ int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, s
char* p1 = (char*)aBuf;
char* p2 = (char*)bBuf;
taosDecodeFixedI64(p1, &w1);
taosDecodeFixedI64(p2, &w2);
(void)taosDecodeFixedI64(p1, &w1);
(void)taosDecodeFixedI64(p2, &w2);
if (w1 == w2) {
return 0;
} else {
@ -2320,7 +2328,7 @@ void taskDbRemoveRef(void* pTaskDb) {
}
STaskDbWrapper* pBackend = pTaskDb;
taosReleaseRef(taskDbWrapperId, pBackend->refId);
(void)taosReleaseRef(taskDbWrapperId, pBackend->refId);
}
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
@ -2386,22 +2394,22 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
pTaskDb->chkpId = -1;
pTaskDb->chkpCap = 4;
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
taskDbLoadChkpInfo(pTaskDb);
(void)taskDbLoadChkpInfo(pTaskDb);
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
(void)taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
}
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
taosArrayPush(pTaskDb->chkpInUse, &chkp);
(void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
(void)taosArrayPush(pTaskDb->chkpInUse, &chkp);
taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
(void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
}
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
(void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
int32_t size = taosArrayGetSize(pTaskDb->chkpInUse);
for (int i = 0; i < size; i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
@ -2410,13 +2418,13 @@ void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
break;
}
}
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
(void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
}
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
taosArrayDestroy(pTaskDb->chkpSaved);
taosArrayDestroy(pTaskDb->chkpInUse);
taosThreadRwlockDestroy(&pTaskDb->chkpDirLock);
(void)taosThreadRwlockDestroy(&pTaskDb->chkpDirLock);
}
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
@ -2462,9 +2470,9 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
STaskDbWrapper* p = pTaskDb;
streamMutexLock(&p->mutex);
(void)streamMutexLock(&p->mutex);
p->chkpId = chkpId;
streamMutexUnlock(&p->mutex);
(void)streamMutexUnlock(&p->mutex);
}
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
@ -2476,7 +2484,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
pTaskDb->idstr = key ? taosStrdup(key) : NULL;
pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
taosThreadMutexInit(&pTaskDb->mutex, NULL);
(void)taosThreadMutexInit(&pTaskDb->mutex, NULL);
taskDbInitChkpOpt(pTaskDb);
taskDbInitOpt(pTaskDb);
@ -2650,7 +2658,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
taosReleaseRef(taskDbWrapperId, refId);
(void)taosReleaseRef(taskDbWrapperId, refId);
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -2658,7 +2666,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(buf);
taosReleaseRef(taskDbWrapperId, refId);
(void)taosReleaseRef(taskDbWrapperId, refId);
return TSDB_CODE_OUT_OF_RANGE;
}
@ -2669,7 +2677,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
taosMemoryFree(buf);
}
taosReleaseRef(taskDbWrapperId, refId);
(void)taosReleaseRef(taskDbWrapperId, refId);
return code;
}
@ -2906,7 +2914,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->dbOpt = handle->dbOpt;
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
(void)taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else {
inst = *pInst;
}
@ -3146,9 +3154,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
(void)atomic_add_fetch_64(&wrapper->dataWritten, 1); \
char toString[128] = {0}; \
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (stDebugFlag & DEBUG_TRACE) (void)(ginitDict[i].toStrFunc((void*)key, toString)); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
@ -3180,7 +3188,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (stDebugFlag & DEBUG_TRACE) (void)(ginitDict[i].toStrFunc((void*)key, toString)); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->db; \
@ -3223,9 +3231,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
break; \
} \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
(void)atomic_add_fetch_64(&wrapper->dataWritten, 1); \
char toString[128] = {0}; \
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (stDebugFlag & DEBUG_TRACE) (void)(ginitDict[i].toStrFunc((void*)key, toString)); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->db; \
@ -3264,7 +3272,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
stDebug("streamStateClear_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
atomic_add_fetch_64(&wrapper->dataWritten, 1);
(void)atomic_add_fetch_64(&wrapper->dataWritten, 1);
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
@ -3280,8 +3288,8 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
if (err != NULL) {
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
stateKeyToString(&sKey, toStringStart);
stateKeyToString(&eKey, toStringEnd);
(void)stateKeyToString(&sKey, toStringStart);
(void)stateKeyToString(&eKey, toStringEnd);
stWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err);
@ -3298,15 +3306,21 @@ void streamStateCurNext_rocksdb(SStreamStateCur* pCur) {
}
}
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
int code = 0;
stDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
code = streamStatePut_rocksdb(pState, &tmp, NULL, 0);
if (code != 0) {
return code;
}
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
if (code != 0) {
return code;
}
streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp);
return code;
return streamStateDel_rocksdb(pState, &tmp);
}
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
@ -3335,6 +3349,9 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
return 0;
}
*pVal = taosMemoryMalloc(size);
if (*pVal == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(*pVal, 0, size);
return 0;
}
@ -3351,7 +3368,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
stateKeyDecode((void*)pKtmp, keyStr);
(void)stateKeyDecode((void*)pKtmp, keyStr);
if (pKtmp->opNum != pCur->number) {
return -1;
}
@ -3404,7 +3421,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
SStateKey curKey;
size_t kLen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
(void)stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) {
return pCur;
}
@ -3426,7 +3443,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
{
char tbuf[256] = {0};
stateKeyToString((void*)&maxStateKey, tbuf);
(void)stateKeyToString((void*)&maxStateKey, tbuf);
stDebug("seek to last:%s", tbuf);
}
@ -3476,7 +3493,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
SStateKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
(void)stateKeyDecode((void*)&curKey, keyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
pCur->number = pState->number;
@ -3624,7 +3641,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
(void)stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter);
@ -3661,7 +3678,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
(void)stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
rocksdb_iter_next(pCur->iter);
@ -3701,7 +3718,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
(void)stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
rocksdb_iter_next(pCur->iter);
@ -3741,7 +3758,7 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
(void)stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) return pCur;
rocksdb_iter_prev(pCur->iter);
@ -3764,7 +3781,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
return -1;
}
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
(void)stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
if (pVal != NULL) *pVal = NULL;
if (pVLen != NULL) *pVLen = 0;
@ -3843,7 +3860,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
size_t kLen;
SWinKey curKey;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
(void)winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
return pCur;
}
@ -3863,7 +3880,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
}
size_t klen, vlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
winKeyDecode(&winKey, keyStr);
(void)winKeyDecode(&winKey, keyStr);
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
@ -3904,7 +3921,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
SWinKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
(void)winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) < 0) {
return pCur;
}
@ -3941,7 +3958,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
SWinKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
(void)winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
return pCur;
}
@ -4024,11 +4041,12 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
int32_t valSize = *pVLen;
void* tmp = taosMemoryMalloc(valSize);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
if (pCur == NULL) {
}
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
@ -4214,7 +4232,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
(void)taosArrayPush(result, &checkPoint);
}
} else {
break;
@ -4306,7 +4324,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
{
char tbuf[256] = {0};
ginitDict[i].toStrFunc((void*)key, tbuf);
(void)(ginitDict[i].toStrFunc((void*)key, tbuf));
stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
}
return 0;
@ -4321,7 +4339,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
atomic_add_fetch_64(&wrapper->dataWritten, 1);
(void)atomic_add_fetch_64(&wrapper->dataWritten, 1);
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
@ -4429,8 +4447,8 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
if (fname == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strncpy(fname, name, len);
taosArrayPush(diff, &fname);
(void)strncpy(fname, name, len);
(void)taosArrayPush(diff, &fname);
}
pIter = taosHashIterate(p2, pIter);
}
@ -4506,7 +4524,7 @@ void dbChkpDebugInfo(SDbChkp* pDb) {
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
int32_t code = 0;
int32_t nBytes;
taosThreadRwlockWrlock(&p->rwLock);
(void)taosThreadRwlockWrlock(&p->rwLock);
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
@ -4524,7 +4542,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
nBytes =
snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
if (nBytes <= 0 || nBytes >= p->len) {
taosThreadRwlockUnlock(&p->rwLock);
(void)taosThreadRwlockUnlock(&p->rwLock);
return TSDB_CODE_OUT_OF_RANGE;
}
@ -4534,7 +4552,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
TdDirPtr pDir = taosOpenDir(p->buf);
if (pDir == NULL) {
taosThreadRwlockUnlock(&p->rwLock);
(void)taosThreadRwlockUnlock(&p->rwLock);
return TAOS_SYSTEM_ERROR(errno);
}
@ -4570,9 +4588,9 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
continue;
}
}
taosCloseDir(&pDir);
(void)taosCloseDir(&pDir);
if (code != 0) {
taosThreadRwlockUnlock(&p->rwLock);
(void)taosThreadRwlockUnlock(&p->rwLock);
return code;
}
@ -4584,12 +4602,12 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
if (name != NULL && !isBkdDataMeta(name, len)) {
char* fname = taosMemoryCalloc(1, len + 1);
if (fname == NULL) {
taosThreadRwlockUnlock(&p->rwLock);
(void)taosThreadRwlockUnlock(&p->rwLock);
return TSDB_CODE_OUT_OF_MEMORY;
}
strncpy(fname, name, len);
taosArrayPush(p->pAdd, &fname);
(void)strncpy(fname, name, len);
(void)taosArrayPush(p->pAdd, &fname);
}
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
@ -4621,7 +4639,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
p->idx = 1 - p->idx;
taosThreadRwlockUnlock(&p->rwLock);
(void)taosThreadRwlockUnlock(&p->rwLock);
return code;
}
@ -4679,7 +4697,7 @@ int32_t dbChkpCreate(char* path, int64_t initChkpId, SDbChkp** ppChkp) {
}
p->update = 0;
taosThreadRwlockInit(&p->rwLock, NULL);
(void)taosThreadRwlockInit(&p->rwLock, NULL);
SArray* list = NULL;
code = dbChkpGetDelta(p, initChkpId, list);
@ -4793,7 +4811,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR;
}
taosArrayPush(list, &p);
(void)taosArrayPush(list, &p);
}
// copy current file to dst dir
@ -4859,7 +4877,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
if (nBytes <= 0 || nBytes >= sizeof(content)) {
code = TSDB_CODE_OUT_OF_RANGE;
stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
taosCloseFile(&pFile);
(void)taosCloseFile(&pFile);
goto _ERROR;
}
@ -4867,10 +4885,10 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
if (nBytes != strlen(content)) {
code = TAOS_SYSTEM_ERROR(errno);
stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code));
taosCloseFile(&pFile);
(void)taosCloseFile(&pFile);
goto _ERROR;
}
taosCloseFile(&pFile);
(void)taosCloseFile(&pFile);
// clear delta data buf
taosArrayClearP(p->pAdd, taosMemoryFree);
@ -4879,7 +4897,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
_ERROR:
taosMemoryFree(buffer);
taosThreadRwlockUnlock(&p->rwLock);
(void)taosThreadRwlockUnlock(&p->rwLock);
return code;
}
@ -4925,7 +4943,7 @@ void bkdMgtDestroy(SBkdMgt* bm) {
pIter = taosHashIterate(bm->pDbChkpTbl, pIter);
}
taosThreadRwlockDestroy(&bm->rwLock);
(void)taosThreadRwlockDestroy(&bm->rwLock);
taosMemoryFree(bm->path);
taosHashCleanup(bm->pDbChkpTbl);
@ -4933,7 +4951,7 @@ void bkdMgtDestroy(SBkdMgt* bm) {
}
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) {
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
(void)taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL;
@ -4941,14 +4959,14 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
int32_t cap = strlen(bm->path) + 64;
char* path = taosMemoryCalloc(1, cap);
if (path == NULL) {
taosThreadRwlockUnlock(&bm->rwLock);
(void)taosThreadRwlockUnlock(&bm->rwLock);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId);
if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock);
(void)taosThreadRwlockUnlock(&bm->rwLock);
code = TSDB_CODE_OUT_OF_RANGE;
return code;
}
@ -4957,20 +4975,20 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
code = dbChkpCreate(path, chkpId, &p);
if (code != 0) {
taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock);
(void)taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) {
dbChkpDestroy(p);
taosThreadRwlockUnlock(&bm->rwLock);
(void)taosThreadRwlockUnlock(&bm->rwLock);
code = terrno;
return code;
}
pChkp = p;
code = dbChkpDumpTo(pChkp, dname, list);
taosThreadRwlockUnlock(&bm->rwLock);
(void)taosThreadRwlockUnlock(&bm->rwLock);
return code;
} else {
code = dbChkpGetDelta(pChkp, chkpId, NULL);
@ -4979,7 +4997,7 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
}
}
taosThreadRwlockUnlock(&bm->rwLock);
(void)taosThreadRwlockUnlock(&bm->rwLock);
return code;
}