fix/TD-32505-main

This commit is contained in:
yihaoDeng 2024-10-11 09:17:43 +08:00
parent 6b25c4756b
commit 6c9a91b26b
1 changed files with 45 additions and 40 deletions

View File

@ -49,7 +49,7 @@
#define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint"
#define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint"
#define MAX_STREAM_HISTORY_RESULT 100000000
#define MAX_STREAM_HISTORY_RESULT 100000000
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
@ -449,7 +449,7 @@ void destroyFlusedPos(void* pRes) {
}
void destroyFlusedppPos(void* ppRes) {
void *pRes = *(void **)ppRes;
void* pRes = *(void**)ppRes;
destroyFlusedPos(pRes);
}
@ -507,7 +507,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
}
taosArrayDestroy(pInfo->pMidPullDatas);
if (pInfo->pState !=NULL && pInfo->pState->dump == 1) {
if (pInfo->pState != NULL && pInfo->pState->dump == 1) {
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
taosMemoryFreeClear(pInfo->pState->pTdbState);
}
@ -548,7 +548,8 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, struct SSteamOpBasicInfo* pBasic) {
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo,
struct SSteamOpBasicInfo* pBasic) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
@ -1028,7 +1029,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
}
static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
@ -1661,7 +1662,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
QUERY_CHECK_NULL(delWins, code, lino, _end, terrno);
SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL;
code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap);
@ -1897,9 +1898,8 @@ _end:
}
}
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
int32_t numOfChild, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
@ -1959,8 +1959,8 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
@ -2042,11 +2042,13 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
pOperator->info = pInfo;
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
pOperator->fpSet = createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet =
createOperatorFpSet(NULL, doStreamMidIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
} else {
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAggNext, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
}
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
@ -2220,10 +2222,10 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
*(pSup->pState) = *pState;
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
pSup->pState->pFileState = NULL;
code = pSup->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
pSup->pState->pFileState = NULL;
code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize,
sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr,
pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _end);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -3309,8 +3311,8 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
code =
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
sizeof(SResultWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
@ -3772,8 +3774,8 @@ _end:
}
}
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
@ -3802,12 +3804,12 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
}
}
SExprSupp* pExpSup = &pOperator->exprSupp;
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
pInfo->binfo.pRes = pResBlock;
SExprInfo* pExprInfo = NULL;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
@ -3886,8 +3888,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
QUERY_CHECK_CODE(code, lino, _error);
}
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamSessionAggNext, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
if (downstream) {
@ -4102,8 +4105,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle,
SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
@ -4111,7 +4114,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* pOperator = NULL;
code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator);
if (pOperator == NULL || code != 0) {
downstream = NULL;
downstream = NULL;
QUERY_CHECK_CODE(code, lino, _error);
}
@ -4618,8 +4621,8 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
QUERY_CHECK_CODE(code, lino, _end);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
code =
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
sizeof(SResultWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
@ -4846,7 +4849,7 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
int32_t resSize = winSize + sizeof(TSKEY);
char* pBuff = taosMemoryCalloc(1, resSize);
if (!pBuff) {
return ;
return;
}
memcpy(pBuff, pInfo->historyWins->pData, winSize);
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
@ -4944,7 +4947,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
}
}
taosMemoryFree(pBuf);
taosMemoryFreeClear(pBuf);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
@ -4953,6 +4956,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
_end:
taosMemoryFreeClear(pBuf);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
@ -5001,9 +5005,9 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
pInfo->binfo.pRes = pResBlock;
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
@ -5335,7 +5339,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
@ -5383,7 +5387,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pState->pFileState = NULL;
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState);
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH,
&pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pOperator = pOperator;
@ -5397,7 +5402,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->recvGetAll = false;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);