Merge pull request #23201 from taosdata/enh/TD-26646

Enh/td 26646
This commit is contained in:
Haojun Liao 2023-10-10 22:59:37 +08:00 committed by GitHub
commit 1d4f0c8599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 103 additions and 48 deletions

View File

@ -65,11 +65,11 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
// func cf
@ -84,10 +84,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState);
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,

View File

@ -1970,7 +1970,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
memset(*pVal, 0, size);
return 0;
}
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
@ -2052,7 +2052,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL;
}
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
qDebug("streamStateGetCur_rocksdb");
int32_t code = 0;
@ -2062,9 +2062,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
return NULL;
}
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
{
char tbuf[256] = {0};
stateKeyToString((void*)&maxStateKey, tbuf);
@ -2079,6 +2076,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
@ -2183,6 +2182,52 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
return code;
}
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
qDebug("streamStateSessionSeekToLast_rocksdb");
int32_t code = 0;
SSessionKey maxSessionKey = {.groupId = UINT64_MAX, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
if (code != 0) {
return NULL;
}
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = createStreamStateCursor();
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter);
}
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
pCur = NULL;
}
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
return pCur;
}
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
return 0;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
@ -2301,6 +2346,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
}
return pCur;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGetKVByCur_rocksdb");
if (!pCur) {

View File

@ -670,7 +670,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
return streamStateCurPrev_rocksdb(pState, pCur);
return streamStateCurPrev_rocksdb(pCur);
#else
if (!pCur) {
return -1;

View File

@ -77,13 +77,9 @@ int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pP
return TSDB_CODE_SUCCESS;
}
void stateHashBuffClearFn(void* pBuff) {
tSimpleHashClear(pBuff);
}
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
void stateHashBuffCleanupFn(void* pBuff) {
tSimpleHashCleanup(pBuff);
}
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
@ -118,8 +114,8 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId, int8_t type) {
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
}
@ -290,9 +286,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
pPos->beUsed = used;
}
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
@ -647,6 +641,26 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code;
}
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
int code = TSDB_CODE_SUCCESS;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) {
return -1;
}
while (code == TSDB_CODE_SUCCESS) {
void* pVal = NULL;
int32_t vlen = 0;
SSessionKey key = {0};
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
if (code != 0) {
break;
}
taosMemoryFree(pVal);
code = streamStateSessionCurPrev_rocksdb(pCur);
}
streamStateFreeCur(pCur);
return code;
}
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) {
@ -656,8 +670,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
deleteExpiredCheckPoint(pFileState, mark);
}
SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key);
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) {
return -1;
}
@ -667,9 +680,9 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
break;
}
void* pVal = NULL;
int32_t pVLen = 0;
int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
@ -677,8 +690,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
taosMemoryFreeClear(pVal);
break;
}
ASSERT(pVLen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, pVLen);
ASSERT(vlen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
@ -686,7 +699,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
destroyRowBuffPos(pNewPos);
break;
}
code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur);
code = streamStateCurPrev_rocksdb(pCur);
}
streamStateFreeCur(pCur);
@ -700,22 +713,14 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
pFileState->maxTs = TMAX(pFileState->maxTs, ts);
}
void* getRowStateBuff(SStreamFileState* pFileState) {
return pFileState->rowStateBuff;
}
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
void* getStateFileStore(SStreamFileState* pFileState) {
return pFileState->pFileStore;
}
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
return ts <= (pFileState->flushMark + gap);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
return pFileState->rowSize;
}
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }