Merge pull request #27888 from taosdata/fix/ly_stream

fix(stream):adj build file state res
This commit is contained in:
Haojun Liao 2024-09-14 13:50:51 +08:00 committed by GitHub
commit b78cbb7dc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 47 deletions

View File

@ -408,9 +408,9 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark,
const char* id, int64_t ckId, int8_t type);
int32_t (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* id, int64_t ckId, int8_t type,
struct SStreamFileState** ppFileState);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState);

View File

@ -45,9 +45,9 @@ typedef int32_t (*_state_fun_get_fn)(SStreamFileState* pFileState, void* pKey, i
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type);
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
struct SStreamFileState** ppFileState);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
@ -63,7 +63,7 @@ int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
@ -89,7 +89,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen);
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen);
void recoverSesssion(SStreamFileState* pFileState, int64_t ckId);
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId);
void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff);

View File

@ -1991,10 +1991,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
pInfo->pUpdatedMap = NULL;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno);
pInfo->pState->pFileState = NULL;
code =
pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo),
pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->dataVersion = 0;
pInfo->recvGetAll = false;
@ -2176,39 +2178,33 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
int32_t lino = 0;
int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
pSup->gap = gap;
pSup->stateKeySize = keySize;
pSup->stateKeyType = keyType;
pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pSup->pDummyCtx == NULL) {
return terrno;
}
QUERY_CHECK_NULL(pSup->pDummyCtx, code, lino, _end, terrno);
pSup->stateStore = *pStore;
pSup->pSessionAPI = pApi;
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (!pSup->pState) {
return terrno;
}
QUERY_CHECK_NULL(pSup->pState, code, lino, _end, terrno);
*(pSup->pState) = *pState;
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
pSup->pState->pFileState = NULL;
code = pSup->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT);
QUERY_CHECK_NULL(pSup->pState->pFileState, code, lino, _end, terrno);
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _end);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pSup->pResultRows = tSimpleHashInit(32, hashFn);
if (!pSup->pResultRows) {
return terrno;
}
QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno);
for (int32_t i = 0; i < numOfOutput; ++i) {
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
@ -5348,10 +5344,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
int32_t funResSize = getMaxFunResSize(pSup, numOfCols);
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
pInfo->pState->pFileState = NULL;
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
QUERY_CHECK_NULL(pInfo->pState->pFileState, code, lino, _error, terrno);
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _error);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);

View File

@ -345,7 +345,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark && maxTs != INT64_MIN) {
if (maxTs != INT64_MIN && ts < maxTs - pInfo->watermark) {
// this window has been closed.
if (pInfo->pCloseWinSBF) {
code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);
@ -585,6 +585,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
int32_t sBfSize = 0;
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*));
QUERY_CHECK_NULL(pInfo->pTsSBFs, code, lino, _error, terrno);
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = NULL;
code = tScalableBfDecode(&decoder, &pSBf);

View File

@ -127,9 +127,9 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type) {
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
SStreamFileState** ppFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (memSize <= 0) {
@ -194,10 +194,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
// todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) {
recoverSnapshot(pFileState, checkpointId);
code = recoverSnapshot(pFileState, checkpointId);
} else {
recoverSesssion(pFileState, checkpointId);
code = recoverSesssion(pFileState, checkpointId);
}
QUERY_CHECK_CODE(code, lino, _error);
void* valBuf = NULL;
int32_t len = 0;
@ -208,14 +209,14 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
}
taosMemoryFreeClear(valBuf);
return pFileState;
(*ppFileState) = pFileState;
_error:
if (code != TSDB_CODE_SUCCESS) {
streamFileStateDestroy(pFileState);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
streamFileStateDestroy(pFileState);
return NULL;
return code;
}
void destroyRowBuffPos(SRowBuffPos* pPos) {
@ -806,8 +807,10 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code;
}
void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t winRes = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
? INT64_MIN
@ -818,7 +821,7 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
while (code == TSDB_CODE_SUCCESS) {
while (winRes == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount >= recoverNum) {
break;
}
@ -826,22 +829,34 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
void* pVal = NULL;
int32_t vlen = 0;
SSessionKey key = {0};
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
if (code != TSDB_CODE_SUCCESS) {
winRes = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
if (winRes != TSDB_CODE_SUCCESS) {
break;
}
if (vlen != pFileState->rowSize) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
code = putSessionWinResultBuff(pFileState, pPos);
if (code != TSDB_CODE_SUCCESS) {
winRes = putSessionWinResultBuff(pFileState, pPos);
if (winRes != TSDB_CODE_SUCCESS) {
break;
}
code = streamStateSessionCurPrev_rocksdb(pCur);
winRes = streamStateSessionCurPrev_rocksdb(pCur);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
streamStateFreeCur(pCur);
return code;
}
void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
@ -896,6 +911,7 @@ _end:
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
streamStateFreeCur(pCur);
return code;
}
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }