adj stream operator result

This commit is contained in:
54liuyao 2024-07-18 10:37:14 +08:00
parent ab642b245c
commit 4e6b898c9a
9 changed files with 90 additions and 34 deletions

View File

@ -367,7 +367,7 @@ typedef struct SStateStore {
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode);
void (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
void (*streamStateSessionReset)(SStreamState* pState, void* pVal);
void (*streamStateSessionClear)(SStreamState* pState);

View File

@ -54,7 +54,7 @@ int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, voi
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode);
void streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
void streamStateSessionReset(SStreamState* pState, void* pVal);
void streamStateSessionClear(SStreamState* pState);

View File

@ -59,7 +59,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
@ -81,7 +81,8 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState);
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen);
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,

View File

@ -955,7 +955,7 @@ void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap,
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite);
void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
SSDataBlock* pBlock);
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
SResultWindowInfo* pNextWin);
int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,

View File

@ -744,7 +744,9 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
SEventWindowInfo curInfo = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
pSeKeyBuf[i].groupId, i);
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
TSDB_CHECK_CODE(code, lino, _end);
// event window has been deleted
if (!IS_VALID_SESSION_WIN(curInfo.winInfo)) {
continue;

View File

@ -3389,11 +3389,16 @@ void resetWinRange(STimeWindow* winRange) {
winRange->ekey = INT64_MAX;
}
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) {
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t rowSize = pAggSup->resultRowSize;
int32_t code =
pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize);
if (code == TSDB_CODE_SUCCESS) {
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAggSup->stateStore.streamStateSessionGet(pAggSup->pState, pKey, (void**)&pWinInfo->pStatePos, &rowSize,
&winCode);
TSDB_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) {
pWinInfo->sessionWin = *pKey;
pWinInfo->isOutput = true;
if (pWinInfo->pStatePos->needFree) {
@ -3402,6 +3407,12 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey,
} else {
SET_SESSION_WIN_INVALID((*pWinInfo));
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) {
@ -3435,7 +3446,8 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
TSDB_CHECK_CODE(code, lino, _end);
if (!IS_VALID_SESSION_WIN(winInfo)) {
continue;
}
@ -3488,7 +3500,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
}
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
TSDB_CHECK_CODE(code, lino, _end);
if (!IS_VALID_SESSION_WIN(winInfo)) {
continue;
}
@ -3946,14 +3959,19 @@ bool compareWinStateKey(SStateKeys* left, SStateKeys* right) {
return compareVal(left->pData, right);
}
void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SStateWindowInfo* pCurWin,
SStateWindowInfo* pNextWin) {
int32_t size = pAggSup->resultRowSize;
int32_t getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SStateWindowInfo* pCurWin,
SStateWindowInfo* pNextWin) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamStateCur* pCur = NULL;
int32_t size = pAggSup->resultRowSize;
pCurWin->winInfo.sessionWin.groupId = pKey->groupId;
pCurWin->winInfo.sessionWin.win.skey = pKey->win.skey;
pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey;
getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo);
code = getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo);
TSDB_CHECK_CODE(code, lino, _end);
ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo));
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
@ -3969,12 +3987,11 @@ void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SS
pCurWin->winInfo.sessionWin.win.ekey);
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
SStreamStateCur* pCur =
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
int32_t nextSize = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin,
(void**)&pNextWin->winInfo.pStatePos, &nextSize);
if (code != TSDB_CODE_SUCCESS) {
int32_t winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin,
(void**)&pNextWin->winInfo.pStatePos, &nextSize);
if (winCode != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
} else {
pNextWin->pStateKey =
@ -3985,9 +4002,15 @@ void getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SS
pNextWin->pStateKey->isNull = false;
pNextWin->winInfo.isOutput = true;
}
_end:
pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===get state next win buff. skey:%" PRId64 ", endkey:%" PRId64,
pNextWin->winInfo.sessionWin.win.skey, pNextWin->winInfo.sessionWin.win.ekey);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,

View File

@ -337,7 +337,7 @@ _end:
return code;
}
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
@ -348,9 +348,9 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
pNewPos->needFree = true;
pNewPos->beFlushed = true;
void* pBuff = NULL;
int32_t winCode = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if (winCode != TSDB_CODE_SUCCESS) {
return winCode;
(*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if ((*pWinCode) != TSDB_CODE_SUCCESS) {
goto _end;
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
memcpy(pNewPos->pRowBuff, pBuff, *pVLen);

View File

@ -353,22 +353,33 @@ void streamStateFreeVal(void* val) { taosMemoryFree(val); }
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pos = (SRowBuffPos*)value;
if (pos->needFree) {
if (isFlushedState(pState->pFileState, key->win.ekey, 0)) {
if (!pos->pRowBuff) {
return code;
goto _end;
}
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
TSDB_CHECK_CODE(code, lino, _end);
streamStateReleaseBuf(pState, pos, true);
putFreeBuff(pState->pFileState, pos);
code = putFreeBuff(pState->pFileState, pos);
TSDB_CHECK_CODE(code, lino, _end);
stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
} else {
pos->beFlushed = false;
code = putSessionWinResultBuff(pState->pFileState, value);
TSDB_CHECK_CODE(code, lino, _end);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
@ -377,8 +388,8 @@ int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStre
return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
}
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen, pWinCode);
}
void streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {

View File

@ -124,7 +124,8 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
taosEncodeFixedI64(&buff, *pKey);
int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
ASSERT(tmp == sizeof(TSKEY));
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
@ -255,14 +256,25 @@ void streamFileStateDestroy(SStreamFileState* pFileState) {
taosMemoryFree(pFileState);
}
void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pPos->pRowBuff) {
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
TSDB_CHECK_CODE(code, lino, _end);
pPos->pRowBuff = NULL;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
@ -270,7 +282,8 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
putFreeBuff(pFileState, pPos);
code = putFreeBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
if (!all) {
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
@ -280,6 +293,11 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
taosMemoryFreeClear(tmp);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) {
@ -392,7 +410,8 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&fIter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
putFreeBuff(pFileState, pPos);
code = putFreeBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
}
tdListFreeP(pFlushList, destroyRowBuffPosPtr);