support fill history

This commit is contained in:
yihaoDeng 2023-06-16 09:32:38 +08:00
parent 8cb8c05428
commit 0ef8afbdaa
6 changed files with 106 additions and 106 deletions

View File

@ -130,7 +130,7 @@ typedef struct SSerializeDataHandle {
// incremental state storage // incremental state storage
typedef struct SBackendWrapper { typedef struct SBackendCfWrapper {
void *rocksdb; void *rocksdb;
void **pHandle; void **pHandle;
void *writeOpts; void *writeOpts;
@ -146,11 +146,11 @@ typedef struct SBackendWrapper {
bool remove; bool remove;
int64_t backendId; int64_t backendId;
char idstr[64]; char idstr[64];
} SBackendWrapper; } SBackendCfWrapper;
typedef struct STdbState { typedef struct STdbState {
SBackendWrapper *pBackendWrapper; SBackendCfWrapper *pBackendCfWrapper;
int64_t backendWrapperId; int64_t backendCfWrapperId;
char idstr[64]; char idstr[64];
struct SStreamTask *pOwner; struct SStreamTask *pOwner;
void *db; void *db;

View File

@ -42,7 +42,7 @@ typedef struct {
TdThreadMutex cfMutex; TdThreadMutex cfMutex;
SHashObj* cfInst; SHashObj* cfInst;
int64_t defaultCfInit; int64_t defaultCfInit;
} SBackendHandle; } SBackendWrapper;
void* streamBackendInit(const char* path); void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);

View File

@ -55,7 +55,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendWrapperId; extern int32_t streamBackendCfWrapperId;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -145,7 +145,7 @@ SCfInit ginitDict[] = {
void* streamBackendInit(const char* path) { void* streamBackendInit(const char* path) {
qDebug("start to init stream backend at %s", path); qDebug("start to init stream backend at %s", path);
SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
pHandle->list = tdListNew(sizeof(SCfComparator)); pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->mutex, NULL);
taosThreadMutexInit(&pHandle->cfMutex, NULL); taosThreadMutexInit(&pHandle->cfMutex, NULL);
@ -212,8 +212,8 @@ _EXIT:
return NULL; return NULL;
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)arg; SBackendWrapper* pHandle = (SBackendWrapper*)arg;
RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL);
while (pIter != NULL) { while (pIter != NULL) {
RocksdbCfInst* inst = *pIter; RocksdbCfInst* inst = *pIter;
destroyRocksdbCfInst(inst); destroyRocksdbCfInst(inst);
@ -253,8 +253,8 @@ void streamBackendCleanup(void* arg) {
return; return;
} }
void streamBackendHandleCleanup(void* arg) { void streamBackendHandleCleanup(void* arg) {
SBackendWrapper* wrapper = arg; SBackendCfWrapper* wrapper = arg;
bool remove = wrapper->remove;
qDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); qDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
if (wrapper->rocksdb == NULL) { if (wrapper->rocksdb == NULL) {
return; return;
@ -263,7 +263,7 @@ void streamBackendHandleCleanup(void* arg) {
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
char* err = NULL; char* err = NULL;
if (wrapper->remove) { if (remove) {
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
if (wrapper->pHandle[i] != NULL) if (wrapper->pHandle[i] != NULL)
rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err); rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err);
@ -295,7 +295,7 @@ void streamBackendHandleCleanup(void* arg) {
rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt); rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
} }
if (wrapper->remove) { if (remove) {
streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode); streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode);
} }
rocksdb_writeoptions_destroy(wrapper->writeOpts); rocksdb_writeoptions_destroy(wrapper->writeOpts);
@ -315,16 +315,16 @@ void streamBackendHandleCleanup(void* arg) {
return; return;
} }
SListNode* streamBackendAddCompare(void* backend, void* arg) { SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend; SBackendWrapper* pHandle = (SBackendWrapper*)backend;
SListNode* node = NULL; SListNode* node = NULL;
taosThreadMutexLock(&pHandle->mutex); taosThreadMutexLock(&pHandle->mutex);
node = tdListAdd(pHandle->list, arg); node = tdListAdd(pHandle->list, arg);
taosThreadMutexUnlock(&pHandle->mutex); taosThreadMutexUnlock(&pHandle->mutex);
return node; return node;
} }
void streamBackendDelCompare(void* backend, void* arg) { void streamBackendDelCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend; SBackendWrapper* pHandle = (SBackendWrapper*)backend;
SListNode* node = NULL; SListNode* node = NULL;
taosThreadMutexLock(&pHandle->mutex); taosThreadMutexLock(&pHandle->mutex);
node = tdListPopNode(pHandle->list, arg); node = tdListPopNode(pHandle->list, arg);
taosThreadMutexUnlock(&pHandle->mutex); taosThreadMutexUnlock(&pHandle->mutex);
@ -784,11 +784,11 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
} }
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) { int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
SBackendHandle* handle = backend; SBackendWrapper* handle = backend;
char* err = NULL; char* err = NULL;
int64_t streamId; int64_t streamId;
int32_t taskId, dummy = 0; int32_t taskId, dummy = 0;
char suffix[64] = {0}; char suffix[64] = {0};
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
@ -908,30 +908,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid); taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendHandle* handle = backend; SBackendWrapper* handle = backend;
SBackendWrapper* pBackendWrapper = taosMemoryCalloc(1, sizeof(SBackendWrapper)); SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
taosThreadMutexLock(&handle->cfMutex); taosThreadMutexLock(&handle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) { if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst; RocksdbCfInst* inst = *ppInst;
pBackendWrapper->rocksdb = inst->db; pBackendCfWrapper->rocksdb = inst->db;
pBackendWrapper->pHandle = (void**)inst->pHandle; pBackendCfWrapper->pHandle = (void**)inst->pHandle;
pBackendWrapper->writeOpts = inst->wOpt; pBackendCfWrapper->writeOpts = inst->wOpt;
pBackendWrapper->readOpts = inst->rOpt; pBackendCfWrapper->readOpts = inst->rOpt;
pBackendWrapper->cfOpts = (void**)(inst->cfOpt); pBackendCfWrapper->cfOpts = (void**)(inst->cfOpt);
pBackendWrapper->dbOpt = handle->dbOpt; pBackendCfWrapper->dbOpt = handle->dbOpt;
pBackendWrapper->param = inst->param; pBackendCfWrapper->param = inst->param;
pBackendWrapper->pBackend = handle; pBackendCfWrapper->pBackend = handle;
pBackendWrapper->pComparNode = inst->pCompareNode; pBackendCfWrapper->pComparNode = inst->pCompareNode;
taosThreadMutexUnlock(&handle->cfMutex); taosThreadMutexUnlock(&handle->cfMutex);
pBackendWrapper->backendId = pState->streamBackendRid; pBackendCfWrapper->backendId = pState->streamBackendRid;
memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
pState->pTdbState->backendWrapperId = id; pState->pTdbState->backendCfWrapperId = id;
pState->pTdbState->pBackendWrapper = pBackendWrapper; pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper;
qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr); qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr);
return 0; return 0;
} }
taosThreadMutexUnlock(&handle->cfMutex); taosThreadMutexUnlock(&handle->cfMutex);
@ -964,31 +964,31 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pCompare[i] = compare; pCompare[i] = compare;
} }
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
pBackendWrapper->rocksdb = handle->db; pBackendCfWrapper->rocksdb = handle->db;
pBackendWrapper->pHandle = (void**)cfHandle; pBackendCfWrapper->pHandle = (void**)cfHandle;
pBackendWrapper->writeOpts = rocksdb_writeoptions_create(); pBackendCfWrapper->writeOpts = rocksdb_writeoptions_create();
pBackendWrapper->readOpts = rocksdb_readoptions_create(); pBackendCfWrapper->readOpts = rocksdb_readoptions_create();
pBackendWrapper->cfOpts = (void**)cfOpt; pBackendCfWrapper->cfOpts = (void**)cfOpt;
pBackendWrapper->dbOpt = handle->dbOpt; pBackendCfWrapper->dbOpt = handle->dbOpt;
pBackendWrapper->param = param; pBackendCfWrapper->param = param;
pBackendWrapper->pBackend = handle; pBackendCfWrapper->pBackend = handle;
pBackendWrapper->backendId = pState->streamBackendRid; pBackendCfWrapper->backendId = pState->streamBackendRid;
taosThreadRwlockInit(&pBackendWrapper->rwLock, NULL); taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pBackendWrapper->pComparNode = streamBackendAddCompare(handle, &compare); pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(pBackendWrapper->writeOpts, 1); rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
pState->pTdbState->backendWrapperId = id; pState->pTdbState->backendCfWrapperId = id;
pState->pTdbState->pBackendWrapper = pBackendWrapper; pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper;
qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr); qInfo("succ to open state %p on backendWrapper %p %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr);
return 0; return 0;
} }
void streamStateCloseBackend(SStreamState* pState, bool remove) { void streamStateCloseBackend(SStreamState* pState, bool remove) {
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SBackendHandle* pHandle = wrapper->pBackend; SBackendWrapper* pHandle = wrapper->pBackend;
taosThreadMutexLock(&pHandle->cfMutex); taosThreadMutexLock(&pHandle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) { if (ppInst != NULL && *ppInst != NULL) {
@ -1001,7 +1001,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
wrapper->idstr); wrapper->idstr);
wrapper->remove |= remove; // update by other pState wrapper->remove |= remove; // update by other pState
taosReleaseRef(streamBackendWrapperId, pState->pTdbState->backendWrapperId); taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
} }
void streamStateDestroyCompar(void* arg) { void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg; SCfComparator* comp = (SCfComparator*)arg;
@ -1020,7 +1020,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
break; break;
} }
} }
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
if (pState != NULL && idx != -1) { if (pState != NULL && idx != -1) {
rocksdb_column_family_handle_t* cf = NULL; rocksdb_column_family_handle_t* cf = NULL;
taosThreadRwlockRdlock(&wrapper->rwLock); taosThreadRwlockRdlock(&wrapper->rwLock);
@ -1060,7 +1060,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_t** readOpt) { rocksdb_readoptions_t** readOpt) {
int idx = streamStateGetCfIdx(pState, cfName); int idx = streamStateGetCfIdx(pState, cfName);
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
if (snapshot != NULL) { if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb); *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb);
} }
@ -1084,8 +1084,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \ code = -1; \
break; \ break; \
} \ } \
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
@ -1115,8 +1115,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \ code = -1; \
break; \ break; \
} \ } \
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
@ -1159,8 +1159,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \ code = -1; \
break; \ break; \
} \ } \
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
@ -1199,11 +1199,11 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear_rocksdb(SStreamState* pState) { int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
char sKeyStr[128] = {0}; char sKeyStr[128] = {0};
char eKeyStr[128] = {0}; char eKeyStr[128] = {0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
int sLen = stateKeyEncode(&sKey, sKeyStr); int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr);
@ -1315,7 +1315,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
pCur->number = pState->number; pCur->number = pState->number;
pCur->db = wrapper->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
@ -1350,8 +1350,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
int32_t code = 0; int32_t code = 0;
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
@ -1379,8 +1379,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb; pCur->db = wrapper->rocksdb;
@ -1472,8 +1472,8 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -1514,8 +1514,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -1552,8 +1552,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb"); qDebug("streamStateSessionSeekKeyNext_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -1646,8 +1646,8 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb"); qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
@ -1706,8 +1706,8 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb"); qDebug("streamStateFillSeekKeyNext_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) { if (!pCur) {
return NULL; return NULL;
} }
@ -1743,8 +1743,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
} }
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb"); qDebug("streamStateFillSeekKeyPrev_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
@ -1780,8 +1780,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
} }
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb"); qDebug("streamStateSessionGetKeyByRange_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
} }
@ -2017,7 +2017,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
int code = 0; int code = 0;
char* err = NULL; char* err = NULL;
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
rocksdb_snapshot_t* snapshot = NULL; rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL; rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
@ -2056,8 +2056,8 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
return code; return code;
} }
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
pCur->db = wrapper->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
@ -2106,8 +2106,8 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) { void* val, int32_t vlen, int64_t ttl) {
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
int i = streamStateGetCfIdx(pState, cfName); int i = streamStateGetCfIdx(pState, cfName);
if (i < 0) { if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName); qError("streamState failed to put to cf name:%s", cfName);
@ -2130,7 +2130,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char* ttlV = tmpBuf; char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx]; rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
@ -2141,8 +2141,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
return 0; return 0;
} }
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL; char* err = NULL;
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) { if (err != NULL) {
qError("streamState failed to write batch, err:%s", err); qError("streamState failed to write batch, err:%s", err);

View File

@ -21,16 +21,16 @@
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0; int32_t streamBackendId = 0;
int32_t streamBackendWrapperId = 0; int32_t streamBackendCfWrapperId = 0;
static void streamMetaEnvInit() { static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendWrapperId = taosOpenRef(64, streamBackendHandleCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
} }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { void streamMetaCleanup() {
taosCloseRef(streamBackendId); taosCloseRef(streamBackendId);
taosCloseRef(streamBackendWrapperId); taosCloseRef(streamBackendCfWrapperId);
} }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {

View File

@ -134,11 +134,11 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
return NULL; return NULL;
} }
taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
&pState->pTdbState->backendWrapperId, sizeof(pState->pTdbState->backendWrapperId)); &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId));
} else { } else {
int64_t id = *(int64_t*)uniqueId; int64_t id = *(int64_t*)uniqueId;
pState->pTdbState->backendWrapperId = id; pState->pTdbState->backendCfWrapperId = id;
pState->pTdbState->pBackendWrapper = taosAcquireRef(streamBackendWrapperId, id); pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
taosAcquireRef(streamBackendId, pState->streamBackendRid); taosAcquireRef(streamBackendId, pState->streamBackendRid);
} }