adj function of stream operator result
This commit is contained in:
parent
4efd774ce8
commit
3cdb957f22
|
@ -394,7 +394,7 @@ typedef struct SStateStore {
|
|||
int32_t (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
|
||||
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
|
||||
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
|
||||
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
|
||||
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen);
|
||||
int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
|
||||
|
||||
SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
|
||||
|
|
|
@ -33,7 +33,7 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid);
|
|||
void updateInfoDestroy(SUpdateInfo* pInfo);
|
||||
void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo);
|
||||
void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo);
|
||||
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
|
||||
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen);
|
||||
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
|
||||
void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count);
|
||||
int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count);
|
||||
|
|
|
@ -929,7 +929,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, i
|
|||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||
SStorageAPI* pApi, int32_t tsIndex);
|
||||
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic);
|
||||
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
||||
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
|
|
|
@ -20,6 +20,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "executorInt.h"
|
||||
#include "tutil.h"
|
||||
|
||||
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
|
||||
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "function.h"
|
||||
#include "os.h"
|
||||
#include "tname.h"
|
||||
#include "tutil.h"
|
||||
|
||||
#include "tdatablock.h"
|
||||
#include "tmsg.h"
|
||||
|
@ -1357,11 +1358,13 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) {
|
||||
int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI;
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
|
@ -1369,8 +1372,9 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
|||
pScanInfo->pPartScalarSup = pExpr;
|
||||
pScanInfo->pPartTbnameSup = pTbnameExpr;
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
|
||||
code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
||||
|
@ -1421,6 +1425,7 @@ void freePartItem(void* ptr) {
|
|||
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -1511,14 +1516,19 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
|||
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
||||
|
||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
|
||||
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = code;
|
||||
destroyStreamPartitionOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -2477,27 +2477,54 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
|
||||
int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
|
||||
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* pLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
int32_t len = 0;
|
||||
code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
|
||||
len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup);
|
||||
*pBuff = taosMemoryCalloc(1, len);
|
||||
if (!(*pBuff)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
void* buf = *pBuff;
|
||||
encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup);
|
||||
pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo);
|
||||
return len;
|
||||
int32_t tmp = 0;
|
||||
code = pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo, &tmp);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
|
||||
*pLen = len;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (!pInfo->pState) {
|
||||
return;
|
||||
}
|
||||
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||
void* pBuf = NULL;
|
||||
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
|
||||
int32_t len = 0;
|
||||
code = streamScanOperatorEncode(pInfo, &pBuf, &len);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
|
||||
taosMemoryFree(pBuf);
|
||||
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
// other properties are recovered from the execution plan
|
||||
|
@ -3094,17 +3121,35 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
void streamScanReleaseState(SOperatorInfo* pOperator) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
void* pBuff = NULL;
|
||||
if (!pInfo->pState) {
|
||||
return;
|
||||
}
|
||||
if (!pInfo->pUpdateInfo) {
|
||||
return;
|
||||
}
|
||||
int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
|
||||
void* pBuff = taosMemoryCalloc(1, len);
|
||||
pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo);
|
||||
int32_t len = 0;
|
||||
code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pBuff = taosMemoryCalloc(1, len);
|
||||
if (!pBuff) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
int32_t tmp = 0;
|
||||
code = pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo, &tmp);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len);
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pBuff);
|
||||
}
|
||||
|
||||
|
|
|
@ -68,9 +68,7 @@ void destroyStreamCountAggOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) {
|
||||
return pAggSup->windowCount != pAggSup->windowSliding;
|
||||
}
|
||||
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { return pAggSup->windowCount != pAggSup->windowSliding; }
|
||||
|
||||
void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
|
||||
SBuffInfo* pBuffInfo) {
|
||||
|
@ -95,7 +93,8 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
}
|
||||
} else {
|
||||
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount);
|
||||
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||
pAggSup->windowCount);
|
||||
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
if (code == TSDB_CODE_FAILED) {
|
||||
|
@ -107,8 +106,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
|||
pBuffInfo->rebuildWindow = true;
|
||||
}
|
||||
} else {
|
||||
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(
|
||||
pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||
pAggSup->windowCount,
|
||||
(void**)&pCurWin->winInfo.pStatePos, &size);
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -289,8 +289,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
|
||||
code = saveResult(curWin.winInfo, pStUpdated);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo),
|
||||
tstrerror(code));
|
||||
qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -403,7 +402,8 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
|
||||
setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
|
||||
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
|
||||
sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
||||
// 2.twAggSup
|
||||
|
@ -650,11 +650,13 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
|
||||
SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
@ -664,18 +666,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar);
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols);
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pCountNode->window.watermark,
|
||||
|
@ -686,12 +684,11 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
};
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
|
||||
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
||||
sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
||||
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->streamAggSup.windowCount = pCountNode->windowCount;
|
||||
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
|
||||
|
||||
|
@ -709,7 +706,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->dataVersion = 0;
|
||||
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
|
||||
if (!pInfo->historyWins) {
|
||||
goto _error;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
|
@ -735,8 +733,12 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
||||
|
||||
if (downstream) {
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||
&pInfo->twAggSup, &pInfo->basic);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
return pOperator;
|
||||
|
||||
|
@ -747,6 +749,6 @@ _error:
|
|||
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -89,7 +89,8 @@ void setEventWindowFlag(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo
|
|||
pWinInfo->pWinFlag = (SEventWinfowFlag*)pFlagInfo;
|
||||
}
|
||||
|
||||
void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos, SEventWindowInfo* pWinInfo) {
|
||||
void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos,
|
||||
SEventWindowInfo* pWinInfo) {
|
||||
pWinInfo->winInfo.sessionWin = *pKey;
|
||||
pWinInfo->winInfo.pStatePos = pPos;
|
||||
setEventWindowFlag(pAggSup, pWinInfo);
|
||||
|
@ -122,7 +123,8 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
|
|||
pCurWin->winInfo.sessionWin.groupId = groupId;
|
||||
pCurWin->winInfo.sessionWin.win.skey = ts;
|
||||
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
||||
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||
SStreamStateCur* pCur =
|
||||
pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||
SSessionKey leftWinKey = {.groupId = groupId};
|
||||
void* pVal = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -173,8 +175,8 @@ _end:
|
|||
}
|
||||
|
||||
int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey,
|
||||
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows,
|
||||
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
|
||||
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start,
|
||||
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
|
||||
*pRebuild = false;
|
||||
if (!pWinInfo->pWinFlag->startFlag && !(starts[start])) {
|
||||
return 1;
|
||||
|
@ -244,12 +246,14 @@ static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pC
|
|||
}
|
||||
SEventWindowInfo nextWinInfo = {0};
|
||||
getNextSessionWinInfo(pAggSup, pStUpdated, &pCurWin->winInfo, &nextWinInfo.winInfo);
|
||||
if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) || !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) {
|
||||
if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) ||
|
||||
!inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) {
|
||||
releaseOutputBuf(pAggSup->pState, nextWinInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
||||
break;
|
||||
}
|
||||
setEventWindowFlag(pAggSup, &nextWinInfo);
|
||||
compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated, pStDeleted, false);
|
||||
compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated,
|
||||
pStDeleted, false);
|
||||
pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag;
|
||||
winNum++;
|
||||
}
|
||||
|
@ -290,7 +294,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
return;
|
||||
}
|
||||
|
||||
SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock};
|
||||
SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
|
||||
.pDataBlock = pSDataBlock->pDataBlock};
|
||||
code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶mStart);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("set data from start slotId error.");
|
||||
|
@ -299,7 +304,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
int32_t statusStart = 0;
|
||||
filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart);
|
||||
|
||||
SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock};
|
||||
SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
|
||||
.pDataBlock = pSDataBlock->pDataBlock};
|
||||
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶mEnd);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("set data from end slotId error.");
|
||||
|
@ -321,11 +327,13 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
bool allEqual = true;
|
||||
SEventWindowInfo curWin = {0};
|
||||
SSessionKey nextWinKey = {0};
|
||||
setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, &nextWinKey);
|
||||
setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
|
||||
&nextWinKey);
|
||||
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
|
||||
bool rebuild = false;
|
||||
winRows = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, rows, i,
|
||||
pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild);
|
||||
winRows =
|
||||
updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
|
||||
rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild);
|
||||
ASSERT(winRows >= 1);
|
||||
if (rebuild) {
|
||||
uint64_t uid = 0;
|
||||
|
@ -333,7 +341,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
|
||||
tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
|
||||
doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
|
||||
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) && !isWindowIncomplete(&curWin)) {
|
||||
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) &&
|
||||
!isWindowIncomplete(&curWin)) {
|
||||
saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
|
||||
}
|
||||
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
|
||||
|
@ -603,7 +612,8 @@ void streamEventReleaseState(SOperatorInfo* pOperator) {
|
|||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
qDebug("===stream=== event window operator relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||
qDebug("===stream=== event window operator relase state. save result count:%d",
|
||||
(int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_STATE_NAME,
|
||||
strlen(STREAM_EVENT_OP_STATE_NAME), pBuff, resSize);
|
||||
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||
|
@ -693,6 +703,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
|
||||
int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStreamEventAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamEventAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -783,20 +794,18 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||
&pInfo->twAggSup, &pInfo->basic);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -804,5 +813,6 @@ _error:
|
|||
destroyStreamEventOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -30,10 +30,14 @@
|
|||
|
||||
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
||||
#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL)
|
||||
#define IS_NORMAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
||||
#define IS_NORMAL_INTERVAL_OP(op) \
|
||||
((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \
|
||||
(op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
||||
|
||||
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
||||
#define IS_NORMAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
||||
#define IS_NORMAL_SESSION_OP(op) \
|
||||
((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || \
|
||||
(op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
||||
|
||||
#define IS_NORMAL_STATE_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE)
|
||||
|
||||
|
@ -263,14 +267,16 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
|||
if (chIds) {
|
||||
int32_t childId = getChildIndex(pBlock);
|
||||
if (pInvalidWins) {
|
||||
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
|
||||
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts,
|
||||
winRes.groupId, childId);
|
||||
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
|
||||
}
|
||||
|
||||
SArray* chArray = *(void**)chIds;
|
||||
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
|
||||
if (index != -1) {
|
||||
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId);
|
||||
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey,
|
||||
winGpId, childId);
|
||||
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
|
||||
continue;
|
||||
}
|
||||
|
@ -626,8 +632,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
}
|
||||
|
||||
static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
|
||||
int32_t numOfCh, SOperatorInfo* pOperator) {
|
||||
static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval,
|
||||
SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
||||
|
@ -665,7 +671,8 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
|
|||
.calWin.skey = nextWin.skey,
|
||||
.calWin.ekey = nextWin.skey};
|
||||
// add pull data request
|
||||
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
|
||||
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d",
|
||||
winRes.ts, winRes.groupId, numOfCh);
|
||||
if (IS_MID_INTERVAL_OP(pOperator)) {
|
||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
||||
taosArrayPush(pInfo->pMidPullDatas, &winRes);
|
||||
|
@ -753,7 +760,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat
|
|||
if (pBlock->info.id.groupId == 0) {
|
||||
pBlock->info.id.groupId = groupId;
|
||||
void* tbname = NULL;
|
||||
if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, false) < 0) {
|
||||
if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname,
|
||||
false) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
} else {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
|
@ -831,9 +839,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
|
|||
return startPos;
|
||||
}
|
||||
|
||||
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) {
|
||||
return pInfo->primaryPkIndex != -1;
|
||||
}
|
||||
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
|
||||
|
||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
|
||||
|
@ -862,8 +868,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
|||
}
|
||||
|
||||
if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) {
|
||||
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
|
||||
",maxKey %" PRId64,
|
||||
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 ",maxKey %" PRId64,
|
||||
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
|
||||
blockDataUpdateTsWindow(pSDataBlock, pInfo->primaryTsIndex);
|
||||
|
||||
|
@ -1757,27 +1762,37 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
|
||||
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->tsColIndex = tsColIndex;
|
||||
}
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic);
|
||||
return;
|
||||
code = initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic);
|
||||
return code;
|
||||
}
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||
pScanInfo->pState = pAggSup->pState;
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
|
||||
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
|
||||
code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
|
||||
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
|
||||
&pScanInfo->pUpdateInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
pScanInfo->twAggSup = *pTwSup;
|
||||
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||
pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static TSKEY sesionTs(void* pKey) {
|
||||
|
@ -2456,8 +2471,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
|
|||
pBlock->info.id.groupId = pKey->groupId;
|
||||
|
||||
void* tbname = NULL;
|
||||
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
|
||||
&tbname, false) < 0) {
|
||||
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname,
|
||||
false) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
} else {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
|
@ -3020,6 +3035,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
int32_t lino = 0;
|
||||
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -3114,8 +3130,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||
|
||||
if (downstream) {
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||
&pInfo->twAggSup, &pInfo->basic);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
return pOperator;
|
||||
|
||||
|
@ -3126,6 +3146,7 @@ _error:
|
|||
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -3683,9 +3704,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL,
|
||||
pAggSup->stateKeySize, compareStateKey,
|
||||
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize,
|
||||
compareStateKey, (void**)&winfo.pStatePos,
|
||||
&pAggSup->resultRowSize);
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
@ -3943,12 +3964,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
||||
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
||||
SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
|
||||
|
@ -3957,9 +3979,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||
|
@ -3986,9 +4006,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
|
||||
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
||||
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
@ -4001,7 +4020,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->dataVersion = 0;
|
||||
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
|
||||
if (!pInfo->historyWins) {
|
||||
goto _error;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
if (pHandle) {
|
||||
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||
|
@ -4028,17 +4048,20 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||
&pInfo->twAggSup, &pInfo->basic);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyStreamStateOperatorInfo(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -4205,7 +4228,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
|
||||
};
|
||||
|
||||
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
|
||||
pInfo->twAggSup =
|
||||
(STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
|
||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
|
@ -4613,5 +4637,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {
|
||||
setOperatorCompleted(pOperator);
|
||||
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
|
||||
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo),
|
||||
getStreamOpName(pOperator->operatorType), pOperator->status);
|
||||
}
|
||||
|
|
|
@ -304,6 +304,7 @@ TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t p
|
|||
len = colDataGetRowLength(pPkDataInfo, i);
|
||||
}
|
||||
int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff);
|
||||
// we don't care whether the data is updated or not
|
||||
tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen);
|
||||
}
|
||||
}
|
||||
|
@ -413,59 +414,119 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo) {
|
|||
pInfo->pCloseWinSBF = NULL;
|
||||
}
|
||||
|
||||
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo) {
|
||||
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (!pInfo) {
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tStartEncode(&encoder) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
|
||||
if (tEncodeI32(&encoder, size) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, size) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
|
||||
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, *pTs) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
|
||||
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, sBfSize) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
for (int32_t i = 0; i < sBfSize; i++) {
|
||||
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
|
||||
if (tScalableBfEncode(pSBf, &encoder) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
if (tEncodeI64(&encoder, pInfo->interval) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
if (tEncodeI64(&encoder, pInfo->watermark) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
if (tEncodeI64(&encoder, pInfo->minTS) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
|
||||
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
int32_t mapSize = taosHashGetSize(pInfo->pMap);
|
||||
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, mapSize) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
|
||||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
int32_t valueSize = taosHashGetValueSize(pIte);
|
||||
if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) return -1;
|
||||
if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
*pLen = tlen;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
|
||||
|
|
Loading…
Reference in New Issue