From 4f66dc538fdb476c2fbf482d2801a038abf7ee57 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 28 Sep 2023 11:52:18 +0000 Subject: [PATCH] add cvt state --- source/libs/stream/inc/streamBackendRocksdb.h | 4 +- source/libs/stream/src/streamBackendRocksdb.c | 253 +++++++++--------- 2 files changed, 127 insertions(+), 130 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a5533ef2f2..2c5eeb1fbe 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -64,8 +64,8 @@ typedef struct { rocksdb_compactionfilterfactory_t* filterFactory; TdThreadMutex mutex; - - int64_t refId; + char* idstr; + int64_t refId; } STaskBackendWrapper; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 76b9b7474c..80b385d072 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2000,26 +2000,25 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { } } if (pState != NULL && idx != -1) { - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; rocksdb_column_family_handle_t* cf = NULL; - taosThreadRwlockWrlock(&wrapper->rwLock); - cf = wrapper->pHandle[idx]; - if (cf == NULL) { - char buf[128] = {0}; - GEN_COLUMN_FAMILY_NAME(buf, wrapper->idstr, ginitDict[idx].key); - char* err = NULL; - cf = rocksdb_create_column_family(wrapper->rocksdb, wrapper->cfOpts[idx], buf, &err); + taosThreadMutexLock(&wrapper->mutex); + + cf = wrapper->pCf[idx]; + if (cf == NULL) { + char* err = NULL; + cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err); if (err != NULL) { idx = -1; qError("failed to to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err); taosMemoryFree(err); } else { qDebug("succ to to open cf, %p %s_%s", pState, wrapper->idstr, funcName); - wrapper->pHandle[idx] = cf; + wrapper->pCf[idx] = cf; } } - taosThreadRwlockUnlock(&wrapper->rwLock); + taosThreadMutexUnlock(&wrapper->mutex); } return idx; @@ -2040,15 +2039,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe *readOpt = rocksdb_readoptions_create(); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; if (snapshot != NULL) { - *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb); + *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db); rocksdb_readoptions_set_snapshot(*readOpt, *snapshot); rocksdb_readoptions_set_fill_cache(*readOpt, 0); } - return rocksdb_create_iterator_cf(wrapper->rocksdb, *readOpt, - ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); + return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]); } #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ @@ -2062,13 +2060,13 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe code = -1; \ break; \ } \ - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ - char toString[128] = {0}; \ + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->rocksdb; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->db; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ char* ttlV = NULL; \ int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ @@ -2082,75 +2080,75 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->rocksdb; \ - rocksdb_readoptions_t* opts = wrapper->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL || len == 0) { \ - if (err == NULL) { \ - qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ - } else { \ - qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ - taosMemoryFreeClear(err); \ - } \ - code = -1; \ - } else { \ - char* p = NULL; \ - int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (tlen <= 0) { \ - qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ - funcname); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = tlen; \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->db; \ + rocksdb_readoptions_t* opts = wrapper->readOpt; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL || len == 0) { \ + if (err == NULL) { \ + qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + } else { \ + qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ + qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ + code = -1; \ + } else { \ + qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = tlen; \ + } \ } while (0); -#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ - code = -1; \ - break; \ - } \ - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->rocksdb; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ - rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ - if (err != NULL) { \ - qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ - } \ +#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + code = -1; \ + break; \ + } \ + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->db; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ + rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ + if (err != NULL) { \ + qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ + } \ } while (0); // state cf @@ -2176,7 +2174,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; @@ -2186,10 +2184,9 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); - if (wrapper->pHandle[1] != NULL) { + if (wrapper->pCf[1] != NULL) { char* err = NULL; - rocksdb_delete_range_cf(wrapper->rocksdb, wrapper->writeOpts, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen, - &err); + rocksdb_delete_range_cf(wrapper->db, wrapper->writeOpt, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen, &err); if (err != NULL) { char toStringStart[128] = {0}; char toStringEnd[128] = {0}; @@ -2199,7 +2196,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); taosMemoryFree(err); } else { - rocksdb_compact_range_cf(wrapper->rocksdb, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen); + rocksdb_compact_range_cf(wrapper->db, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen); } } @@ -2300,9 +2297,9 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin if (pCur == NULL) { return NULL; } - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; pCur->number = pState->number; - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -2356,7 +2353,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK if (pCur == NULL) return NULL; pCur->number = pState->number; - pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb; + pCur->db = ((STaskBackendWrapper*)pState->pTdbState->pOwner->pBackend)->db; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -2467,13 +2464,13 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } pCur->number = pState->number; - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -2507,12 +2504,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -2545,12 +2542,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -2647,12 +2644,12 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; if (pCur == NULL) return NULL; - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -2707,13 +2704,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; } - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -2745,13 +2742,13 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -2783,12 +2780,12 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; } - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -3015,7 +3012,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co int code = 0; char* err = NULL; - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; rocksdb_snapshot_t* snapshot = NULL; rocksdb_readoptions_t* readopts = NULL; rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); @@ -3048,16 +3045,16 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co } rocksdb_iter_next(pIter); } - rocksdb_release_snapshot(wrapper->rocksdb, snapshot); + rocksdb_release_snapshot(wrapper->db, snapshot); rocksdb_readoptions_destroy(readopts); rocksdb_iter_destroy(pIter); return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -3106,7 +3103,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { @@ -3120,7 +3117,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb char* ttlV = NULL; int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); - rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx]; + rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV); @@ -3139,8 +3136,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx]; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); if (tmpBuf == NULL) { @@ -3155,15 +3152,15 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb return 0; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { - char* err = NULL; - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); + char* err = NULL; + STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { qError("streamState failed to write batch, err:%s", err); taosMemoryFree(err); return -1; } else { - qDebug("write batch to backend:%p", wrapper->pBackend); + qDebug("write batch to backend:%p", wrapper->db); } return 0; }