stream interp

This commit is contained in:
54liuyao 2024-07-31 08:57:42 +08:00
parent 8b182cac9b
commit 745f14d140
3 changed files with 75 additions and 28 deletions

View File

@ -1096,6 +1096,22 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
pSup->deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC) {
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
pSup->deleteMarkSaved = pSup->deleteMark;
pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;

View File

@ -99,11 +99,7 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
pInfo->pUpdatedMap = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
}
if (!pInfo->pDeletedMap && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _end, terrno);
}
for (int32_t i = 0; i < num; i++) {
SWinKey* pKey = pKeyBuf + i;
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pKey->ts, pKey->groupId,
@ -506,7 +502,8 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock,
static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
SSlicePoint* pNextPoint) {
SSlicePoint* pNextPoint, int32_t* pWinCode) {
int32_t tmpRes = TSDB_CODE_SUCCESS;
void* pState = pAggSup->pState;
resetPrevAndNextWindow(pFillSup);
pCurPoint->pResPos = NULL;
@ -516,7 +513,7 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo
pCurPoint->key.groupId = groupId;
pCurPoint->key.ts = ts;
int32_t curVLen = 0;
int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen);
(*pWinCode) = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen);
pCurPoint->pLeftRow = pCurPoint->pResPos->pRowBuff;
if (pFillSup->type == TSDB_FILL_LINEAR) {
pCurPoint->pRightRow = POINTER_SHIFT(pCurPoint->pResPos->pRowBuff, pFillSup->rowSize);
@ -533,9 +530,9 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo
if (HAS_NON_ROW_DATA(pCurPoint->pLeftRow)) {
pPrevPoint->key.groupId = groupId;
int32_t preVLen = 0;
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key,
tmpRes = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key,
(void**)&pPrevPoint->pResPos, &preVLen);
if (code == TSDB_CODE_SUCCESS) {
if (tmpRes == TSDB_CODE_SUCCESS) {
pFillSup->prev.key = pPrevPoint->key.ts;
pFillSup->prev.pRowVal = pPrevPoint->pResPos->pRowBuff;
}
@ -548,9 +545,9 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo
if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) {
pNextPoint->key.groupId = groupId;
int32_t nextVLen = 0;
code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
tmpRes = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
(void**)&pNextPoint->pResPos, &nextVLen);
if (code == TSDB_CODE_SUCCESS) {
if (tmpRes == TSDB_CODE_SUCCESS) {
pFillSup->next.key = pNextPoint->key.ts;
pFillSup->next.pRowVal = pNextPoint->pResPos->pRowBuff;
}
@ -696,6 +693,7 @@ static bool needAdjValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fil
}
static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t winCode = TSDB_CODE_SUCCESS;
SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -749,11 +747,14 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
SSlicePoint nextPoint = {0};
bool left = false;
bool right = false;
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint);
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint, &winCode);
right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type);
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap);
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap);
}
}
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
@ -766,6 +767,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
saveTimeSliceWinResult(&nextPoint.key, pInfo->pUpdatedMap);
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap);
}
}
releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
@ -774,12 +778,15 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
break;
}
curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint);
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint, &winCode);
right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type);
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap);
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap);
}
}
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
}
@ -807,7 +814,8 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
SSlicePoint prevPoint = {0};
SSlicePoint nextPoint = {0};
getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
int32_t winCode = TSDB_CODE_SUCCESS;
getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, &winCode);
setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts);
doStreamFillRange(pFillSup, pFillInfo, pBlock);
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
@ -879,7 +887,10 @@ _end:
return code;
}
static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap) {
static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
SArray* pDelWins) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
@ -900,6 +911,9 @@ static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* p
break;
}
(void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey));
void* tmp = taosArrayPush(pDelWins, &key);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
pAggSup->stateStore.streamStateDel(pAggSup->pState, &key);
if (winCode != TSDB_CODE_SUCCESS) {
break;
@ -907,6 +921,12 @@ static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* p
key = nextKey;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -971,7 +991,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
switch (pBlock->info.type) {
case STREAM_DELETE_RESULT: {
doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap);
code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins);
QUERY_CHECK_CODE(code, lino, _end);
} break;
case STREAM_NORMAL:
case STREAM_INVALID: {

View File

@ -5736,15 +5736,20 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
}
}
if (pCxt->createStream) {
if (NULL != pSelect->pRange) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream Unsupported RANGE clause");
}
if (NULL == pSelect->pEvery || NULL == pSelect->pFill) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Missing EVERY clause or FILL clause");
}
} else {
if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) {
if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) {
// single point interp every can be omitted
} else {
if (pCxt->createStream) {
if (NULL == pSelect->pEvery || NULL == pSelect->pFill) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing EVERY clause or FILL clause");
}
} else {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing RANGE clause, EVERY clause or FILL clause");
@ -10182,6 +10187,11 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
}
}
if (pStmt->pOptions->fillHistory && pSelect->hasInterpFunc) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream interp unsupported Fill history");
}
return TSDB_CODE_SUCCESS;
}