set ts column index for function
This commit is contained in:
parent
26fb2ee2b8
commit
c6d492d3fd
|
@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState;
|
|||
typedef SList SStreamSnapshot;
|
||||
|
||||
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
||||
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
|
||||
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
||||
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
|
||||
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
|
||||
|
@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi
|
|||
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
|
||||
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
|
||||
|
||||
typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||
|
||||
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
||||
|
||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
||||
|
@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
|
|||
|
||||
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
|
||||
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
|
||||
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
|
||||
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
|
||||
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
|
||||
|
||||
void* getRowStateBuff(SStreamFileState* pFileState);
|
||||
|
@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
|||
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
|
||||
//function
|
||||
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -657,6 +657,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
|
||||
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
|
@ -675,8 +676,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
|
|
|
@ -709,6 +709,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = tsSlotId;
|
||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
||||
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
|
@ -716,7 +717,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = tsSlotId;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
|
|
@ -1730,25 +1730,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||
|
||||
int32_t pageSize = 4096;
|
||||
while (pageSize < pSup->resultRowSize * 4) {
|
||||
pageSize <<= 1u;
|
||||
}
|
||||
// at least four pages need to be in buffer
|
||||
int32_t bufSize = 4096 * 256;
|
||||
if (bufSize <= pageSize) {
|
||||
bufSize = pageSize * 4;
|
||||
}
|
||||
|
||||
if (!osTempSpaceAvailable()) {
|
||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||
qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
|
||||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||
}
|
||||
|
||||
pSup->pSessionAPI = pApi;
|
||||
|
@ -2948,6 +2931,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
|
||||
};
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
|
||||
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
|
@ -2957,7 +2941,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
if (pSessionNode->window.pTsEnd) {
|
||||
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
||||
}
|
||||
|
@ -3843,14 +3826,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
}
|
||||
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
|
||||
int16_t type = pColNode->node.resType.type;
|
||||
pInfo->primaryTsIndex = tsSlotId;
|
||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
|
||||
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
||||
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = tsSlotId;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
|
|
@ -202,6 +202,13 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
|
||||
SWinKey* pTmpkey = pKey;
|
||||
ASSERT(keyLen == sizeof(SWinKey));
|
||||
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
|
||||
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen);
|
||||
}
|
||||
|
||||
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SSessionKey* pKey = pPos->pKey;
|
||||
|
|
|
@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) {
|
|||
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
||||
#ifdef USE_ROCKSDB
|
||||
void* pVal = NULL;
|
||||
int32_t len = 0;
|
||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
||||
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
||||
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||
memcpy(buf + len - rowSize, value, vLen);
|
||||
return code;
|
||||
#else
|
||||
|
@ -290,10 +290,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
|
|||
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
||||
#ifdef USE_ROCKSDB
|
||||
void* pVal = NULL;
|
||||
int32_t len = 0;
|
||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
||||
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
||||
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||
*ppVal = buf + len - rowSize;
|
||||
return code;
|
||||
#else
|
||||
|
|
|
@ -58,6 +58,8 @@ struct SStreamFileState {
|
|||
_state_file_remove_fn stateFileRemoveFn;
|
||||
_state_file_get_fn stateFileGetFn;
|
||||
_state_file_clear_fn stateFileClearFn;
|
||||
|
||||
_state_fun_get_fn stateFunctionGetFn;
|
||||
};
|
||||
|
||||
typedef SRowBuffPos SRowBuffInfo;
|
||||
|
@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->stateFileGetFn = intervalFileGetFn;
|
||||
pFileState->stateFileClearFn = streamStateClear_rocksdb;
|
||||
pFileState->cfName = taosStrdup("state");
|
||||
pFileState->stateFunctionGetFn = getRowBuff;
|
||||
} else {
|
||||
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
|
||||
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
|
||||
|
@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
|||
pFileState->stateFileGetFn = sessionFileGetFn;
|
||||
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
|
||||
pFileState->cfName = taosStrdup("sess");
|
||||
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
||||
}
|
||||
|
||||
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
||||
|
@ -736,7 +740,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
|
||||
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
|
||||
|
||||
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
|
||||
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
|
||||
|
@ -754,3 +758,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
|
|||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
|
||||
|
||||
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
|
||||
|
||||
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
|
||||
return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue