fix mem leak
This commit is contained in:
parent
95b8754dab
commit
ad0b0c3821
|
@ -110,7 +110,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
|
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
|
||||||
int64_t* ts = (int64_t*)pColData->pData;
|
int64_t* ts = (int64_t*)pColData->pData;
|
||||||
|
|
||||||
int64_t duration = pWin->ekey - pWin->skey + delta;
|
int64_t duration = pWin->ekey - pWin->skey + delta;
|
||||||
|
@ -419,7 +419,7 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
|
||||||
|
|
||||||
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
|
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
|
||||||
if (pInterval->interval != pInterval->sliding &&
|
if (pInterval->interval != pInterval->sliding &&
|
||||||
((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart) )) {
|
((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -904,8 +904,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||||
int32_t forwardRows =
|
int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder);
|
pInfo->binfo.inputTsOrder);
|
||||||
|
|
||||||
// prev time window not interpolation yet.
|
// prev time window not interpolation yet.
|
||||||
if (pInfo->timeWindowInterpo) {
|
if (pInfo->timeWindowInterpo) {
|
||||||
|
@ -932,7 +932,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
STimeWindow nextWin = win;
|
STimeWindow nextWin = win;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t prevEndPos = forwardRows - 1 + startPos;
|
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder);
|
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
|
||||||
|
pInfo->binfo.inputTsOrder);
|
||||||
if (startPos < 0) {
|
if (startPos < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -944,8 +945,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||||
forwardRows =
|
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder);
|
pInfo->binfo.inputTsOrder);
|
||||||
// window start(end) key interpolation
|
// window start(end) key interpolation
|
||||||
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
|
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
|
||||||
// TODO: add to open window? how to close the open windows after input blocks exhausted?
|
// TODO: add to open window? how to close the open windows after input blocks exhausted?
|
||||||
|
@ -1604,7 +1605,8 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
|
||||||
pScanInfo->windowSup.parentType = type;
|
pScanInfo->windowSup.parentType = type;
|
||||||
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
|
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate);
|
pScanInfo->pUpdateInfo =
|
||||||
|
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate);
|
||||||
}
|
}
|
||||||
|
|
||||||
pScanInfo->interval = pInfo->interval;
|
pScanInfo->interval = pInfo->interval;
|
||||||
|
@ -1632,8 +1634,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
|
||||||
int32_t code =
|
int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||||
initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -1862,7 +1864,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
||||||
if (pStateNode->window.pExprs != NULL) {
|
if (pStateNode->window.pExprs != NULL) {
|
||||||
int32_t numOfScalarExpr = 0;
|
int32_t numOfScalarExpr = 0;
|
||||||
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
|
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
|
||||||
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
int32_t code =
|
||||||
|
initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -2033,13 +2036,14 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { return pStore->streamStateCheck(pState, pKey); }
|
bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) {
|
||||||
|
return pStore->streamStateCheck(pState, pKey);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId,
|
int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId,
|
||||||
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset,
|
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset,
|
||||||
SAggSupporter* pAggSup, SStateStore* pStore) {
|
SAggSupporter* pAggSup, SStateStore* pStore) {
|
||||||
|
SWinKey key = {.ts = win->skey, .groupId = groupId};
|
||||||
SWinKey key = { .ts = win->skey, .groupId = groupId };
|
|
||||||
char* value = NULL;
|
char* value = NULL;
|
||||||
int32_t size = pAggSup->resultRowSize;
|
int32_t size = pAggSup->resultRowSize;
|
||||||
|
|
||||||
|
@ -2056,7 +2060,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore) {
|
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
|
||||||
|
SStateStore* pStore) {
|
||||||
if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
|
if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
|
||||||
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
|
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
|
||||||
if (!hasIntervalWindow(pState, &key, pStore)) {
|
if (!hasIntervalWindow(pState, &key, pStore)) {
|
||||||
|
@ -2126,7 +2131,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) {
|
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
|
||||||
|
int32_t numOfCh, SOperatorInfo* pOperator) {
|
||||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
||||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
|
@ -2149,13 +2155,13 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
|
||||||
// pull data is over
|
// pull data is over
|
||||||
taosArrayDestroy(chArray);
|
taosArrayDestroy(chArray);
|
||||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||||
qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts);
|
qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts);
|
||||||
|
|
||||||
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
|
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
|
||||||
if (pFinalCh) {
|
if (pFinalCh) {
|
||||||
taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey));
|
taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey));
|
||||||
doDeleteWindow(pOperator, winRes.ts, winRes.groupId);
|
doDeleteWindow(pOperator, winRes.ts, winRes.groupId);
|
||||||
STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval);
|
STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval);
|
||||||
SPullWindowInfo pull = {.window = nextWin,
|
SPullWindowInfo pull = {.window = nextWin,
|
||||||
.groupId = winRes.groupId,
|
.groupId = winRes.groupId,
|
||||||
.calWin.skey = nextWin.skey,
|
.calWin.skey = nextWin.skey,
|
||||||
|
@ -2194,7 +2200,7 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i
|
||||||
} else {
|
} else {
|
||||||
SArray* chArray = *(void**)chIds;
|
SArray* chArray = *(void**)chIds;
|
||||||
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
|
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
|
||||||
qDebug("===stream===check final retrive %" PRId64",chid:%d", winKey->ts, index);
|
qDebug("===stream===check final retrive %" PRId64 ",chid:%d", winKey->ts, index);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
qDebug("===stream===add final retrive %" PRId64, winKey->ts);
|
qDebug("===stream===add final retrive %" PRId64, winKey->ts);
|
||||||
taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0);
|
taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0);
|
||||||
|
@ -2215,8 +2221,8 @@ int32_t getOutputBuf(void* pState, SRowBuffPos* pPos, SResultRow** pResult, SSta
|
||||||
|
|
||||||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||||
SGroupResInfo* pGroupResInfo) {
|
SGroupResInfo* pGroupResInfo) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||||
int32_t numOfExprs = pSup->numOfExprs;
|
int32_t numOfExprs = pSup->numOfExprs;
|
||||||
|
@ -2347,7 +2353,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
||||||
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
|
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) ||
|
||||||
|
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
|
||||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||||
if (startPos < 0) {
|
if (startPos < 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -2362,7 +2369,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
||||||
.groupId = groupId,
|
.groupId = groupId,
|
||||||
};
|
};
|
||||||
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
||||||
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed && !chIds) {
|
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed &&
|
||||||
|
!chIds) {
|
||||||
SPullWindowInfo pull = {
|
SPullWindowInfo pull = {
|
||||||
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
||||||
// add pull data request
|
// add pull data request
|
||||||
|
@ -2481,57 +2489,57 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSWinKey(void **buf, SWinKey* key) {
|
int32_t encodeSWinKey(void** buf, SWinKey* key) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, key->ts);
|
tlen += taosEncodeFixedI64(buf, key->ts);
|
||||||
tlen += taosEncodeFixedU64(buf, key->groupId);
|
tlen += taosEncodeFixedU64(buf, key->groupId);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSWinKey(void *buf, SWinKey* key) {
|
void* decodeSWinKey(void* buf, SWinKey* key) {
|
||||||
buf = taosDecodeFixedI64(buf, &key->ts);
|
buf = taosDecodeFixedI64(buf, &key->ts);
|
||||||
buf = taosDecodeFixedU64(buf, &key->groupId);
|
buf = taosDecodeFixedU64(buf, &key->groupId);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSRowBuffPos(void **buf, SRowBuffPos* pos) {
|
int32_t encodeSRowBuffPos(void** buf, SRowBuffPos* pos) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += encodeSWinKey(buf, pos->pKey);
|
tlen += encodeSWinKey(buf, pos->pKey);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSRowBuffPos(void *buf, SRowBuffPos* pos) {
|
void* decodeSRowBuffPos(void* buf, SRowBuffPos* pos) {
|
||||||
buf = decodeSWinKey(buf, pos->pKey);
|
buf = decodeSWinKey(buf, pos->pKey);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSTimeWindowAggSupp(void **buf, STimeWindowAggSupp* pTwAggSup) {
|
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs);
|
tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs);
|
||||||
tlen += taosEncodeFixedI64(buf, pTwAggSup->maxTs);
|
tlen += taosEncodeFixedI64(buf, pTwAggSup->maxTs);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSTimeWindowAggSupp(void *buf, STimeWindowAggSupp* pTwAggSup) {
|
void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup) {
|
||||||
buf = taosDecodeFixedI64(buf, &pTwAggSup->minTs);
|
buf = taosDecodeFixedI64(buf, &pTwAggSup->minTs);
|
||||||
buf = taosDecodeFixedI64(buf, &pTwAggSup->maxTs);
|
buf = taosDecodeFixedI64(buf, &pTwAggSup->maxTs);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSTimeWindow(void **buf, STimeWindow* pWin) {
|
int32_t encodeSTimeWindow(void** buf, STimeWindow* pWin) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pWin->skey);
|
tlen += taosEncodeFixedI64(buf, pWin->skey);
|
||||||
tlen += taosEncodeFixedI64(buf, pWin->ekey);
|
tlen += taosEncodeFixedI64(buf, pWin->ekey);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSTimeWindow(void *buf, STimeWindow* pWin) {
|
void* decodeSTimeWindow(void* buf, STimeWindow* pWin) {
|
||||||
buf = taosDecodeFixedI64(buf, &pWin->skey);
|
buf = taosDecodeFixedI64(buf, &pWin->skey);
|
||||||
buf = taosDecodeFixedI64(buf, &pWin->ekey);
|
buf = taosDecodeFixedI64(buf, &pWin->ekey);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSPullWindowInfo(void **buf, SPullWindowInfo* pPullInfo) {
|
int32_t encodeSPullWindowInfo(void** buf, SPullWindowInfo* pPullInfo) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += encodeSTimeWindow(buf, &pPullInfo->calWin);
|
tlen += encodeSTimeWindow(buf, &pPullInfo->calWin);
|
||||||
tlen += taosEncodeFixedU64(buf, pPullInfo->groupId);
|
tlen += taosEncodeFixedU64(buf, pPullInfo->groupId);
|
||||||
|
@ -2539,14 +2547,14 @@ int32_t encodeSPullWindowInfo(void **buf, SPullWindowInfo* pPullInfo) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSPullWindowInfo(void *buf, SPullWindowInfo* pPullInfo) {
|
void* decodeSPullWindowInfo(void* buf, SPullWindowInfo* pPullInfo) {
|
||||||
buf = decodeSTimeWindow(buf, &pPullInfo->calWin);
|
buf = decodeSTimeWindow(buf, &pPullInfo->calWin);
|
||||||
buf = taosDecodeFixedU64(buf, &pPullInfo->groupId);
|
buf = taosDecodeFixedU64(buf, &pPullInfo->groupId);
|
||||||
buf = decodeSTimeWindow(buf, &pPullInfo->window);
|
buf = decodeSTimeWindow(buf, &pPullInfo->window);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSPullWindowInfoArray(void **buf, SArray* pPullInfos) {
|
int32_t encodeSPullWindowInfoArray(void** buf, SArray* pPullInfos) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
int32_t size = taosArrayGetSize(pPullInfos);
|
int32_t size = taosArrayGetSize(pPullInfos);
|
||||||
tlen += taosEncodeFixedI32(buf, size);
|
tlen += taosEncodeFixedI32(buf, size);
|
||||||
|
@ -2557,7 +2565,7 @@ int32_t encodeSPullWindowInfoArray(void **buf, SArray* pPullInfos) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) {
|
void* decodeSPullWindowInfoArray(void* buf, SArray* pPullInfos) {
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &size);
|
buf = taosDecodeFixedI32(buf, &size);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
@ -2568,7 +2576,7 @@ void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator) {
|
int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2580,11 +2588,11 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable);
|
int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable);
|
||||||
tlen += taosEncodeFixedI32(buf, mapSize);
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
void *pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
|
||||||
void *key = taosHashGetKey(pIte, &keyLen);
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
tlen += encodeSWinKey(buf, key);
|
tlen += encodeSWinKey(buf, key);
|
||||||
SRowBuffPos* pPos = *(void**)pIte;
|
SRowBuffPos* pPos = *(void**)pIte;
|
||||||
tlen += encodeSRowBuffPos(buf, pPos);
|
tlen += encodeSRowBuffPos(buf, pPos);
|
||||||
|
@ -2599,7 +2607,7 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO
|
||||||
pIte = NULL;
|
pIte = NULL;
|
||||||
keyLen = 0;
|
keyLen = 0;
|
||||||
while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) {
|
while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) {
|
||||||
void *key = taosHashGetKey(pIte, &keyLen);
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
tlen += encodeSWinKey(buf, key);
|
tlen += encodeSWinKey(buf, key);
|
||||||
SArray* pArray = (SArray*)pIte;
|
SArray* pArray = (SArray*)pIte;
|
||||||
int32_t chSize = taosArrayGetSize(pArray);
|
int32_t chSize = taosArrayGetSize(pArray);
|
||||||
|
@ -2628,16 +2636,16 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO
|
||||||
}
|
}
|
||||||
|
|
||||||
void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
|
void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return ;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6.checksum
|
// 6.checksum
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(0); // debug
|
ASSERT(0); // debug
|
||||||
qError("stream interval state is invalid");
|
qError("stream interval state is invalid");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2646,7 +2654,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SWinKey key = {0};
|
SWinKey key = {0};
|
||||||
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
|
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
|
||||||
pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey));
|
pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey));
|
||||||
buf = decodeSWinKey(buf, &key);
|
buf = decodeSWinKey(buf, &key);
|
||||||
|
@ -2695,10 +2703,10 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
|
|
||||||
|
@ -2937,8 +2945,8 @@ int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) {
|
||||||
int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
|
int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
|
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
|
||||||
size = TMAX(size, resSize);
|
size = TMAX(size, resSize);
|
||||||
}
|
}
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
@ -2946,11 +2954,12 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
|
||||||
void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t resSize = sizeof(TSKEY);
|
int32_t resSize = sizeof(TSKEY);
|
||||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
|
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
|
||||||
}
|
}
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.releaseStreamStateFn) {
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
|
@ -2961,11 +2970,11 @@ void streamIntervalReleaseState(SOperatorInfo* pOperator) {
|
||||||
void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
void* pBuf = NULL;
|
void* pBuf = NULL;
|
||||||
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
|
||||||
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
|
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
|
||||||
TSKEY ts = *(TSKEY*)pBuf;
|
TSKEY ts = *(TSKEY*)pBuf;
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts);
|
pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts);
|
||||||
}
|
}
|
||||||
|
@ -2976,7 +2985,8 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) {
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
||||||
|
SReadHandle* pHandle) {
|
||||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||||
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -3009,7 +3019,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
if (pIntervalPhyNode->window.pExprs != NULL) {
|
if (pIntervalPhyNode->window.pExprs != NULL) {
|
||||||
int32_t numOfScalar = 0;
|
int32_t numOfScalar = 0;
|
||||||
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||||
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -3066,7 +3076,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pInfo->numOfDatapack = 0;
|
pInfo->numOfDatapack = 0;
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
||||||
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
|
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
|
||||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
|
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
|
||||||
|
@ -3094,7 +3104,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
|
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamIntervalDecodeOpState(buff, len, pOperator);
|
doStreamIntervalDecodeOpState(buff, len, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
|
@ -3181,13 +3192,15 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
|
||||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||||
pScanInfo->pState = pAggSup->pState;
|
pScanInfo->pState = pAggSup->pState;
|
||||||
if (!pScanInfo->pUpdateInfo) {
|
if (!pScanInfo->pUpdateInfo) {
|
||||||
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, pScanInfo->igCheckUpdate);
|
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
|
||||||
|
pScanInfo->igCheckUpdate);
|
||||||
}
|
}
|
||||||
pScanInfo->twAggSup = *pTwSup;
|
pScanInfo->twAggSup = *pTwSup;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
|
||||||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) {
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
|
SReadHandle* pHandle, SStorageAPI* pApi) {
|
||||||
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
|
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
|
||||||
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
||||||
pSup->gap = gap;
|
pSup->gap = gap;
|
||||||
|
@ -3297,7 +3310,8 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
|
||||||
|
|
||||||
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
|
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
|
int32_t code =
|
||||||
|
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3410,7 +3424,8 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
|
||||||
setSessionWinOutputInfo(pStUpdated, pNextWin);
|
setSessionWinOutputInfo(pStUpdated, pNextWin);
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
pNextWin->sessionWin = pCurWin->sessionWin;
|
pNextWin->sessionWin = pCurWin->sessionWin;
|
||||||
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
|
int32_t code =
|
||||||
|
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFreeClear(pNextWin->pOutputBuf);
|
taosMemoryFreeClear(pNextWin->pOutputBuf);
|
||||||
SET_SESSION_WIN_INVALID(*pNextWin);
|
SET_SESSION_WIN_INVALID(*pNextWin);
|
||||||
|
@ -3420,9 +3435,9 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
|
||||||
|
|
||||||
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
|
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
|
||||||
SSHashObj* pStDeleted) {
|
SSHashObj* pStDeleted) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SResultRow* pCurResult = NULL;
|
SResultRow* pCurResult = NULL;
|
||||||
|
@ -3460,7 +3475,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
|
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
|
||||||
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
|
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
|
||||||
|
&pAggSup->stateStore);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3631,9 +3647,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
|
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pWinArray);
|
int32_t size = taosArrayGetSize(pWinArray);
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
@ -3672,7 +3688,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
num++;
|
num++;
|
||||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
|
||||||
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
|
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
|
||||||
pChild->exprSupp.rowEntryInfoOffset);
|
pChild->exprSupp.rowEntryInfoOffset);
|
||||||
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||||
|
@ -3747,8 +3763,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
|
||||||
pGroupResInfo->pBuf = NULL;
|
pGroupResInfo->pBuf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
|
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) {
|
||||||
SSDataBlock* pBlock) {
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
// set output datablock version
|
// set output datablock version
|
||||||
pBlock->info.version = pTaskInfo->version;
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
@ -3787,20 +3802,20 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSSessionKey(void **buf, SSessionKey* key) {
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += encodeSTimeWindow(buf, &key->win);
|
tlen += encodeSTimeWindow(buf, &key->win);
|
||||||
tlen += taosEncodeFixedU64(buf, key->groupId);
|
tlen += taosEncodeFixedU64(buf, key->groupId);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSSessionKey(void *buf, SSessionKey* key) {
|
void* decodeSSessionKey(void* buf, SSessionKey* key) {
|
||||||
buf = decodeSTimeWindow(buf, &key->win);
|
buf = decodeSTimeWindow(buf, &key->win);
|
||||||
buf = taosDecodeFixedU64(buf, &key->groupId);
|
buf = taosDecodeFixedU64(buf, &key->groupId);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t encodeSResultWindowInfo(void **buf, SResultWindowInfo* key, int32_t outLen) {
|
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
||||||
tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen);
|
tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen);
|
||||||
|
@ -3808,14 +3823,14 @@ int32_t encodeSResultWindowInfo(void **buf, SResultWindowInfo* key, int32_t outL
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen) {
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen);
|
buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen);
|
||||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doStreamSessionEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -3827,11 +3842,11 @@ int32_t doStreamSessionEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOp
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
||||||
tlen += taosEncodeFixedI32(buf, mapSize);
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
void *pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||||
void *key = taosHashGetKey(pIte, &keyLen);
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
tlen += encodeSSessionKey(buf, key);
|
tlen += encodeSSessionKey(buf, key);
|
||||||
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||||
}
|
}
|
||||||
|
@ -3872,9 +3887,9 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(0); // debug
|
ASSERT(0); // debug
|
||||||
qError("stream interval state is invalid");
|
qError("stream interval state is invalid");
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -3884,7 +3899,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
@ -3916,6 +3931,7 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
|
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
||||||
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
|
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
|
@ -4028,7 +4044,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||||
pInfo->pStUpdated = NULL;
|
pInfo->pStUpdated = NULL;
|
||||||
if(pInfo->isHistoryOp) {
|
if (pInfo->isHistoryOp) {
|
||||||
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
}
|
}
|
||||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
|
@ -4066,8 +4082,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
|
||||||
|
resSize);
|
||||||
}
|
}
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.releaseStreamStateFn) {
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
|
@ -4082,16 +4100,16 @@ void resetWinRange(STimeWindow* winRange) {
|
||||||
|
|
||||||
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
resetWinRange(&pAggSup->winRange);
|
resetWinRange(&pAggSup->winRange);
|
||||||
|
|
||||||
SResultWindowInfo winInfo = {0};
|
SResultWindowInfo winInfo = {0};
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
void* pBuf = NULL;
|
void* pBuf = NULL;
|
||||||
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
|
||||||
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
|
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
|
||||||
int32_t num = size / sizeof(SSessionKey);
|
int32_t num = size / sizeof(SSessionKey);
|
||||||
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
|
||||||
ASSERT(size == num * sizeof(SSessionKey));
|
ASSERT(size == num * sizeof(SSessionKey));
|
||||||
for (int32_t i = 0; i < num; i++) {
|
for (int32_t i = 0; i < num; i++) {
|
||||||
SResultWindowInfo winInfo = {0};
|
SResultWindowInfo winInfo = {0};
|
||||||
|
@ -4103,7 +4121,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
downstream->fpSet.reloadStreamStateFn(downstream);
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
|
@ -4138,7 +4156,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
|
||||||
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI);
|
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
|
&pTaskInfo->storageAPI);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -4183,7 +4202,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
|
int32_t res =
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
|
@ -4346,14 +4367,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) {
|
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
|
||||||
|
SReadHandle* pHandle) {
|
||||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle);
|
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle);
|
||||||
if (pOperator == NULL) {
|
if (pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
|
pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
|
||||||
|
@ -4444,9 +4466,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
pCurWin->winInfo.sessionWin.groupId = groupId;
|
pCurWin->winInfo.sessionWin.groupId = groupId;
|
||||||
pCurWin->winInfo.sessionWin.win.skey = ts;
|
pCurWin->winInfo.sessionWin.win.skey = ts;
|
||||||
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
||||||
int32_t code =
|
int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
|
||||||
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize,
|
pKeyData, pAggSup->stateKeySize, compareStateKey,
|
||||||
compareStateKey, &pCurWin->winInfo.pOutputBuf, &size);
|
&pCurWin->winInfo.pOutputBuf, &size);
|
||||||
pCurWin->pStateKey =
|
pCurWin->pStateKey =
|
||||||
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
|
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
|
||||||
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
|
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
|
||||||
|
@ -4456,7 +4478,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
|
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
|
||||||
code = TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf,
|
||||||
|
&pAggSup->pSessionAPI->stateStore);
|
||||||
pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size);
|
pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4472,7 +4495,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
|
|
||||||
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
|
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
|
||||||
pNextWin->winInfo.pOutputBuf = NULL;
|
pNextWin->winInfo.pOutputBuf = NULL;
|
||||||
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
SStreamStateCur* pCur =
|
||||||
|
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
|
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
|
||||||
|
@ -4585,7 +4609,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doStreamStateEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -4597,11 +4621,11 @@ int32_t doStreamStateEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOper
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
||||||
tlen += taosEncodeFixedI32(buf, mapSize);
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
void *pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||||
void *key = taosHashGetKey(pIte, &keyLen);
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
tlen += encodeSSessionKey(buf, key);
|
tlen += encodeSSessionKey(buf, key);
|
||||||
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||||
}
|
}
|
||||||
|
@ -4642,9 +4666,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(0); // debug
|
ASSERT(0); // debug
|
||||||
qError("stream interval state is invalid");
|
qError("stream interval state is invalid");
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -4654,7 +4678,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
@ -4768,7 +4792,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
pInfo->pSeUpdated = NULL;
|
pInfo->pSeUpdated = NULL;
|
||||||
|
|
||||||
if(pInfo->isHistoryOp) {
|
if (pInfo->isHistoryOp) {
|
||||||
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4799,8 +4823,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
void streamStateReleaseState(SOperatorInfo* pOperator) {
|
void streamStateReleaseState(SOperatorInfo* pOperator) {
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData,
|
||||||
|
resSize);
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.releaseStreamStateFn) {
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
downstream->fpSet.releaseStreamStateFn(downstream);
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
||||||
|
@ -4809,14 +4835,14 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
||||||
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SResultRow* pCurResult = NULL;
|
SResultRow* pCurResult = NULL;
|
||||||
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
SResultRow* pWinResult = NULL;
|
SResultRow* pWinResult = NULL;
|
||||||
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
|
@ -4835,22 +4861,22 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
|
||||||
|
|
||||||
void streamStateReloadState(SOperatorInfo* pOperator) {
|
void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
resetWinRange(&pAggSup->winRange);
|
resetWinRange(&pAggSup->winRange);
|
||||||
|
|
||||||
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
void* pBuf = NULL;
|
void* pBuf = NULL;
|
||||||
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
|
||||||
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
|
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
|
||||||
int32_t num = size / sizeof(SSessionKey);
|
int32_t num = size / sizeof(SSessionKey);
|
||||||
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
|
||||||
ASSERT(size == num * sizeof(SSessionKey));
|
ASSERT(size == num * sizeof(SSessionKey));
|
||||||
for (int32_t i = 0; i < num; i++) {
|
for (int32_t i = 0; i < num; i++) {
|
||||||
SStateWindowInfo curInfo = {0};
|
SStateWindowInfo curInfo = {0};
|
||||||
SStateWindowInfo nextInfo = {0};
|
SStateWindowInfo nextInfo = {0};
|
||||||
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
|
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
|
||||||
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
|
if (compareStateKey(curInfo.pStateKey, nextInfo.pStateKey)) {
|
||||||
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4859,7 +4885,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
downstream->fpSet.reloadStreamStateFn(downstream);
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
|
@ -4936,7 +4962,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len);
|
int32_t res =
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamStateDecodeOpState(buff, len, pOperator, true);
|
doStreamStateDecodeOpState(buff, len, pOperator, true);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
|
@ -5322,8 +5350,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
TSKEY ekey = ascScan ? win.ekey : win.skey;
|
||||||
int32_t forwardRows =
|
int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder);
|
iaInfo->binfo.inputTsOrder);
|
||||||
ASSERT(forwardRows > 0);
|
ASSERT(forwardRows > 0);
|
||||||
|
|
||||||
// prev time window not interpolation yet.
|
// prev time window not interpolation yet.
|
||||||
|
@ -5353,8 +5381,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
STimeWindow nextWin = win;
|
STimeWindow nextWin = win;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t prevEndPos = forwardRows - 1 + startPos;
|
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||||
startPos =
|
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
|
||||||
getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->binfo.inputTsOrder);
|
iaInfo->binfo.inputTsOrder);
|
||||||
if (startPos < 0) {
|
if (startPos < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -5368,8 +5396,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
ekey = ascScan ? nextWin.ekey : nextWin.skey;
|
||||||
forwardRows =
|
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
|
||||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder);
|
iaInfo->binfo.inputTsOrder);
|
||||||
|
|
||||||
// window start(end) key interpolation
|
// window start(end) key interpolation
|
||||||
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
|
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
|
||||||
|
@ -5536,7 +5564,7 @@ _error:
|
||||||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
|
@ -5706,13 +5734,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
|
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
|
||||||
};
|
};
|
||||||
|
|
||||||
pInfo->twAggSup = (STimeWindowAggSupp){
|
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
|
||||||
.waterMark = pIntervalPhyNode->window.watermark,
|
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
.maxTs = INT64_MIN,
|
||||||
.maxTs = INT64_MIN,
|
.minTs = INT64_MAX,
|
||||||
.minTs = INT64_MAX,
|
.deleteMark = getDeleteMark(pIntervalPhyNode)};
|
||||||
.deleteMark = getDeleteMark(pIntervalPhyNode)
|
|
||||||
};
|
|
||||||
|
|
||||||
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
||||||
|
|
||||||
|
@ -5735,8 +5761,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
||||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
&pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -5770,7 +5796,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->numOfDatapack = 0;
|
pInfo->numOfDatapack = 0;
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
int32_t funResSize= getMaxFunResSize(pSup, numOfCols);
|
int32_t funResSize = getMaxFunResSize(pSup, numOfCols);
|
||||||
|
|
||||||
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||||
|
@ -5789,7 +5815,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
|
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamIntervalDecodeOpState(buff, len, pOperator);
|
doStreamIntervalDecodeOpState(buff, len, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
|
|
Loading…
Reference in New Issue