Merge pull request #25327 from taosdata/fix/TD-29516
set ts column index for function
This commit is contained in:
commit
5fac6ea9c1
|
@ -328,7 +328,7 @@ typedef struct SStateStore {
|
||||||
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
|
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
|
||||||
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
|
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t (*streamStateClear)(SStreamState* pState);
|
int32_t (*streamStateClear)(SStreamState* pState);
|
||||||
void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
|
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
|
||||||
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||||
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ struct SResultRowEntryInfo;
|
||||||
|
|
||||||
struct SFunctionNode;
|
struct SFunctionNode;
|
||||||
typedef struct SScalarParam SScalarParam;
|
typedef struct SScalarParam SScalarParam;
|
||||||
|
typedef struct SStreamState SStreamState;
|
||||||
|
|
||||||
typedef struct SFuncExecEnv {
|
typedef struct SFuncExecEnv {
|
||||||
int32_t calcMemSize;
|
int32_t calcMemSize;
|
||||||
|
@ -126,7 +127,7 @@ typedef struct SInputColumnInfoData {
|
||||||
typedef struct SSerializeDataHandle {
|
typedef struct SSerializeDataHandle {
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
int32_t currentPage;
|
int32_t currentPage;
|
||||||
void *pState;
|
SStreamState *pState;
|
||||||
} SSerializeDataHandle;
|
} SSerializeDataHandle;
|
||||||
|
|
||||||
// incremental state storage
|
// incremental state storage
|
||||||
|
@ -164,7 +165,7 @@ typedef struct STdbState {
|
||||||
void *txn;
|
void *txn;
|
||||||
} STdbState;
|
} STdbState;
|
||||||
|
|
||||||
typedef struct {
|
struct SStreamState {
|
||||||
STdbState *pTdbState;
|
STdbState *pTdbState;
|
||||||
struct SStreamFileState *pFileState;
|
struct SStreamFileState *pFileState;
|
||||||
int32_t number;
|
int32_t number;
|
||||||
|
@ -173,7 +174,8 @@ typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t streamBackendRid;
|
int64_t streamBackendRid;
|
||||||
int8_t dump;
|
int8_t dump;
|
||||||
} SStreamState;
|
int32_t tsIndex;
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct SFunctionStateStore {
|
typedef struct SFunctionStateStore {
|
||||||
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
|
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
|
||||||
|
|
|
@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
||||||
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateClear(SStreamState* pState);
|
int32_t streamStateClear(SStreamState* pState);
|
||||||
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
|
||||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||||
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState;
|
||||||
typedef SList SStreamSnapshot;
|
typedef SList SStreamSnapshot;
|
||||||
|
|
||||||
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
||||||
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
|
|
||||||
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
||||||
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
|
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
|
||||||
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
|
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
|
||||||
|
@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi
|
||||||
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
|
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
|
||||||
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
|
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
|
||||||
|
|
||||||
|
typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
||||||
|
|
||||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
||||||
|
@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
|
||||||
|
|
||||||
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
|
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
|
||||||
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
|
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
|
||||||
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
|
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
|
||||||
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
|
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
|
||||||
|
|
||||||
void* getRowStateBuff(SStreamFileState* pFileState);
|
void* getRowStateBuff(SStreamFileState* pFileState);
|
||||||
|
@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
||||||
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
||||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
|
//function
|
||||||
|
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -897,7 +897,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
||||||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||||
SStorageAPI* pApi);
|
SStorageAPI* pApi, int32_t tsIndex);
|
||||||
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||||
STimeWindowAggSupp* pTwSup);
|
STimeWindowAggSupp* pTwSup);
|
||||||
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
||||||
|
|
|
@ -2904,7 +2904,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->pUpdateInfo = NULL;
|
pInfo->pUpdateInfo = NULL;
|
||||||
pInfo->pTableScanOp = pTableScanOp;
|
pInfo->pTableScanOp = pTableScanOp;
|
||||||
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
|
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readHandle = *pHandle;
|
pInfo->readHandle = *pHandle;
|
||||||
|
|
|
@ -657,9 +657,10 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
|
||||||
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -675,8 +676,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||||
|
|
|
@ -709,14 +709,14 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = tsSlotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
||||||
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->primaryTsIndex = tsSlotId;
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||||
pInfo->pDelIterator = NULL;
|
pInfo->pDelIterator = NULL;
|
||||||
|
|
|
@ -1522,7 +1522,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
|
|
||||||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||||
|
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1705,7 +1705,7 @@ static TSKEY sesionTs(void* pKey) {
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
||||||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||||
SStorageAPI* pApi) {
|
SStorageAPI* pApi, int32_t tsIndex) {
|
||||||
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
|
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
|
||||||
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
||||||
pSup->gap = gap;
|
pSup->gap = gap;
|
||||||
|
@ -1721,7 +1721,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
||||||
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
*(pSup->pState) = *pState;
|
*(pSup->pState) = *pState;
|
||||||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
|
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||||
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
|
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
|
||||||
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
||||||
|
@ -1730,25 +1730,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||||
|
|
||||||
int32_t pageSize = 4096;
|
|
||||||
while (pageSize < pSup->resultRowSize * 4) {
|
|
||||||
pageSize <<= 1u;
|
|
||||||
}
|
|
||||||
// at least four pages need to be in buffer
|
|
||||||
int32_t bufSize = 4096 * 256;
|
|
||||||
if (bufSize <= pageSize) {
|
|
||||||
bufSize = pageSize * 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!osTempSpaceAvailable()) {
|
|
||||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
|
||||||
qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
|
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSup->pSessionAPI = pApi;
|
pSup->pSessionAPI = pApi;
|
||||||
|
@ -2948,16 +2931,16 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
|
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
|
||||||
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
|
||||||
if (pSessionNode->window.pTsEnd) {
|
if (pSessionNode->window.pTsEnd) {
|
||||||
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
||||||
}
|
}
|
||||||
|
@ -3175,7 +3158,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
}
|
}
|
||||||
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
|
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
|
||||||
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||||
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i);
|
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
|
||||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3843,14 +3826,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
}
|
}
|
||||||
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
|
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
|
||||||
int16_t type = pColNode->node.resType.type;
|
int16_t type = pColNode->node.resType.type;
|
||||||
|
pInfo->primaryTsIndex = tsSlotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
|
||||||
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
||||||
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->primaryTsIndex = tsSlotId;
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||||
pInfo->pDelIterator = NULL;
|
pInfo->pDelIterator = NULL;
|
||||||
|
@ -4082,7 +4064,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
||||||
|
|
|
@ -3203,10 +3203,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
|
||||||
|
|
||||||
SWinKey key = {0};
|
SWinKey key = {0};
|
||||||
if (pCtx->saveHandle.pBuf == NULL) {
|
if (pCtx->saveHandle.pBuf == NULL) {
|
||||||
SColumnInfoData* pColInfo = pCtx->input.pPTS;
|
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex);
|
||||||
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
|
||||||
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
|
|
||||||
}
|
|
||||||
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
key.groupId = pSrcBlock->info.id.groupId;
|
key.groupId = pSrcBlock->info.id.groupId;
|
||||||
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
|
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
|
||||||
|
|
|
@ -202,6 +202,13 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
|
||||||
|
SWinKey* pTmpkey = pKey;
|
||||||
|
ASSERT(keyLen == sizeof(SWinKey));
|
||||||
|
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
|
||||||
|
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||||
SSessionKey* pKey = pPos->pKey;
|
SSessionKey* pKey = pPos->pKey;
|
||||||
|
|
|
@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) {
|
||||||
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
memcpy(buf + len - rowSize, value, vLen);
|
memcpy(buf + len - rowSize, value, vLen);
|
||||||
return code;
|
return code;
|
||||||
#else
|
#else
|
||||||
|
@ -290,11 +290,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
|
||||||
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
*ppVal = buf + len - rowSize;
|
*ppVal = buf + len - rowSize;
|
||||||
|
streamStateReleaseBuf(pState, pVal, false);
|
||||||
return code;
|
return code;
|
||||||
#else
|
#else
|
||||||
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
|
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
|
||||||
|
@ -332,7 +333,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
|
||||||
|
|
||||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
||||||
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
||||||
streamFileStateReleaseBuff(pState->pFileState, pos, false);
|
streamStateReleaseBuf(pState, pos, false);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,7 +396,10 @@ int32_t streamStateClear(SStreamState* pState) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
|
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) {
|
||||||
|
pState->number = number;
|
||||||
|
pState->tsIndex = tsIdex;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
|
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
|
|
|
@ -58,6 +58,8 @@ struct SStreamFileState {
|
||||||
_state_file_remove_fn stateFileRemoveFn;
|
_state_file_remove_fn stateFileRemoveFn;
|
||||||
_state_file_get_fn stateFileGetFn;
|
_state_file_get_fn stateFileGetFn;
|
||||||
_state_file_clear_fn stateFileClearFn;
|
_state_file_clear_fn stateFileClearFn;
|
||||||
|
|
||||||
|
_state_fun_get_fn stateFunctionGetFn;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef SRowBuffPos SRowBuffInfo;
|
typedef SRowBuffPos SRowBuffInfo;
|
||||||
|
@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
||||||
pFileState->stateFileGetFn = intervalFileGetFn;
|
pFileState->stateFileGetFn = intervalFileGetFn;
|
||||||
pFileState->stateFileClearFn = streamStateClear_rocksdb;
|
pFileState->stateFileClearFn = streamStateClear_rocksdb;
|
||||||
pFileState->cfName = taosStrdup("state");
|
pFileState->cfName = taosStrdup("state");
|
||||||
|
pFileState->stateFunctionGetFn = getRowBuff;
|
||||||
} else {
|
} else {
|
||||||
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
|
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
|
||||||
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
|
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
|
||||||
|
@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
||||||
pFileState->stateFileGetFn = sessionFileGetFn;
|
pFileState->stateFileGetFn = sessionFileGetFn;
|
||||||
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
|
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
|
||||||
pFileState->cfName = taosStrdup("sess");
|
pFileState->cfName = taosStrdup("sess");
|
||||||
|
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
||||||
|
@ -736,7 +740,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
|
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
|
||||||
|
|
||||||
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
|
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
|
||||||
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
|
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
|
||||||
|
@ -754,3 +758,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
|
||||||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
|
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
|
||||||
|
|
||||||
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
|
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
|
||||||
|
|
||||||
|
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
|
||||||
|
return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
|
@ -372,17 +372,22 @@ print step4=============
|
||||||
|
|
||||||
sql create database test6 vgroups 4;
|
sql create database test6 vgroups 4;
|
||||||
sql use test6;
|
sql use test6;
|
||||||
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
|
sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
|
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
|
||||||
|
sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s);
|
||||||
|
sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s);
|
||||||
|
sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a);
|
||||||
|
sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9;
|
||||||
|
sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2);
|
||||||
|
|
||||||
sleep 1000
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
sql insert into t1 values(1648791211000,1,2,3,0);
|
||||||
sql insert into t1 values(1648791213000,2,3,4,1.1);
|
sql insert into t1 values(1648791213000,2,3,4,0);
|
||||||
sql insert into t2 values(1648791215000,3,4,5,1.1);
|
sql insert into t2 values(1648791215000,3,4,5,0);
|
||||||
sql insert into t2 values(1648791217000,4,5,6,1.1);
|
sql insert into t2 values(1648791217000,4,5,6,0);
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue