opt stream force fill op

This commit is contained in:
54liuyao 2024-11-26 19:46:36 +08:00
parent 5472a22a11
commit 22023575ba
11 changed files with 130 additions and 54 deletions

View File

@ -823,10 +823,11 @@ typedef struct SStreamFillOperatorInfo {
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
SStreamAggSupporter* pStreamAggSup;
SArray* pCloseTs;
SArray* pUpdated;
SGroupResInfo groupResInfo;
SStreamState* pState;
SStateStore stateStore;
} SStreamFillOperatorInfo;
typedef struct SStreamTimeSliceOperatorInfo {

View File

@ -149,7 +149,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo);
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo);
int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo);

View File

@ -102,8 +102,9 @@ int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY*
int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
struct SOperatorInfo** ppOptInfo);
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated);
int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated);
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);
#ifdef __cplusplus
}

View File

@ -120,6 +120,7 @@ typedef struct SStreamFillInfo {
uint64_t curGroupId;
bool hasNext;
SResultRowData* pNonFillRow;
void* pTempBuff;
} SStreamFillInfo;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);

View File

@ -614,7 +614,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, pHandle, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {

View File

@ -131,6 +131,7 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
pFillInfo->pLinearInfo = NULL;
taosArrayDestroy(pFillInfo->delRanges);
taosMemoryFreeClear(pFillInfo->pTempBuff);
taosMemoryFree(pFillInfo);
}
@ -150,6 +151,14 @@ static void destroyStreamFillOperatorInfo(void* param) {
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->pCloseTs);
if (pInfo->stateStore.streamFileStateDestroy != NULL) {
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
}
if (pInfo->pState != NULL) {
taosMemoryFreeClear(pInfo->pState);
}
taosMemoryFree(pInfo);
}
@ -1159,12 +1168,17 @@ _end:
return code;
}
static void resetForceFillWindow(SResultRowData* pRowData) {
pRowData->key = INT64_MIN;
pRowData->pRowVal = NULL;
}
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
SStreamFillOperatorInfo* pInfo = pOperator->info;
bool res = false;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
@ -1174,25 +1188,30 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter*
} else if (pBlock->info.id.groupId != pKey->groupId) {
break;
}
void* val = NULL;
SRowBuffPos* pValPos = NULL;
int32_t len = 0;
int32_t winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL);
int32_t winCode = TSDB_CODE_SUCCESS;
code = pInfo->stateStore.streamStateFillGet(pInfo->pState, pKey, (void**)&pValPos, &len, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = val;
pFillSup->cur.pRowVal = pValPos->pRowBuff;
code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end);
resetFillWindow(&pFillSup->cur);
resetForceFillWindow(&pFillSup->cur);
releaseOutputBuf(pInfo->pState, pValPos, &pInfo->stateStore);
} else {
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey);
SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
void* preVal = NULL;
SRowBuffPos* prePos = NULL;
int32_t preVLen = 0;
winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
code = pInfo->stateStore.streamStateFillGetPrev(pInfo->pState, pKey, &preKey,
(void**)&prePos, &preVLen, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = preVal;
pFillSup->cur.pRowVal = prePos->pRowBuff;
if (pFillInfo->type == TSDB_FILL_PREV) {
code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end);
@ -1202,9 +1221,9 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter*
code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end);
}
resetFillWindow(&pFillSup->cur);
resetForceFillWindow(&pFillSup->cur);
}
pAPI->stateStore.streamStateFreeCur(pCur);
releaseOutputBuf(pInfo->pState, prePos, &pInfo->stateStore);
}
}
@ -1249,6 +1268,45 @@ _end:
return code;
}
static void keepResultInStateBuf(SStreamFillOperatorInfo* pInfo, uint64_t groupId, SResultRowData* pRow) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SWinKey key = {.groupId = groupId, .ts = pRow->key};
int32_t curVLen = 0;
SRowBuffPos* pStatePos = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pInfo->stateStore.streamStateFillAddIfNotExist(pInfo->pState, &key, (void**)&pStatePos,
&curVLen, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pStatePos->pRowBuff, pRow->pRowVal, pInfo->pFillSup->rowSize);
qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
int32_t keepBlockRowInStateBuf(SStreamFillOperatorInfo* pInfo, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
int32_t rowId, uint64_t groupId, int32_t rowSize) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
TSKEY ts = tsCol[rowId];
pFillInfo->nextRowKey = ts;
TAOS_MEMSET(pFillInfo->pTempBuff, 0, rowSize);
SResultRowData tmpNextRow = {.key = ts, .pRowVal = pFillInfo->pTempBuff};
transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
keepResultInStateBuf(pInfo, groupId, &tmpNextRow);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// force window close impl
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1259,11 +1317,10 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId;
SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++){
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
code = keepBlockRowInStateBuf(pInfo, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
QUERY_CHECK_CODE(code, lino, _end);
int32_t size = taosArrayGetSize(pInfo->pCloseTs);
@ -1283,7 +1340,7 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
}
}
}
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0);
code = pInfo->stateStore.streamStateGroupPut(pInfo->pState, groupId, NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
_end:
@ -1293,13 +1350,13 @@ _end:
return code;
}
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) {
int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int64_t groupId = 0;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);
SStreamStateCur* pCur = pStateStore->streamStateGroupGetCur(pState);
while (1) {
int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
int32_t winCode = pStateStore->streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
if (winCode != TSDB_CODE_SUCCESS) {
break;
}
@ -1307,14 +1364,14 @@ int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdat
void* pPushRes = taosArrayPush(pUpdated, &key);
QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
pAggSup->stateStore.streamStateGroupCurNext(pCur);
pStateStore->streamStateGroupCurNext(pCur);
}
pAggSup->stateStore.streamStateFreeCur(pCur);
pStateStore->streamStateFreeCur(pCur);
pCur = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
pAggSup->stateStore.streamStateFreeCur(pCur);
pStateStore->streamStateFreeCur(pCur);
pCur = NULL;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
@ -1347,7 +1404,8 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
(*ppRes) = resBlock;
goto _end;
}
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
pInfo->stateStore.streamStateClearExpiredState(pInfo->pState);
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
goto _end;
@ -1395,7 +1453,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated);
code = buildAllResultKey(&pInfo->stateStore, pInfo->pState, ts, pInfo->pUpdated);
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayClear(pInfo->pCloseTs);
@ -1414,7 +1472,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) {
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
pInfo->stateStore.streamStateClearExpiredState(pInfo->pState);
setStreamOperatorCompleted(pOperator);
}
@ -1621,6 +1679,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->delIndex = 0;
pFillInfo->curGroupId = 0;
pFillInfo->hasNext = false;
pFillInfo->pTempBuff = taosMemoryCalloc(1, pFillSup->rowSize);
return pFillInfo;
_end:
@ -1664,21 +1723,18 @@ static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo*
}
}
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) {
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (IS_NORMAL_INTERVAL_OP(downstream)) {
SStreamIntervalOperatorInfo* pInfo = downstream->info;
*triggerType = pInfo->twAggSup.calTrigger;
*pInterval = pInfo->interval;
(*ppAggSup) = NULL;
} else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
SStreamIntervalSliceOperatorInfo* pInfo = downstream->info;
*triggerType = pInfo->twAggSup.calTrigger;
*pInterval = pInfo->interval;
pInfo->hasFill = true;
(*ppAggSup) = &pInfo->streamAggSup;
pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState);
} else {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
}
@ -1691,8 +1747,31 @@ _end:
return code;
}
int32_t initFillOperatorStateBuff(SStreamFillOperatorInfo* pInfo, SStreamState* pState, SStateStore* pStore,
SReadHandle* pHandle, const char* taskIdStr, SStorageAPI* pApi) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
pInfo->stateStore = *pStore;
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
QUERY_CHECK_NULL(pInfo->pState, code, lino, _end, terrno);
*(pInfo->pState) = *pState;
pInfo->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsCol);
code = pInfo->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->pFillSup->rowSize, 0, compareTs,
pInfo->pState, INT64_MAX, taskIdStr, pHandle->checkpointId,
STREAM_STATE_BUFF_HASH_SORT, &pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
@ -1718,7 +1797,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
int8_t triggerType = 0;
SInterval interval = {0};
code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup);
code = getDownStreamInfo(downstream, &triggerType, &interval);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
@ -1773,9 +1852,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
pTaskInfo);
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
} else {
pInfo->pState != NULL;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
}

View File

@ -360,9 +360,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
return code;
}
if (pInfo->hasFill == false) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
return code;
@ -452,9 +450,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) {
if (pInfo->hasFill == false) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
}

View File

@ -1909,7 +1909,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
qDebug("===stream===build stream result, ts count:%d", size);
for (int32_t i = 0; i < size; i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(&pInfo->streamAggSup, ts, pInfo->pUpdated);
code = buildAllResultKey(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, ts, pInfo->pUpdated);
QUERY_CHECK_CODE(code, lino, _end);
}
qDebug("===stream===build stream result, res count:%ld", taosArrayGetSize(pInfo->pUpdated));

View File

@ -1834,7 +1834,7 @@ int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption) {
return deleteMark;
}
static TSKEY compareTs(void* pKey) {
TSKEY compareTs(void* pKey) {
SWinKey* pWinKey = (SWinKey*)pKey;
return pWinKey->ts;
}

View File

@ -3094,7 +3094,7 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf
qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, ppOperaotr);
qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr);

View File

@ -1232,11 +1232,6 @@ void clearExpiredState(SStreamFileState* pFileState) {
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
}
if (pFileState->hasFillCatch == false) {
int32_t code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
qTrace("force clear expired file, ts:%" PRId64 ". %s at line %d res %d", pKey->ts, __func__, __LINE__, code_file);
}
}
taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
}