|
|
|
@ -28,6 +28,8 @@
|
|
|
|
|
|
|
|
|
|
#define IS_FINAL_OP(op) ((op)->isFinal)
|
|
|
|
|
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
|
|
|
|
|
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
|
|
|
|
|
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
|
|
|
|
|
|
|
|
|
|
typedef struct SStateWindowInfo {
|
|
|
|
|
SResultWindowInfo winInfo;
|
|
|
|
@ -2721,6 +2723,26 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
|
|
|
|
|
return size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
|
|
|
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
|
|
|
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
if (downstream->fpSet.releaseStreamStateFn) {
|
|
|
|
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
if (downstream->fpSet.reloadStreamStateFn) {
|
|
|
|
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
|
|
|
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
|
|
|
|
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
|
|
|
@ -2830,6 +2852,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|
|
|
|
|
|
|
|
|
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
|
|
|
|
|
optrDefaultBufFn, NULL);
|
|
|
|
|
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
|
|
|
|
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
|
|
|
|
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
|
|
|
|
}
|
|
|
|
@ -3463,6 +3486,26 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo*
|
|
|
|
|
pBlock->info.id.groupId = 0;
|
|
|
|
|
buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
|
|
|
|
|
}
|
|
|
|
|
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
|
|
|
|
|
int32_t size = taosArrayGetSize(pAllWins);
|
|
|
|
|
if (size == 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SSessionKey* pSeKey = taosArrayGet(pAllWins, size - 1);
|
|
|
|
|
taosArrayPush(pMaxWins, pSeKey);
|
|
|
|
|
if (pSeKey->groupId == 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
uint64_t preGpId = pSeKey->groupId;
|
|
|
|
|
for (int32_t i = size - 2; i >= 0; i--) {
|
|
|
|
|
pSeKey = taosArrayGet(pAllWins, i);
|
|
|
|
|
if (preGpId != pSeKey->groupId) {
|
|
|
|
|
taosArrayPush(pMaxWins, pSeKey);
|
|
|
|
|
preGpId = pSeKey->groupId;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
SExprSupp* pSup = &pOperator->exprSupp;
|
|
|
|
@ -3563,6 +3606,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
|
|
|
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
|
|
|
|
pInfo->pStUpdated = NULL;
|
|
|
|
|
if(pInfo->isHistoryOp) {
|
|
|
|
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
|
|
|
|
}
|
|
|
|
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
|
|
|
|
pInfo->pUpdated = NULL;
|
|
|
|
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
|
|
|
@ -3589,6 +3635,42 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
|
|
|
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
|
|
|
|
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
|
|
|
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
|
|
|
|
|
}
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
if (downstream->fpSet.releaseStreamStateFn) {
|
|
|
|
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
|
|
|
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
|
|
|
|
SResultWindowInfo winInfo = {0};
|
|
|
|
|
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
|
|
|
|
int32_t size = 0;
|
|
|
|
|
void* pBuf = NULL;
|
|
|
|
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
|
|
|
|
|
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
|
|
|
|
|
int32_t num = size / sizeof(SSessionKey);
|
|
|
|
|
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
|
|
|
|
ASSERT(size == num * sizeof(SSessionKey));
|
|
|
|
|
for (int32_t i = 0; i < num; i++) {
|
|
|
|
|
SResultWindowInfo winInfo = {0};
|
|
|
|
|
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
|
|
|
|
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
if (downstream->fpSet.reloadStreamStateFn) {
|
|
|
|
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
|
|
|
|
SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
|
|
|
@ -3653,11 +3735,17 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|
|
|
|
pInfo->pUpdated = NULL;
|
|
|
|
|
pInfo->pStUpdated = NULL;
|
|
|
|
|
pInfo->dataVersion = 0;
|
|
|
|
|
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
|
|
|
|
|
if (!pInfo->historyWins) {
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
pInfo->isHistoryOp = false;
|
|
|
|
|
|
|
|
|
|
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
|
|
|
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
|
|
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
|
|
|
|
optrDefaultBufFn, NULL);
|
|
|
|
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
|
|
|
|
|
|
|
|
|
if (downstream) {
|
|
|
|
|
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
|
|
|
@ -3765,7 +3853,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
|
|
|
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
|
|
|
|
pInfo->pStUpdated = NULL;
|
|
|
|
|
|
|
|
|
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
|
|
|
|
pInfo->pUpdated = NULL;
|
|
|
|
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
|
|
|
@ -3815,7 +3902,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
|
|
|
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
|
|
|
|
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
|
|
|
|
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
|
|
|
|
|
|
|
|
pOperator->operatorType = pPhyNode->type;
|
|
|
|
@ -3879,6 +3966,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool compareStateKey(void* data, void* key) {
|
|
|
|
|
if (!data || !key) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
SStateKeys* stateKey = (SStateKeys*)key;
|
|
|
|
|
stateKey->pData = (char*)key + sizeof(SStateKeys);
|
|
|
|
|
return compareVal(data, stateKey);
|
|
|
|
@ -3902,7 +3992,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|
|
|
|
|
|
|
|
|
if (code == TSDB_CODE_SUCCESS) {
|
|
|
|
|
pCurWin->winInfo.isOutput = true;
|
|
|
|
|
} else {
|
|
|
|
|
} else if (pKeyData) {
|
|
|
|
|
if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
|
|
|
|
|
varDataCopy(pCurWin->pStateKey->pData, pKeyData);
|
|
|
|
|
} else {
|
|
|
|
@ -4100,6 +4190,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
|
|
|
|
pInfo->pSeUpdated = NULL;
|
|
|
|
|
|
|
|
|
|
if(pInfo->isHistoryOp) {
|
|
|
|
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
|
|
|
|
pInfo->pUpdated = NULL;
|
|
|
|
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
|
|
|
@ -4125,6 +4219,68 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamStateReleaseState(SOperatorInfo* pOperator) {
|
|
|
|
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
|
|
|
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
if (downstream->fpSet.releaseStreamStateFn) {
|
|
|
|
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
|
|
|
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
|
|
|
|
|
SExprSupp* pSup = &pOperator->exprSupp;
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
|
|
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
|
|
|
|
|
|
|
|
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
SResultRow* pCurResult = NULL;
|
|
|
|
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
|
|
|
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
|
|
|
|
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
|
|
|
|
SResultRow* pWinResult = NULL;
|
|
|
|
|
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
|
|
|
|
|
|
|
|
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
|
|
|
|
|
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
|
|
|
|
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
|
|
|
|
|
if (pNextWin->isOutput && pStDeleted) {
|
|
|
|
|
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
|
|
|
|
|
}
|
|
|
|
|
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
|
|
|
|
|
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
|
|
|
|
|
taosMemoryFree(pNextWin->pOutputBuf);
|
|
|
|
|
saveSessionOutputBuf(pAggSup, pCurWin);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamStateReloadState(SOperatorInfo* pOperator) {
|
|
|
|
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
|
|
|
|
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
|
|
|
|
int32_t size = 0;
|
|
|
|
|
void* pBuf = NULL;
|
|
|
|
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
|
|
|
|
|
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
|
|
|
|
|
int32_t num = size / sizeof(SSessionKey);
|
|
|
|
|
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
|
|
|
|
ASSERT(size == num * sizeof(SSessionKey));
|
|
|
|
|
for (int32_t i = 0; i < num; i++) {
|
|
|
|
|
SStateWindowInfo curInfo = {0};
|
|
|
|
|
SStateWindowInfo nextInfo = {0};
|
|
|
|
|
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
|
|
|
|
|
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
|
|
|
|
|
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
if (downstream->fpSet.reloadStreamStateFn) {
|
|
|
|
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
|
|
|
|
SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
|
|
|
|
@ -4186,11 +4342,17 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|
|
|
|
pInfo->pUpdated = NULL;
|
|
|
|
|
pInfo->pSeUpdated = NULL;
|
|
|
|
|
pInfo->dataVersion = 0;
|
|
|
|
|
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
|
|
|
|
|
if (!pInfo->historyWins) {
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
pInfo->isHistoryOp = false;
|
|
|
|
|
|
|
|
|
|
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
|
|
|
|
pInfo, pTaskInfo);
|
|
|
|
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
|
|
|
|
optrDefaultBufFn, NULL);
|
|
|
|
|
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamSessionReloadState);
|
|
|
|
|
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
|
|
|
|
code = appendDownstream(pOperator, &downstream, 1);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
@ -5017,6 +5179,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|
|
|
|
pInfo, pTaskInfo);
|
|
|
|
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
|
|
|
|
|
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
|
|
|
|
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
|
|
|
|
|
|
|
|
|
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
|
|
|
|
|
pInfo->recvGetAll = false;
|
|
|
|
|