refactor backend

This commit is contained in:
yihaoDeng 2023-10-13 16:07:34 +08:00
parent 8392c99d36
commit b6005d6183
1 changed files with 23 additions and 19 deletions

View File

@ -1081,10 +1081,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
int32_t mapSize = 0; int32_t mapSize = 0;
buf = taosDecodeFixedI32(buf, &mapSize); buf = taosDecodeFixedI32(buf, &mapSize);
for (int32_t i = 0; i < mapSize; i++) { for (int32_t i = 0; i < mapSize; i++) {
SWinKey key = {0}; SWinKey key = {0};
buf = decodeSWinKey(buf, &key); buf = decodeSWinKey(buf, &key);
SRowBuffPos* pPos = NULL; SRowBuffPos* pPos = NULL;
int32_t resSize = pInfo->aggSup.resultRowSize; int32_t resSize = pInfo->aggSup.resultRowSize;
pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize); pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize);
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
} }
@ -1388,10 +1388,12 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
void* pBuf = NULL; void* pBuf = NULL;
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
TSKEY ts = *(TSKEY*)pBuf; if (code == 0) {
taosMemoryFree(pBuf); TSKEY ts = *(TSKEY*)pBuf;
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); taosMemoryFree(pBuf);
pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts);
}
} }
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
@ -2527,16 +2529,16 @@ void resetWinRange(STimeWindow* winRange) {
void streamSessionSemiReloadState(SOperatorInfo* pOperator) { void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange); resetWinRange(&pAggSup->winRange);
SResultWindowInfo winInfo = {0}; SResultWindowInfo winInfo = {0};
int32_t size = 0; int32_t size = 0;
void* pBuf = NULL; void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey); int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey)); ASSERT(size == num * sizeof(SSessionKey));
for (int32_t i = 0; i < num; i++) { for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0}; SResultWindowInfo winInfo = {0};
@ -2671,7 +2673,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
// for stream // for stream
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
int32_t res = int32_t res =
@ -2792,7 +2794,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL; pInfo->pStUpdated = NULL;
if(pInfo->isHistoryOp) { if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
} }
@ -2826,8 +2828,9 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, pOperator->fpSet =
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
} }
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo,
@ -3738,8 +3741,9 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, pOperator->fpSet =
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;