refactor code

This commit is contained in:
yihaoDeng 2023-08-29 09:52:03 +08:00
parent a2901411b0
commit 8ed2836a2b
4 changed files with 8 additions and 8 deletions

View File

@ -139,7 +139,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamBackendTriggerChkp(void* pMeta, char* dst); int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId); int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);

View File

@ -909,7 +909,7 @@ _ERROR:
taosArrayDestroy(refs); taosArrayDestroy(refs);
return code; return code;
} }
int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId) { int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
if (arg == NULL) return 0; if (arg == NULL) return 0;
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;

View File

@ -49,11 +49,6 @@ void metaRefMgtInit();
void metaRefMgtCleanup(); void metaRefMgtCleanup();
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
void metaRefMgtInit() {
taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
}
static void streamMetaEnvInit() { static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
@ -72,6 +67,11 @@ void streamMetaCleanup() {
metaRefMgtCleanup(); metaRefMgtCleanup();
} }
void metaRefMgtInit() {
taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
}
void metaRefMgtCleanup() { void metaRefMgtCleanup() {
void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL); void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
while (pIter) { while (pIter) {

View File

@ -123,7 +123,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
if (taosIsDir(tdir)) { if (taosIsDir(tdir)) {
validChkp = 1; validChkp = 1;
qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkpPos(pMeta, chkpId); streamBackendAddInUseChkp(pMeta, chkpId);
} else { } else {
qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
} }