fix issue

This commit is contained in:
54liuyao 2024-10-17 14:55:07 +08:00
parent baf47844f1
commit a62b5a3efd
12 changed files with 106 additions and 36 deletions

View File

@ -378,6 +378,7 @@ typedef struct SStateStore {
int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
void (*streamStateSetFillInfo)(SStreamState* pState);
void (*streamStateClearExpiredState)(SStreamState* pState);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,

View File

@ -108,6 +108,7 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
// twa
void streamStateSetFillInfo(SStreamState* pState);
void streamStateClearExpiredState(SStreamState* pState);
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);

View File

@ -145,9 +145,12 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState);
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos);
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey);
//twa
void setFillInfo(SStreamFileState* pFileState);
void clearExpiredState(SStreamFileState* pFileState);
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates);
#ifdef __cplusplus
}

View File

@ -67,6 +67,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;

View File

@ -183,6 +183,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;

View File

@ -1636,6 +1636,7 @@ int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInter
*pInterval = pInfo->interval;
pInfo->hasFill = true;
(*ppAggSup) = &pInfo->streamAggSup;
pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState);
} else {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
}

View File

@ -1751,6 +1751,9 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
goto _end;
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
resetStreamFillSup(pInfo->pFillSup);
(*ppRes) = NULL;
@ -1854,6 +1857,9 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
if (!(*ppRes)) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
resetStreamFillSup(pInfo->pFillSup);
}

View File

@ -237,6 +237,7 @@ typedef struct SElapsedInfo {
} SElapsedInfo;
typedef struct STwaInfo {
double dTwaRes;
double dOutput;
int64_t numOfElems;
SPoint1 p;

View File

@ -6201,11 +6201,11 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
pResInfo->numOfRes = 0;
} else {
if (pInfo->win.ekey == pInfo->win.skey) {
pInfo->dOutput = pInfo->p.val;
pInfo->dTwaRes = pInfo->p.val;
} else if (pInfo->win.ekey == INT64_MAX || pInfo->win.skey == INT64_MIN) { // no data in timewindow
pInfo->dOutput = 0;
pInfo->dTwaRes = 0;
} else {
pInfo->dOutput = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey);
pInfo->dTwaRes = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey);
}
pResInfo->numOfRes = 1;

View File

@ -33,16 +33,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, sizeof(SWinKey));
QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
QUERY_CHECK_CODE(code, lino, _end);
}
addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates);
// recover
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
@ -64,25 +55,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
streamStateFreeCur(pCur);
}
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0 || size == 0) {
// find the first position which is smaller than the pKey
if (index >= 0) {
SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
goto _end;
}
}
index++;
void* tmp = taosArrayInsert(pWinStates, index, pKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
if (size >= MAX_NUM_OF_CACHE_WIN) {
int32_t num = size - NUM_OF_CACHE_WIN;
taosArrayRemoveBatch(pWinStates, 0, num, NULL);
}
addSearchItem(pFileState, pWinStates, pKey);
_end:
if (code != TSDB_CODE_SUCCESS) {

View File

@ -229,7 +229,6 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void*
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
if (pState->pFileState) {
// todo(liuyao) 改这里
return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
}
return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
@ -289,7 +288,24 @@ int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, voi
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return streamStateGet(pState, key, pVal, pVLen, pWinCode);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
code = streamStateGet(pState, key, pVal, pVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end);
SSHashObj* pSearchBuff = getSearchBuff(pState->pFileState);
if (pSearchBuff != NULL) {
SArray* pWinStates = NULL;
code = addArrayBuffIfNotExist(pSearchBuff, key->groupId, &pWinStates);
QUERY_CHECK_CODE(code, lino, _end);
code = addSearchItem(pState->pFileState, pWinStates, key);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
@ -588,6 +604,10 @@ void streamStateClearExpiredState(SStreamState* pState) {
clearExpiredState(pState->pFileState);
}
void streamStateSetFillInfo(SStreamState* pState) {
setFillInfo(pState->pFileState);
}
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);

View File

@ -56,6 +56,7 @@ struct SStreamFileState {
char* cfName;
void* searchBuff;
SSHashObj* pGroupIdMap;
bool hasFillCatch;
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
@ -253,6 +254,8 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
pFileState->hasFillCatch = true;
if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
code = recoverSnapshot(pFileState, checkpointId);
} else if (type == STREAM_STATE_BUFF_SORT) {
@ -997,6 +1000,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
}
if (vlen != pFileState->rowSize) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
taosMemoryFreeClear(pVal);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, pVal, vlen);
@ -1207,6 +1211,10 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
return pFileState->pGroupIdMap;
}
void setFillInfo(SStreamFileState* pFileState) {
pFileState->hasFillCatch = false;
}
void clearExpiredState(SStreamFileState* pFileState) {
SSHashObj* pSearchBuff = pFileState->searchBuff;
void* pIte = NULL;
@ -1222,8 +1230,10 @@ void clearExpiredState(SStreamFileState* pFileState) {
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
qTrace("%s at line %d res %d", __func__, __LINE__, code_file);
if (pFileState->hasFillCatch == false || isFlushedState(pFileState, pKey->ts, 0)) {
code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
qTrace("state fill delete.%s at line %d res %d", __func__, __LINE__, code_file);
}
}
taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
}
@ -1368,3 +1378,55 @@ _end:
}
return code;
}
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0 || size == 0) {
if (index >= 0) {
SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
goto _end;
}
}
index++;
void* tmp = taosArrayInsert(pWinStates, index, pKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
if (size >= MAX_NUM_OF_CACHE_WIN) {
int32_t num = size - NUM_OF_CACHE_WIN;
taosArrayRemoveBatch(pWinStates, 0, num, NULL);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, sizeof(SWinKey));
QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
QUERY_CHECK_CODE(code, lino, _end);
}
(*ppResStates) = pWinStates;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}