add cvt state

This commit is contained in:
yihaoDeng 2023-09-28 11:52:18 +00:00
parent c5bb37a5cb
commit 4f66dc538f
2 changed files with 127 additions and 130 deletions

View File

@ -64,8 +64,8 @@ typedef struct {
rocksdb_compactionfilterfactory_t* filterFactory;
TdThreadMutex mutex;
int64_t refId;
char* idstr;
int64_t refId;
} STaskBackendWrapper;

View File

@ -2000,26 +2000,25 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
}
}
if (pState != NULL && idx != -1) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_column_family_handle_t* cf = NULL;
taosThreadRwlockWrlock(&wrapper->rwLock);
cf = wrapper->pHandle[idx];
if (cf == NULL) {
char buf[128] = {0};
GEN_COLUMN_FAMILY_NAME(buf, wrapper->idstr, ginitDict[idx].key);
char* err = NULL;
cf = rocksdb_create_column_family(wrapper->rocksdb, wrapper->cfOpts[idx], buf, &err);
taosThreadMutexLock(&wrapper->mutex);
cf = wrapper->pCf[idx];
if (cf == NULL) {
char* err = NULL;
cf = rocksdb_create_column_family(wrapper->db, wrapper->pCfOpts[idx], ginitDict[idx].key, &err);
if (err != NULL) {
idx = -1;
qError("failed to to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
taosMemoryFree(err);
} else {
qDebug("succ to to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
wrapper->pHandle[idx] = cf;
wrapper->pCf[idx] = cf;
}
}
taosThreadRwlockUnlock(&wrapper->rwLock);
taosThreadMutexUnlock(&wrapper->mutex);
}
return idx;
@ -2040,15 +2039,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
*readOpt = rocksdb_readoptions_create();
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb);
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db);
rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(*readOpt, 0);
}
return rocksdb_create_iterator_cf(wrapper->rocksdb, *readOpt,
((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
@ -2062,13 +2060,13 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->db; \
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
@ -2082,75 +2080,75 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_readoptions_t* opts = wrapper->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (err == NULL) { \
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
} else { \
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
} \
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->db; \
rocksdb_readoptions_t* opts = wrapper->readOpt; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (err == NULL) { \
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
} else { \
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
} \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
} \
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->db; \
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
} \
} while (0);
// state cf
@ -2176,7 +2174,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
@ -2186,10 +2184,9 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr);
if (wrapper->pHandle[1] != NULL) {
if (wrapper->pCf[1] != NULL) {
char* err = NULL;
rocksdb_delete_range_cf(wrapper->rocksdb, wrapper->writeOpts, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen,
&err);
rocksdb_delete_range_cf(wrapper->db, wrapper->writeOpt, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen, &err);
if (err != NULL) {
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
@ -2199,7 +2196,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err);
} else {
rocksdb_compact_range_cf(wrapper->rocksdb, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen);
rocksdb_compact_range_cf(wrapper->db, wrapper->pCf[1], sKeyStr, sLen, eKeyStr, eLen);
}
}
@ -2300,9 +2297,9 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
if (pCur == NULL) {
return NULL;
}
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
@ -2356,7 +2353,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
if (pCur == NULL) return NULL;
pCur->number = pState->number;
pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb;
pCur->db = ((STaskBackendWrapper*)pState->pTdbState->pOwner->pBackend)->db;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
@ -2467,13 +2464,13 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
@ -2507,12 +2504,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -2545,12 +2542,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -2647,12 +2644,12 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -2707,13 +2704,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) {
return NULL;
}
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -2745,13 +2742,13 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
}
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -2783,12 +2780,12 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return -1;
}
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -3015,7 +3012,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
int code = 0;
char* err = NULL;
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
@ -3048,16 +3045,16 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(wrapper->rocksdb, snapshot);
rocksdb_release_snapshot(wrapper->db, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->db = wrapper->rocksdb;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
@ -3106,7 +3103,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) {
@ -3120,7 +3117,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx];
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
@ -3139,8 +3136,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx];
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
if (tmpBuf == NULL) {
@ -3155,15 +3152,15 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
return 0;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
char* err = NULL;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err);
return -1;
} else {
qDebug("write batch to backend:%p", wrapper->pBackend);
qDebug("write batch to backend:%p", wrapper->db);
}
return 0;
}