fix issue

This commit is contained in:
liuyao 2023-09-22 09:40:08 +08:00
parent b1d9cd1c4a
commit 4979fc5283
5 changed files with 113 additions and 51 deletions

View File

@ -34,6 +34,7 @@ typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t k
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num);
typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey);
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
@ -72,6 +73,7 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState);
// session window
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen);
void sessionWinStateClear(SStreamFileState* pFileState);

View File

@ -2595,11 +2595,15 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
void streamSessionReleaseState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
int32_t resSize = winSize + sizeof(TSKEY);
char* pBuff = taosMemoryCalloc(1, resSize);
memcpy(pBuff, pInfo->historyWins->pData, winSize);
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
strlen(STREAM_SESSION_OP_STATE_NAME), pBuff, resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
taosMemoryFreeClear(pBuff);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
@ -2621,16 +2625,19 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionSemiWindow(pOperator, &winInfo);
saveSessionOutputBuf(pAggSup, &winInfo);
}
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
taosMemoryFree(pBuf);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
@ -2638,26 +2645,41 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
}
}
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) {
int32_t rowSize = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize);
if (code == TSDB_CODE_SUCCESS) {
pWinInfo->sessionWin = *pKey;
pWinInfo->isOutput = true;
} else {
SET_SESSION_WIN_INVALID((*pWinInfo));
}
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SResultWindowInfo winInfo = {0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey));
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
if (!pInfo->pStUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) {
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey,

View File

@ -135,16 +135,15 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key,
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
int32_t len = 0;
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, &len);
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, pVLen);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, len);
memcpy(pNewPos->pRowBuff, p, *pVLen);
}
taosMemoryFree(p);
(*pVal) = pNewPos;
@ -162,6 +161,20 @@ _end:
return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
void* pBuff = NULL;
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
taosMemoryFreeClear(pBuff);
(*pVal) = pNewPos;
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) {
SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
SSessionKey* pWinKey = (SSessionKey*) key;
@ -446,16 +459,15 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
int32_t len = 0;
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, &len);
int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, len);
memcpy(pNewPos->pRowBuff, p, *pVLen);
}
taosMemoryFree(p);
(*pVal) = pNewPos;

View File

@ -720,8 +720,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
ASSERT(0);
return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen);
#else
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);

View File

@ -28,29 +28,31 @@
#define MIN_NUM_OF_ROW_BUFF 10240
struct SStreamFileState {
SList* usedBuffs;
SList* freeBuffs;
void* rowStateBuff;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
GetTsFun getTs;
char* id;
SList* usedBuffs;
SList* freeBuffs;
void* rowStateBuff;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
GetTsFun getTs;
char* id;
char* cfName;
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
_state_buff_create_statekeyfn stateBuffCreateStateKeyFn;
_state_file_remove_fn stateFileRemoveFn;
_state_file_get_fn stateFileGetFn;
_state_file_clear_fn stateFileClearFn;
_state_file_remove_fn stateFileRemoveFn;
_state_file_get_fn stateFileGetFn;
_state_file_clear_fn stateFileClearFn;
};
typedef SRowBuffPos SRowBuffInfo;
@ -75,6 +77,14 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data,
return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
SWinKey* pWinKey = pPos->pKey;
pStateKey->key = *pWinKey;
pStateKey->opNum = num;
return pStateKey;
}
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
}
@ -83,6 +93,14 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i
return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
SSessionKey* pWinKey = pPos->pKey;
pStateKey->key = *pWinKey;
pStateKey->opNum = num;
return pStateKey;
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId, int8_t type) {
@ -107,18 +125,22 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
pFileState->stateFileRemoveFn = intervalFileRemoveFn;
pFileState->stateFileGetFn = intervalFileGetFn;
pFileState->stateFileClearFn = streamStateClear_rocksdb;
pFileState->cfName = taosStrdup("state");
} else {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
pFileState->stateBuffRemoveFn = deleteSessionWinStateBuff;
pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
pFileState->stateFileRemoveFn = sessionFileRemoveFn;
pFileState->stateFileGetFn = sessionFileGetFn;
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
pFileState->cfName = taosStrdup("sess");
}
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
@ -183,6 +205,7 @@ void streamFileStateDestroy(SStreamFileState* pFileState) {
}
taosMemoryFree(pFileState->id);
taosMemoryFree(pFileState->cfName);
tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
@ -226,9 +249,6 @@ void streamFileStateClear(SStreamFileState* pFileState) {
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
if (pPos->needFree) {
putFreeBuff(pFileState, pPos);
}
pPos->beUsed = used;
}
@ -391,8 +411,19 @@ int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t ke
return TSDB_CODE_FAILED;
}
static void recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t len = 0;
void* pBuff = NULL;
pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
}
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
if (pPos->pRowBuff) {
if (pPos->needFree) {
recoverSessionRowBuff(pFileState, pPos);
}
(*pVal) = pPos->pRowBuff;
return TSDB_CODE_SUCCESS;
}
@ -405,11 +436,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
ASSERT(pPos->pRowBuff);
}
int32_t len = 0;
void* pBuff = NULL;
pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
recoverSessionRowBuff(pFileState, pPos);
(*pVal) = pPos->pRowBuff;
tdListPrepend(pFileState->usedBuffs, &pPos);
return TSDB_CODE_SUCCESS;
@ -452,7 +479,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t numOfElems = listNEles(pSnapshot);
SListNode* pNode = NULL;
int idx = streamStateGetCfIdx(pFileState->pFileStore, "state");
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
char* buf = taosMemoryCalloc(1, len);
@ -471,12 +498,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStateClearBatch(batch);
}
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, &sKey, pPos->pRowBuff, pFileState->rowSize,
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
0, buf);
taosMemoryFreeClear(pSKey);
// todo handle failure
memset(buf, 0, len);
// qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
}
taosMemoryFree(buf);
@ -637,7 +664,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) {
return ts < pFileState->flushMark;
return ts <= pFileState->flushMark;
}
int32_t getRowStateRowSize(SStreamFileState* pFileState) {