add stream backend copy

This commit is contained in:
yihaoDeng 2023-12-05 17:09:03 +08:00
parent 4e68fbd744
commit 87c78919a9
8 changed files with 79 additions and 63 deletions

View File

@ -38,8 +38,8 @@ extern "C" {
#define META_READER_NOLOCK 0x1 #define META_READER_NOLOCK 0x1
#define STREAM_STATE_BUFF_HASH 1 #define STREAM_STATE_BUFF_HASH 1
#define STREAM_STATE_BUFF_SORT 2 #define STREAM_STATE_BUFF_SORT 2
typedef struct SMeta SMeta; typedef struct SMeta SMeta;
typedef TSKEY (*GetTsFun)(void*); typedef TSKEY (*GetTsFun)(void*);
@ -102,14 +102,14 @@ typedef struct SMTbCursor {
} SMTbCursor; } SMTbCursor;
typedef struct SMCtbCursor { typedef struct SMCtbCursor {
SMeta *pMeta; SMeta* pMeta;
void *pCur; void* pCur;
tb_uid_t suid; tb_uid_t suid;
void *pKey; void* pKey;
void *pVal; void* pVal;
int kLen; int kLen;
int vLen; int vLen;
int8_t paused; int8_t paused;
int lock; int lock;
} SMCtbCursor; } SMCtbCursor;
@ -248,22 +248,23 @@ typedef struct SStoreMeta {
void* (*storeGetIndexInfo)(); void* (*storeGetIndexInfo)();
void* (*getInvertIndex)(void* pVnode); void* (*getInvertIndex)(void* pVnode);
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list); int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list);
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
void* storeGetVersionRange; void* storeGetVersionRange;
void* storeGetLastTimestamp; void* storeGetLastTimestamp;
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables);
int64_t (*getNumOfRowsInMem)(void* pVnode); int64_t (*getNumOfRowsInMem)(void* pVnode);
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock);
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur); void (*closeCtbCursor)(SMCtbCursor* pCtbCur);
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
} SStoreMeta; } SStoreMeta;
typedef struct SStoreMetaReader { typedef struct SStoreMetaReader {
@ -348,14 +349,14 @@ typedef struct SStateStore {
const SSessionKey* pKey, void** pVal, int32_t* pVLen); const SSessionKey* pKey, void** pVal, int32_t* pVLen);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
bool (*isIncrementalTimeStamp)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
@ -382,6 +383,7 @@ typedef struct SStateStore {
void (*streamStateDestroy)(SStreamState* pState, bool remove); void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst);
} SStateStore; } SStateStore;
typedef struct SStorageAPI { typedef struct SStorageAPI {

View File

@ -50,7 +50,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
//session window // session window
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
@ -65,7 +65,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
//state window // state window
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
@ -96,6 +96,9 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char*
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts); void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
void streamStateCopyBackend(SStreamState* src, SStreamState* dst);
SStreamStateCur* createStreamStateCursor(); SStreamStateCur* createStreamStateCursor();
/***compare func **/ /***compare func **/

View File

@ -66,6 +66,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
} else { } else {
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
} }
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = { SReadHandle handle = {

View File

@ -14,8 +14,8 @@
*/ */
#include "storageapi.h" #include "storageapi.h"
#include "tstreamUpdate.h"
#include "streamState.h" #include "streamState.h"
#include "tstreamUpdate.h"
static void initStateStoreAPI(SStateStore* pStore); static void initStateStoreAPI(SStateStore* pStore);
static void initFunctionStateStore(SFunctionStateStore* pStore); static void initFunctionStateStore(SFunctionStateStore* pStore);
@ -100,9 +100,10 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateClose = streamStateClose; pStore->streamStateClose = streamStateClose;
pStore->streamStateBegin = streamStateBegin; pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit; pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy= streamStateDestroy; pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
} }
void initFunctionStateStore(SFunctionStateStore* pStore) { void initFunctionStateStore(SFunctionStateStore* pStore) {

View File

@ -736,15 +736,15 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta); // code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId); // tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta); // streamMetaWUnLock(pMeta);
code = terrno; // code = terrno;
return code; // return code;
} // }
streamMetaInitBackend(pMeta); // streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);

View File

@ -212,6 +212,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
} }
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) { void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {

View File

@ -1465,7 +1465,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
// qError("open state %p", pInfo->pState);
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
// qError("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
@ -3341,7 +3343,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < rows; i += winRows) { for (int32_t i = 0; i < rows; i += winRows) {
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) || colDataIsNull_s(pKeyColInfo, i)) { &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) ||
colDataIsNull_s(pKeyColInfo, i)) {
i++; i++;
continue; continue;
} }

View File

@ -91,8 +91,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
} }
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
stDebug("open stream state, %s", path);
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) { if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
@ -211,7 +211,7 @@ int32_t streamStateDelTaskDb(SStreamState* pState) {
SStreamTask* pTask = pState->pTdbState->pOwner; SStreamTask* pTask = pState->pTdbState->pOwner;
taskDbRemoveRef(pTask->pBackend); taskDbRemoveRef(pTask->pBackend);
taosMemoryFree(pTask); taosMemoryFree(pTask);
return 0; return 0;
} }
void streamStateClose(SStreamState* pState, bool remove) { void streamStateClose(SStreamState* pState, bool remove) {
SStreamTask* pTask = pState->pTdbState->pOwner; SStreamTask* pTask = pState->pTdbState->pOwner;
@ -374,8 +374,8 @@ int32_t streamStateClear(SStreamState* pState) {
streamStatePut(pState, &key, NULL, 0); streamStatePut(pState, &key, NULL, 0);
while (1) { while (1) {
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key); SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key);
SWinKey delKey = {0}; SWinKey delKey = {0};
int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
if (code == 0) { if (code == 0) {
streamStateDel(pState, &delKey); streamStateDel(pState, &delKey);
@ -493,7 +493,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void**
return -1; return -1;
} }
const SStateKey* pKTmp = NULL; const SStateKey* pKTmp = NULL;
int32_t kLen; int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
return -1; return -1;
} }
@ -513,7 +513,7 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo
return -1; return -1;
} }
const SWinKey* pKTmp = NULL; const SWinKey* pKTmp = NULL;
int32_t kLen; int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
return -1; return -1;
} }
@ -530,7 +530,7 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v
return -1; return -1;
} }
uint64_t groupId = pKey->groupId; uint64_t groupId = pKey->groupId;
int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen);
if (code == 0) { if (code == 0) {
if (pKey->groupId == groupId) { if (pKey->groupId == groupId) {
return 0; return 0;
@ -555,7 +555,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
} }
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
@ -710,7 +710,8 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
#endif #endif
} }
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen) { int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur,
const SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen); return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
#else #else
@ -724,9 +725,9 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
#else #else
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
SSessionKey resKey = *key; SSessionKey resKey = *key;
void* tmp = NULL; void* tmp = NULL;
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
if (code == 0) { if (code == 0) {
if (key->win.skey != resKey.win.skey) { if (key->win.skey != resKey.win.skey) {
code = -1; code = -1;
@ -767,7 +768,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
} }
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
@ -798,7 +799,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
} }
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
@ -830,7 +831,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
} }
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
@ -854,7 +855,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v
return -1; return -1;
} }
SStateSessionKey* pKTmp = NULL; SStateSessionKey* pKTmp = NULL;
int32_t kLen; int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) { if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
return -1; return -1;
} }
@ -874,13 +875,13 @@ int32_t streamStateSessionClear(SStreamState* pState) {
sessionWinStateClear(pState->pFileState); sessionWinStateClear(pState->pFileState);
return streamStateSessionClear_rocksdb(pState); return streamStateSessionClear_rocksdb(pState);
#else #else
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
while (1) { while (1) {
SSessionKey delKey = {0}; SSessionKey delKey = {0};
void* buf = NULL; void* buf = NULL;
int32_t size = 0; int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) { if (code == 0 && size > 0) {
memset(buf, 0, size); memset(buf, 0, size);
streamStateSessionPut(pState, &delKey, buf, size); streamStateSessionPut(pState, &delKey, buf, size);
@ -909,14 +910,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
} }
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return -1; return -1;
} }
SSessionKey resKey = *key; SSessionKey resKey = *key;
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey; *curKey = resKey;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
@ -952,19 +953,19 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key,
return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen); return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen);
#else #else
// todo refactor // todo refactor
int32_t res = 0; int32_t res = 0;
SSessionKey originKey = *key; SSessionKey originKey = *key;
SSessionKey searchKey = *key; SSessionKey searchKey = *key;
searchKey.win.skey = key->win.skey - gap; searchKey.win.skey = key->win.skey - gap;
searchKey.win.ekey = key->win.ekey + gap; searchKey.win.ekey = key->win.ekey + gap;
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
void* tmp = tdbRealloc(NULL, valSize); void* tmp = tdbRealloc(NULL, valSize);
if (!tmp) { if (!tmp) {
return -1; return -1;
} }
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
if (code == 0) { if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
@ -1007,16 +1008,16 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen); return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else #else
int32_t res = 0; int32_t res = 0;
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
void* tmp = tdbRealloc(NULL, valSize); void* tmp = tdbRealloc(NULL, valSize);
if (!tmp) { if (!tmp) {
return -1; return -1;
} }
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
if (code == 0) { if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
@ -1115,6 +1116,10 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
dst->pTdbState->pOwner = src->pTdbState->pOwner;
return;
}
SStreamStateCur* createStreamStateCursor() { SStreamStateCur* createStreamStateCursor() {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->buffIndex = -1; pCur->buffIndex = -1;