fix issue

This commit is contained in:
liuyao 2023-10-07 17:43:50 +08:00
parent 460ee86fdf
commit 1aaf873134
4 changed files with 122 additions and 37 deletions

View File

@ -32,9 +32,10 @@ typedef SList SStreamSnapshot;
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen, bool invalid);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num);
typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num);
typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey);
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
@ -75,7 +76,8 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState);
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid);
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen);
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff);

View File

@ -2642,6 +2642,20 @@ void resetWinRange(STimeWindow* winRange) {
winRange->ekey = INT64_MAX;
}
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) {
int32_t rowSize = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize);
if (code == TSDB_CODE_SUCCESS) {
pWinInfo->sessionWin = *pKey;
pWinInfo->isOutput = true;
if (pWinInfo->pStatePos->needFree) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pWinInfo->sessionWin);
}
} else {
SET_SESSION_WIN_INVALID((*pWinInfo));
}
}
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -2657,7 +2671,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
compactSessionSemiWindow(pOperator, &winInfo);
saveSessionOutputBuf(pAggSup, &winInfo);
}
@ -2672,17 +2686,6 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
}
}
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) {
int32_t rowSize = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize);
if (code == TSDB_CODE_SUCCESS) {
pWinInfo->sessionWin = *pKey;
pWinInfo->isOutput = true;
} else {
SET_SESSION_WIN_INVALID((*pWinInfo));
}
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -3061,6 +3064,50 @@ bool compareWinStateKey(SStateKeys* left, SStateKeys* right) {
return compareVal(left->pData, right);
}
void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SStateWindowInfo* pCurWin,
SStateWindowInfo* pNextWin) {
int32_t size = pAggSup->resultRowSize;
pCurWin->winInfo.sessionWin.groupId = pKey->groupId;
pCurWin->winInfo.sessionWin.win.skey = pKey->win.skey;
pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey;
getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo);
ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo));
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
pCurWin->pStateKey->isNull = false;
pCurWin->winInfo.isOutput = true;
if (pCurWin->winInfo.pStatePos->needFree) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
}
qDebug("===stream===get state cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
pCurWin->winInfo.sessionWin.win.ekey);
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
SStreamStateCur* pCur =
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
int32_t nextSize = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin,
(void**)&pNextWin->winInfo.pStatePos, &nextSize);
if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
} else {
pNextWin->pStateKey =
(SStateKeys*)((char*)pNextWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pNextWin->pStateKey->type = pAggSup->stateKeyType;
pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys);
pNextWin->pStateKey->isNull = false;
pNextWin->winInfo.isOutput = true;
}
pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===get state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey,
pNextWin->winInfo.sessionWin.win.ekey);
}
void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
int32_t size = pAggSup->resultRowSize;
@ -3535,7 +3582,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
SStateWindowInfo dummy = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
pSeKeyBuf[i].groupId, i);
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo);
bool cpRes = compareWinStateKey(curInfo.pStateKey, nextInfo.pStateKey);
qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d",
nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);

View File

@ -228,7 +228,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid) {
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) {
SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
SSessionKey* pWinKey = (SSessionKey*) key;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
@ -242,9 +242,28 @@ int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, b
if (index >= 0) {
SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
if (invalid) {
pPos->beFlushed = true;
taosArrayRemove(pWinStates, index);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pWinKey = (SSessionKey*) pPos->pKey;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (index >= 0) {
SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
if (pItemPos == pPos) {
pItemPos->beFlushed = true;
taosArrayRemove(pWinStates, index);
}
}

View File

@ -48,7 +48,8 @@ struct SStreamFileState {
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
_state_buff_create_statekeyfn stateBuffCreateStateKeyFn;
_state_buff_remove_by_pos_fn stateBuffRemoveByPosFn;
_state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
_state_file_remove_fn stateFileRemoveFn;
_state_file_get_fn stateFileGetFn;
@ -57,16 +58,25 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen, bool invalid) {
if (invalid) {
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
if (pos) {
(*pos)->beFlushed = true;
}
}
return tSimpleHashRemove(pBuff, pKey, keyLen);
}
int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
size_t keyLen = pFileState->rowSize;
SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
if (ppPos) {
if ((*ppPos) == pPos) {
return tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
}
}
return TSDB_CODE_SUCCESS;
}
void stateHashBuffClearFn(void* pBuff) {
tSimpleHashClear(pBuff);
}
@ -131,6 +141,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
pFileState->stateFileRemoveFn = intervalFileRemoveFn;
@ -140,7 +151,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
} else {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
pFileState->stateBuffRemoveFn = deleteSessionWinStateBuff;
pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
pFileState->stateFileRemoveFn = sessionFileRemoveFn;
@ -236,7 +248,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
putFreeBuff(pFileState, pPos);
if (!all) {
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false);
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
}
destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
@ -256,7 +268,7 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false);
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
if (pPos->pRowBuff) {
@ -290,7 +302,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
if (pPos->beUsed == used) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false);
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
if (pPos->pRowBuff) {
@ -433,7 +445,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
}
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen, true);
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
@ -460,9 +472,14 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
pPos->pRowBuff = getFreeBuff(pFileState);
if (!pPos->pRowBuff) {
if (pFileState->curRowCount < pFileState->maxRowCount) {
pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
pFileState->curRowCount++;
} else {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState);
}
ASSERT(pPos->pRowBuff);
}