add sess iter
This commit is contained in:
parent
6adf7532a8
commit
825341d90a
|
@ -69,7 +69,7 @@ int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
|
|||
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState);
|
||||
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
|
||||
// func cf
|
||||
|
@ -84,6 +84,8 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
|
|||
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
|
||||
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
|
||||
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
|
||||
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState);
|
||||
|
||||
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
|
||||
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
|
||||
|
|
|
@ -2044,7 +2044,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
|
||||
qDebug("streamStateGetCur_rocksdb");
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -2175,7 +2175,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
|
|||
return code;
|
||||
}
|
||||
|
||||
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
||||
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
|
||||
qDebug("streamStateSessionSeekToLast_rocksdb");
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -58,7 +58,7 @@ struct SStreamFileState {
|
|||
|
||||
typedef SRowBuffPos SRowBuffInfo;
|
||||
|
||||
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
|
||||
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
|
||||
SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
|
||||
if (pos) {
|
||||
(*pos)->beFlushed = true;
|
||||
|
@ -67,7 +67,7 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
|
|||
}
|
||||
|
||||
int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||
size_t keyLen = pFileState->keyLen;
|
||||
size_t keyLen = pFileState->keyLen;
|
||||
SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
|
||||
if (ppPos) {
|
||||
if ((*ppPos) == pPos) {
|
||||
|
@ -77,13 +77,9 @@ int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pP
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void stateHashBuffClearFn(void* pBuff) {
|
||||
tSimpleHashClear(pBuff);
|
||||
}
|
||||
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
|
||||
|
||||
void stateHashBuffCleanupFn(void* pBuff) {
|
||||
tSimpleHashCleanup(pBuff);
|
||||
}
|
||||
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
|
||||
|
||||
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
|
||||
return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
|
||||
|
@ -95,7 +91,7 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data,
|
|||
|
||||
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
||||
SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
|
||||
SWinKey* pWinKey = pPos->pKey;
|
||||
SWinKey* pWinKey = pPos->pKey;
|
||||
pStateKey->key = *pWinKey;
|
||||
pStateKey->opNum = num;
|
||||
return pStateKey;
|
||||
|
@ -111,15 +107,15 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i
|
|||
|
||||
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
||||
SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
|
||||
SSessionKey* pWinKey = pPos->pKey;
|
||||
SSessionKey* pWinKey = pPos->pKey;
|
||||
pStateKey->key = *pWinKey;
|
||||
pStateKey->opNum = num;
|
||||
return pStateKey;
|
||||
}
|
||||
|
||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
||||
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
|
||||
int64_t checkpointId, int8_t type) {
|
||||
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
|
||||
int8_t type) {
|
||||
if (memSize <= 0) {
|
||||
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
|
||||
}
|
||||
|
@ -178,7 +174,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->maxTs = INT64_MIN;
|
||||
pFileState->id = taosStrdup(taskId);
|
||||
|
||||
//todo(liuyao) optimize
|
||||
// todo(liuyao) optimize
|
||||
if (type == STREAM_STATE_BUFF_HASH) {
|
||||
recoverSnapshot(pFileState, checkpointId);
|
||||
}
|
||||
|
@ -290,9 +286,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
|
|||
|
||||
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
|
||||
|
||||
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
|
||||
pPos->beUsed = used;
|
||||
}
|
||||
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
|
||||
|
||||
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
|
||||
uint64_t i = 0;
|
||||
|
@ -357,8 +351,8 @@ int32_t clearRowBuff(SStreamFileState* pFileState) {
|
|||
}
|
||||
|
||||
void* getFreeBuff(SStreamFileState* pFileState) {
|
||||
SList* lists = pFileState->freeBuffs;
|
||||
int32_t buffSize = pFileState->rowSize;
|
||||
SList* lists = pFileState->freeBuffs;
|
||||
int32_t buffSize = pFileState->rowSize;
|
||||
SListNode* pNode = tdListPopHead(lists);
|
||||
if (!pNode) {
|
||||
return NULL;
|
||||
|
@ -406,7 +400,7 @@ _end:
|
|||
}
|
||||
|
||||
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
|
||||
SRowBuffPos* newPos = getNewRowPos(pFileState);
|
||||
SRowBuffPos* newPos = getNewRowPos(pFileState);
|
||||
newPos->beUsed = true;
|
||||
newPos->beFlushed = false;
|
||||
newPos->needFree = false;
|
||||
|
@ -656,8 +650,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
deleteExpiredCheckPoint(pFileState, mark);
|
||||
}
|
||||
|
||||
SWinKey key = {.groupId = 0, .ts = 0};
|
||||
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key);
|
||||
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
|
||||
if (pCur == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -700,22 +693,14 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
|
|||
pFileState->maxTs = TMAX(pFileState->maxTs, ts);
|
||||
}
|
||||
|
||||
void* getRowStateBuff(SStreamFileState* pFileState) {
|
||||
return pFileState->rowStateBuff;
|
||||
}
|
||||
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
|
||||
|
||||
void* getStateFileStore(SStreamFileState* pFileState) {
|
||||
return pFileState->pFileStore;
|
||||
}
|
||||
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
|
||||
|
||||
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
|
||||
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
|
||||
}
|
||||
|
||||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
|
||||
return ts <= (pFileState->flushMark + gap);
|
||||
}
|
||||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
|
||||
|
||||
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
|
||||
return pFileState->rowSize;
|
||||
}
|
||||
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
|
||||
|
|
Loading…
Reference in New Issue