diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 0a240dd8f5..49b5201382 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -38,6 +38,9 @@ extern "C" { #define META_READER_NOLOCK 0x1 +#define STREAM_STATE_BUFF_HASH 1 +#define STREAM_STATE_BUFF_SORT 2 + typedef struct SMeta SMeta; typedef TSKEY (*GetTsFun)(void*); @@ -115,6 +118,7 @@ typedef struct SRowBuffPos { void* pKey; bool beFlushed; bool beUsed; + bool needFree; } SRowBuffPos; // tq @@ -332,6 +336,8 @@ typedef struct { void* db; // rocksdb_t* db; void* pCur; int64_t number; + void* pStreamFileState; + int32_t buffIndex; } SStreamStateCur; typedef struct SStateStore { @@ -339,7 +345,8 @@ typedef struct SStateStore { int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal); int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); - int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal); + int32_t (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used); + int32_t (*streamStateClearBuff)(SStreamState* pState, void* pVal); void (*streamStateFreeVal)(void* val); int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); @@ -370,7 +377,7 @@ typedef struct SStateStore { int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); - int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); + int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key); int32_t (*streamStateSessionClear)(SStreamState* pState); @@ -399,7 +406,7 @@ typedef struct SStateStore { struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, - const char* id, int64_t ckId); + const char* id, int64_t ckId, int8_t type); void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); void (*streamFileStateClear)(struct SStreamFileState* pFileState); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 4312da6f2c..1445f4c26e 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -49,26 +49,30 @@ void streamStateSetNumber(SStreamState* pState, int32_t number); int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); +//session window int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); -int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); +int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionClear(SStreamState* pState); int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); -int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, - state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key); +//state window +int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, + state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); + int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key); int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal); +int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used); +int32_t streamStateClearBuff(SStreamState* pState, void* pVal); void streamStateFreeVal(void* val); SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key); @@ -80,10 +84,6 @@ void streamStateFreeCur(SStreamStateCur* pCur); int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); -int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key); -int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); -int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); - int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); @@ -91,6 +91,7 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); void streamStateReloadInfo(SStreamState* pState, TSKEY ts); +SStreamStateCur* createStreamStateCursor(); /***compare func **/ diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 052231fe39..70199731e5 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -28,14 +28,25 @@ extern "C" { #endif typedef struct SStreamFileState SStreamFileState; -typedef SList SStreamSnapshot; +typedef SList SStreamSnapshot; + +typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); +typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); +typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); +typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); + +typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey); +typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); +typedef int32_t (*_state_file_clear_fn)(SStreamState* pState); SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, - int64_t checkpointId); + int64_t checkpointId, int8_t type); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState); +void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used); +int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); @@ -52,6 +63,31 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); +void* getRowStateBuff(SStreamFileState* pFileState); +void* getStateFileStore(SStreamFileState* pFileState); +bool isDeteled(SStreamFileState* pFileState, TSKEY ts); +bool isFlushedState(SStreamFileState* pFileState, TSKEY ts); +SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState); +int32_t getRowStateRowSize(SStreamFileState* pFileState); + +// session window +int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); +int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen); + +void sessionWinStateClear(SStreamFileState* pFileState); +void sessionWinStateCleanup(void* pBuff); + +SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey); +SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey); +SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey); +int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur); +int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey); + +// state window +int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, + state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); + #ifdef __cplusplus } #endif diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index e737e3fa37..389137f630 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -29,13 +29,12 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamFileStateInit = streamFileStateInit; pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF; - pStore->streamStateGetByPos = streamStateGetByPos; - pStore->streamStatePutParName = streamStatePutParName; pStore->streamStateGetParName = streamStateGetParName; pStore->streamStateAddIfNotExist = streamStateAddIfNotExist; pStore->streamStateReleaseBuf = streamStateReleaseBuf; + pStore->streamStateClearBuff = streamStateClearBuff; pStore->streamStateFreeVal = streamStateFreeVal; pStore->streamStatePut = streamStatePut; @@ -91,8 +90,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev; pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext; - pStore->streamFileStateInit = streamFileStateInit; - pStore->streamFileStateDestroy = streamFileStateDestroy; pStore->streamFileStateClear = streamFileStateClear; pStore->needClearDiskBuff = needClearDiskBuff; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index c72ecd4824..72e5e9ca88 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -137,13 +137,12 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamFileStateInit = streamFileStateInit; pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF; - pStore->streamStateGetByPos = streamStateGetByPos; - pStore->streamStatePutParName = streamStatePutParName; pStore->streamStateGetParName = streamStateGetParName; pStore->streamStateAddIfNotExist = streamStateAddIfNotExist; pStore->streamStateReleaseBuf = streamStateReleaseBuf; + pStore->streamStateClearBuff = streamStateClearBuff; pStore->streamStateFreeVal = streamStateFreeVal; pStore->streamStatePut = streamStatePut; @@ -199,8 +198,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev; pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext; - pStore->streamFileStateInit = streamFileStateInit; - pStore->streamFileStateDestroy = streamFileStateDestroy; pStore->streamFileStateClear = streamFileStateClear; pStore->needClearDiskBuff = needClearDiskBuff; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 8726f57977..d5d144ee65 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -548,9 +548,9 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SResultWindowInfo { - void* pOutputBuf; - SSessionKey sessionWin; - bool isOutput; + SRowBuffPos* pStatePos; + SSessionKey sessionWin; + bool isOutput; } SResultWindowInfo; typedef struct SStreamSessionAggOperatorInfo { @@ -579,6 +579,7 @@ typedef struct SStreamSessionAggOperatorInfo { bool isHistoryOp; bool reCkBlock; SSDataBlock* pCheckpointRes; + bool clearState; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -672,8 +673,6 @@ void cleanupAggSup(SAggSupporter* pAggSup); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); -void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf); void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); @@ -739,12 +738,6 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); bool groupbyTbname(SNodeList* pGroupList); -int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, - SGroupResInfo* pGroupResInfo); -int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI); -int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, - SGroupResInfo* pGroupResInfo); -int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int64_t* pData); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 519a308c3a..8ad174f366 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -720,38 +720,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS return 0; } -void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SSDataBlock* pBlock = pbInfo->pRes; - - // set output datablock version - pBlock->info.version = pTaskInfo->version; - - blockDataCleanup(pBlock); - if (!hasRemainResults(pGroupResInfo)) { - return; - } - - // clear the existed group id - pBlock->info.id.groupId = 0; - ASSERT(!pbInfo->mergeResultBlock); - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, - false); - - void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < - 0) { - pBlock->info.parTbName[0] = 0; - } else { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } - - pAPI->stateStore.streamStateFreeVal(tbname); -} - void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -960,109 +928,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* return TSDB_CODE_SUCCESS; } -int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI) { - pAPI->streamStateReleaseBuf(pState, pKey, pResult); - return TSDB_CODE_SUCCESS; -} - -int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI) { - pAPI->streamStateSessionPut(pState, key, (const void*)buf, size); - releaseOutputBuf(pState, NULL, (SResultRow*)buf, pAPI); - return TSDB_CODE_SUCCESS; -} - -int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, - SGroupResInfo* pGroupResInfo) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SExprInfo* pExprInfo = pSup->pExprInfo; - int32_t numOfExprs = pSup->numOfExprs; - int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; - SqlFunctionCtx* pCtx = pSup->pCtx; - - int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); - - for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { - SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i); - int32_t size = 0; - void* pVal = NULL; - int32_t code = pAPI->stateStore.streamStateSessionGet(pState, pKey, &pVal, &size); - // ASSERT(code == 0); - if (code == -1) { - // for history - qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "", - pKey->win.skey, pKey->win.ekey, pKey->groupId); - pGroupResInfo->index += 1; - continue; - } - SResultRow* pRow = (SResultRow*)pVal; - doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); - // no results, continue to check the next one - if (pRow->numOfRows == 0) { - pGroupResInfo->index += 1; - releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore); - continue; - } - - if (pBlock->info.id.groupId == 0) { - pBlock->info.id.groupId = pKey->groupId; - - void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, - &tbname) < 0) { - pBlock->info.parTbName[0] = 0; - } else { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } - pAPI->stateStore.streamStateFreeVal(tbname); - } else { - // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.id.groupId != pKey->groupId) { - releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore); - break; - } - } - - if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - ASSERT(pBlock->info.rows > 0); - releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore); - break; - } - - pGroupResInfo->index += 1; - - for (int32_t j = 0; j < numOfExprs; ++j) { - int32_t slotId = pExprInfo[j].base.resSchema.slotId; - - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); - if (pCtx[j].fpSet.finalize) { - int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (TAOS_FAILED(code1)) { - qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); - T_LONG_JMP(pTaskInfo->env, code1); - } - } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - // do nothing, todo refactor - } else { - // expand the result into multiple rows. E.g., _wstart, top(k, 20) - // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } - } - } - - pBlock->info.dataLoad = 1; - pBlock->info.rows += pRow->numOfRows; - releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore); - } - blockDataUpdateTsWindow(pBlock, 0); - return TSDB_CODE_SUCCESS; -} - void streamOpReleaseState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 1060dd4f0e..f6b0a87f54 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1057,7 +1057,7 @@ void appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup } else { memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN); } - pAPI->streamStateReleaseBuf(pState, NULL, pValue); + pAPI->streamStateFreeVal(pValue); } static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 1c909cb47d..12e22f2597 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -152,8 +152,7 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { } static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { - tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES); - return TSDB_CODE_SUCCESS; + return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES); } static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { @@ -696,7 +695,6 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat int32_t slotId = pExprInfo[j].base.resSchema.slotId; pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); - SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; if (pCtx[j].fpSet.finalize) { int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); @@ -931,17 +929,6 @@ void* decodeSWinKey(void* buf, SWinKey* key) { return buf; } -int32_t encodeSRowBuffPos(void** buf, SRowBuffPos* pos) { - int32_t tlen = 0; - tlen += encodeSWinKey(buf, pos->pKey); - return tlen; -} - -void* decodeSRowBuffPos(void* buf, SRowBuffPos* pos) { - buf = decodeSWinKey(buf, pos->pKey); - return buf; -} - int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs); @@ -1155,6 +1142,18 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { return NULL; } +static int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) { + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosArraySort(pUpdated, compar); + tSimpleHashCleanup(*ppWinUpdated); + *ppWinUpdated = NULL; + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1311,15 +1310,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; - void* pIte = NULL; - int32_t iter = 0; - while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { - taosArrayPush(pInfo->pUpdated, pIte); - } - - tSimpleHashCleanup(pInfo->pUpdatedMap); - pInfo->pUpdatedMap = NULL; - taosArraySort(pInfo->pUpdated, winPosCmprImpl); + copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; @@ -1480,7 +1471,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -1607,10 +1598,11 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->twAggSup = *pTwSup; } -int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, +int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, - SReadHandle* pHandle, SStorageAPI* pApi) { - pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); + SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, + SStorageAPI* pApi) { + pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; pSup->stateKeySize = keySize; @@ -1622,10 +1614,14 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, pSup->stateStore = *pStore; - initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput); + initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1); + int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); + pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, compareTs, pSup->pState, + pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); @@ -1648,7 +1644,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { - pCtx[i].saveHandle.pBuf = pSup->pResultBuf; + pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf; } pSup->pSessionAPI = pApi; @@ -1687,33 +1683,40 @@ bool inWinRange(STimeWindow* range, STimeWindow* cur) { return false; } +int32_t clearOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + return pAPI->streamStateClearBuff(pState, pPos); +} + void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SResultWindowInfo* pCurWin) { pCurWin->sessionWin.groupId = groupId; pCurWin->sessionWin.win.skey = startTs; pCurWin->sessionWin.win.ekey = endTs; int32_t size = pAggSup->resultRowSize; - int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, - pAggSup->gap, &pCurWin->pOutputBuf, &size); + int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, + pAggSup->gap, (void**)&pCurWin->pStatePos, &size); if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) { code = TSDB_CODE_FAILED; - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore); - pCurWin->pOutputBuf = taosMemoryCalloc(1, size); + clearOutputBuf(pAggSup->pState, pCurWin->pStatePos, &pAggSup->pSessionAPI->stateStore); } if (code == TSDB_CODE_SUCCESS) { pCurWin->isOutput = true; - pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin); + if (pCurWin->pStatePos->needFree) { + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin); + } } else { pCurWin->sessionWin.win.skey = startTs; pCurWin->sessionWin.win.ekey = endTs; } + qDebug("===stream===set session window buff .start:%" PRId64 ",end:%" PRId64 ",groupid:%" PRIu64, + pCurWin->sessionWin.win.skey, pCurWin->sessionWin.win.ekey, pCurWin->sessionWin.groupId); } int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) { int32_t size = 0; - int32_t code = - pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); + int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, + (void**)&pWinInfo->pStatePos, &size); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1774,6 +1777,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS if (pEndTs) { pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pEndTs[i]); } + memcpy(pWinInfo->pStatePos->pKey, &pWinInfo->sessionWin, sizeof(SSessionKey)); } return rows - start; } @@ -1781,7 +1785,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey); - *pResult = (SResultRow*)pWinInfo->pOutputBuf; + *pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff; // set time window for current result (*pResult)->win = pWinInfo->sessionWin.win; setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset); @@ -1819,20 +1823,24 @@ static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* return TSDB_CODE_SUCCESS; } -SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, +int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + pAPI->streamStateReleaseBuf(pState, pPos, false); + return TSDB_CODE_SUCCESS; +} + +void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin) { SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin); pNextWin->isOutput = true; setSessionWinOutputInfo(pStUpdated, pNextWin); int32_t size = 0; pNextWin->sessionWin = pCurWin->sessionWin; - int32_t code = - pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); + int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, + (void**)&pNextWin->pStatePos, &size); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pNextWin->pOutputBuf); SET_SESSION_WIN_INVALID(*pNextWin); } - return pCur; + pAggSup->stateStore.streamStateFreeCur(pCur); } static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, @@ -1850,16 +1858,16 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* // Just look for the window behind StartIndex while (1) { SResultWindowInfo winInfo = {0}; - SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo); + getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo); if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) || !inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) { - taosMemoryFree(winInfo.pOutputBuf); - pAPI->stateStore.streamStateFreeCur(pCur); + releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); break; } SResultRow* pWinResult = NULL; initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); + memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey)); int64_t winDelta = 0; if (addGap) { winDelta = pAggSup->gap; @@ -1872,8 +1880,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* } removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin); doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); - pAPI->stateStore.streamStateFreeCur(pCur); - taosMemoryFree(winInfo.pOutputBuf); + releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); winNum++; } return winNum; @@ -1890,25 +1897,21 @@ static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo // Just look for the window behind StartIndex while (1) { SResultWindowInfo winInfo = {0}; - SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo); + getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo); if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) || !inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) { - taosMemoryFree(winInfo.pOutputBuf); - pAPI->stateStore.streamStateFreeCur(pCur); + releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); break; } pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); + memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey)); doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); - pAPI->stateStore.streamStateFreeCur(pCur); - taosMemoryFree(winInfo.pOutputBuf); + releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); } } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { - saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, - &pAggSup->stateStore); - pWinInfo->pOutputBuf = NULL; - return TSDB_CODE_SUCCESS; + return pAggSup->stateStore.streamStateSessionPut(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pStatePos, pAggSup->resultRowSize); } static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated, @@ -1946,13 +1949,13 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } SResultWindowInfo winInfo = {0}; setSessionOutputBuf(pAggSup, startTsCols[i], endTsCols[i], groupId, &winInfo); + // coverity scan error + if (!winInfo.pStatePos) { + continue; + } setSessionWinOutputInfo(pStUpdated, &winInfo); winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap, pAggSup->pResultRows, pStUpdated, pStDeleted); - // coverity scan error - if (!winInfo.pOutputBuf) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); - } int64_t winDelta = 0; if (addGap) { @@ -2023,17 +2026,6 @@ static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) return 0; } -static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) { - void* pIte = NULL; - int32_t iter = 0; - while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, NULL); - taosArrayPush(pUpdated, key); - } - taosArraySort(pUpdated, sessionKeyCompareAsc); - return TSDB_CODE_SUCCESS; -} - void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI; @@ -2111,16 +2103,18 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin); if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore); continue; } if (code == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) { if (num == 0) { setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin); + parentWin.sessionWin = childWin.sessionWin; + memcpy(parentWin.pStatePos->pKey, &parentWin.sessionWin, sizeof(SSessionKey)); code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore); break; } } @@ -2131,9 +2125,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true); saveResult(parentWin, pStUpdated); - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore); } else { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore); break; } } @@ -2203,6 +2197,97 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL pGroupResInfo->freeItem = false; } +int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, + SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SExprInfo* pExprInfo = pSup->pExprInfo; + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { + SRowBuffPos* pPos = *(SRowBuffPos**) taosArrayGet(pGroupResInfo->pRows, i); + SResultRow* pRow = NULL; + int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); + SSessionKey* pKey = (SSessionKey*) pPos->pKey; + + if (code == -1) { + // for history + qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "", + pKey->win.skey, pKey->win.ekey, pKey->groupId); + pGroupResInfo->index += 1; + continue; + } + + doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); + // no results, continue to check the next one + if (pRow->numOfRows == 0) { + pGroupResInfo->index += 1; + releaseOutputBuf(pState, pPos, &pAPI->stateStore); + continue; + } + + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pKey->groupId; + + void* tbname = NULL; + if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, + &tbname) < 0) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } + pAPI->stateStore.streamStateFreeVal(tbname); + } else { + // current value belongs to different group, it can't be packed into one datablock + if (pBlock->info.id.groupId != pKey->groupId) { + releaseOutputBuf(pState, pPos, &pAPI->stateStore); + break; + } + } + + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + ASSERT(pBlock->info.rows > 0); + releaseOutputBuf(pState, pPos, &pAPI->stateStore); + break; + } + + pGroupResInfo->index += 1; + + for (int32_t j = 0; j < numOfExprs; ++j) { + int32_t slotId = pExprInfo[j].base.resSchema.slotId; + + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); + if (pCtx[j].fpSet.finalize) { + int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code1)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); + T_LONG_JMP(pTaskInfo->env, code1); + } + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor + } else { + // expand the result into multiple rows. E.g., _wstart, top(k, 20) + // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } + } + } + + pBlock->info.dataLoad = 1; + pBlock->info.rows += pRow->numOfRows; + releaseOutputBuf(pState, pPos, &pAPI->stateStore); + } + blockDataUpdateTsWindow(pBlock, 0); + return TSDB_CODE_SUCCESS; +} + void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // set output datablock version @@ -2232,6 +2317,7 @@ static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } + doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); @@ -2283,7 +2369,7 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) { buf = taosDecodeFixedBool(buf, &key->isOutput); - key->pOutputBuf = NULL; + key->pStatePos->pRowBuff = NULL; buf = decodeSSessionKey(buf, &key->sessionWin); return buf; } @@ -2488,10 +2574,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); - copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated); + copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc); removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); - tSimpleHashCleanup(pInfo->pStUpdated); - pInfo->pStUpdated = NULL; if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } @@ -2619,18 +2703,18 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } } - SExprSupp* pSup = &pOperator->exprSupp; + SExprSupp* pExpSup = &pOperator->exprSupp; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); + code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pTaskInfo->storageAPI); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2670,6 +2754,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh } pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + pInfo->clearState = false; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; // for stream void* buff = NULL; @@ -2727,11 +2812,18 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { return opRes; } + if (pInfo->clearState) { + clearFunctionContext(&pOperator->exprSupp); + // semi session operator clear disk buffer + clearStreamSessionOperator(pInfo); + } + if (pOperator->status == OP_RES_TO_RETURN) { clearFunctionContext(&pOperator->exprSupp); - // semi interval operator clear disk buffer + // semi session operator clear disk buffer clearStreamSessionOperator(pInfo); setOperatorCompleted(pOperator); + pInfo->clearState = false; return NULL; } } @@ -2760,6 +2852,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStUpdated, pWins); copyDeleteWindowInfo(pWins, pInfo->pStDeleted); taosArrayDestroy(pWins); + pInfo->clearState = true; break; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated); @@ -2787,10 +2880,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs; - copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated); + copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc); removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); - tSimpleHashCleanup(pInfo->pStUpdated); - pInfo->pStUpdated = NULL; if(pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); @@ -2806,7 +2897,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } clearFunctionContext(&pOperator->exprSupp); - // semi interval operator clear disk buffer + // semi session operator clear disk buffer clearStreamSessionOperator(pInfo); setOperatorCompleted(pOperator); return NULL; @@ -2836,7 +2927,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); for (int32_t i = 0; i < numOfChild; i++) { - SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, NULL); + SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle); if (pChildOp == NULL) { goto _error; } @@ -2924,9 +3015,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pCurWin->winInfo.sessionWin.win.ekey = ts; int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize, compareStateKey, - &pCurWin->winInfo.pOutputBuf, &size); + (void**)&pCurWin->winInfo.pStatePos, &size); pCurWin->pStateKey = - (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + (SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pCurWin->pStateKey->type = pAggSup->stateKeyType; pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); @@ -2934,11 +3025,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { code = TSDB_CODE_FAILED; - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, - &pAggSup->pSessionAPI->stateStore); - pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size); + clearOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); pCurWin->pStateKey = - (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + (SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pCurWin->pStateKey->type = pAggSup->stateKeyType; pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); @@ -2952,7 +3041,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS) { pCurWin->winInfo.isOutput = true; - pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); + if (pCurWin->winInfo.pStatePos->needFree) { + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); + } } else if (pKeyData) { if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) { varDataCopy(pCurWin->pStateKey->pData, pKeyData); @@ -2966,12 +3057,12 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); int32_t nextSize = pAggSup->resultRowSize; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, - &pNextWin->winInfo.pOutputBuf, &nextSize); + (void**)&pNextWin->winInfo.pStatePos, &nextSize); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); } else { pNextWin->pStateKey = - (SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + (SStateKeys*)((char*)pNextWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pNextWin->pStateKey->type = pAggSup->stateKeyType; pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys); @@ -3008,6 +3099,7 @@ int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNex pWinInfo->winInfo.sessionWin.win.skey = pTs[i]; } pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]); + memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); if (!isEqualStateKey(pWinInfo, pKeyData)) { *allEqual = false; } @@ -3057,7 +3149,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SStateWindowInfo nextWin = {0}; setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); if (IS_VALID_SESSION_WIN(nextWin.winInfo)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextWin.winInfo.pOutputBuf, &pAPI->stateStore); + releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, @@ -3068,7 +3160,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey)); doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin); - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curWin.winInfo.pOutputBuf, &pAPI->stateStore); + releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore); continue; } code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, @@ -3109,7 +3201,7 @@ int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); + void* key = tSimpleHashGetKey(pIte, &keyLen); tlen += encodeSSessionKey(buf, key); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); } @@ -3285,10 +3377,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated); - copyUpdateResult(pInfo->pSeUpdated, pInfo->pUpdated); + copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc); removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated); - tSimpleHashCleanup(pInfo->pSeUpdated); - pInfo->pSeUpdated = NULL; if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); @@ -3333,6 +3423,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur SResultRow* pWinResult = NULL; initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey); + memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey)); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1); compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); @@ -3344,7 +3435,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur } removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); - taosMemoryFree(pNextWin->pOutputBuf); + releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore); } void streamStateReloadState(SOperatorInfo* pOperator) { @@ -3394,8 +3485,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); } } else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, - &pAggSup->pSessionAPI->stateStore); + releaseOutputBuf(pAggSup->pState, nextInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); } if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { @@ -3444,18 +3534,19 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - SExprSupp* pSup = &pOperator->exprSupp; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); + code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; - code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, - type, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI); + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, + type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3734,7 +3825,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 19f4ebbeea..01fbdcf193 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2006,7 +2006,7 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateSeekKeyNext_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2062,7 +2062,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK qDebug("seek to last:%s", tbuf); } - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) return NULL; pCur->number = pState->number; @@ -2089,7 +2089,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* qDebug("streamStateGetCur_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) return NULL; pCur->db = wrapper->rocksdb; @@ -2178,7 +2178,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2218,7 +2218,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2256,7 +2256,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2357,7 +2357,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; if (pCur == NULL) return NULL; @@ -2418,7 +2418,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (!pCur) { return NULL; } @@ -2456,7 +2456,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2494,7 +2494,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return -1; } @@ -2764,7 +2764,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; pCur->db = wrapper->rocksdb; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c new file mode 100644 index 0000000000..90a20fc0df --- /dev/null +++ b/source/libs/stream/src/streamSessionState.c @@ -0,0 +1,470 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstreamFileState.h" + +#include "query.h" +#include "streamBackendRocksdb.h" +#include "taos.h" +#include "tcommon.h" +#include "thash.h" +#include "tsimplehash.h" + + +typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos); + +int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) { + SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos); + SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey; + return sessionWinKeyCmpr(pWin1, pWin2); +} + +int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) { + int firstPos = 0, lastPos = num - 1, midPos = -1; + int numOfRows = 0; + + if (num <= 0) return -1; + // find the first position which is smaller or equal than the key. + // if all data is bigger than the key return -1 + while (1) { + if (cmpFn(key, keyList, lastPos) >= 0) return lastPos; + if (cmpFn(key, keyList, firstPos) == 0) return firstPos; + if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1; + + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; + + if (cmpFn(key, keyList, midPos) < 0) { + lastPos = midPos - 1; + } else if (cmpFn(key, keyList, midPos) > 0) { + firstPos = midPos + 1; + } else { + break; + } + } + + return midPos; +} + +int64_t getSessionWindowEndkey(void* data, int32_t index) { + SArray* pWinInfos = (SArray*)data; + SRowBuffPos** ppos = taosArrayGet(pWinInfos, index); + SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); + return pWin->win.ekey; +} + +bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) { + if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) { + return true; + } + return false; +} + +static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + ASSERT(pNewPos->pRowBuff); + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); + taosArrayPush(pWinInfos, &pNewPos); + return pNewPos; +} + +static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey, int32_t index) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + ASSERT(pNewPos->pRowBuff); + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); + taosArrayInsert(pWinInfos, index, &pNewPos); + return pNewPos; +} + +int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { + SSessionKey* pWinKey = key; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + pWinStates = taosArrayInit(16, POINTER_BYTES); + tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + } + + TSKEY startTs = pWinKey->win.skey; + TSKEY endTs = pWinKey->win.ekey; + + int32_t size = taosArrayGetSize(pWinStates); + if (size == 0) { + (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + goto _end; + } + + // find the first position which is smaller than the pWinKey + int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); + SRowBuffPos* pPos = NULL; + + if (index >= 0) { + pPos = taosArrayGetP(pWinStates, index); + if (inSessionWindow(pPos->pKey, startTs, gap)) { + (*pVal) = pPos; + SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + *key = *pDestWinKey; + goto _end; + } + } + + if (index + 1 < size) { + pPos = taosArrayGetP(pWinStates, index + 1); + if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) { + (*pVal) = pPos; + SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + *key = *pDestWinKey; + goto _end; + } + } + + if (index + 1 == 0) { + if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { + int32_t len = 0; + void* p = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, &len); + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + pNewPos->needFree = true; + + qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); + if (code == TSDB_CODE_SUCCESS) { + memcpy(pNewPos->pRowBuff, p, len); + } + taosMemoryFree(p); + (*pVal) = pNewPos; + goto _end; + } + } + + if (index == size - 1) { + (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + goto _end; + } + (*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1); + +_end: + return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; +} + +int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) { + SSHashObj* pSessionBuff = (SSHashObj*) pBuff; + SSessionKey* pWinKey = (SSessionKey*) key; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + if (!ppBuff) { + return TSDB_CODE_SUCCESS; + } + SArray* pWinStates = (SArray*)(*ppBuff); + int32_t size = taosArrayGetSize(pWinStates); + TSKEY gap = 0; + int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); + if (index >= 0) { + SRowBuffPos** ppos = taosArrayGet(pWinStates, index); + if (inSessionWindow((*ppos)->pKey, pWinKey->win.skey, gap)) { + taosArrayRemove(pWinStates, index); + } + } + return TSDB_CODE_SUCCESS; +} + +void sessionWinStateClear(SStreamFileState* pFileState) { + int32_t buffSize = getRowStateRowSize(pFileState); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + void* pBuff = getRowStateBuff(pFileState); + while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { + SArray* pWinStates = *((void**)pIte); + int32_t size = taosArrayGetSize(pWinStates); + for (int32_t i = 0; i < size; i++) { + SRowBuffPos* pPos = taosArrayGetP(pWinStates, i); + memset(pPos->pRowBuff, 0, buffSize); + } + } +} + +void sessionWinStateCleanup(void* pBuff) { + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { + SArray* pWinStates = (SArray*) pIte; + taosArrayDestroy(pWinStates); + } + tSimpleHashCleanup(pBuff); +} + +static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey, + SArray** pWins, int32_t* pIndex) { + SStreamStateCur* pCur = NULL; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + if (!ppBuff) { + return NULL; + } + + SArray* pWinStates = (SArray*)(*ppBuff); + int32_t size = taosArrayGetSize(pWinStates); + TSKEY gap = 0; + int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); + + if (pWins) { + (*pWins) = pWinStates; + } + + if (index >= 0) { + pCur = createStreamStateCursor(); + pCur->buffIndex = index; + pCur->pStreamFileState = pFileState; + if (pIndex) { + *pIndex = index; + } + } + return pCur; +} + +SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) { + SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL); + if (pCur) { + return pCur; + } + + void* pFileStore = getStateFileStore(pFileState); + pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey); + if (!pCur) { + return NULL; + } + pCur->buffIndex = -1; + pCur->pStreamFileState = pFileState; + return pCur; +} +static void transformCursor(SStreamFileState* pFileState, SStreamStateCur** ppCur) { + SStreamStateCur* pCur = *ppCur; + streamStateFreeCur(pCur); + pCur = createStreamStateCursor(); + (*ppCur) = pCur; + pCur->buffIndex = 0; + pCur->pStreamFileState = pFileState; +} + +static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { + SSessionKey key = {.groupId = groupId}; + int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); + if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0) { + transformCursor(pFileState, ppCur); + } else { + SStreamStateCur* pCur = *ppCur; + pCur->buffIndex = -1; + pCur->pStreamFileState = pFileState; + } +} + +SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) { + SArray* pWinStates = NULL; + int32_t index = -1; + SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index); + if (pCur) { + if (sessionStateKeyCompare(pWinKey, pWinStates, index) > 0) { + sessionWinStateMoveToNext(pCur); + } + return pCur; + } + + void* pFileStore = getStateFileStore(pFileState); + pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey); + checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur); + return pCur; +} + +SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) { + SArray* pWinStates = NULL; + int32_t index = -1; + SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index); + if (pCur) { + sessionWinStateMoveToNext(pCur); + return pCur; + } + + void* pFileStore = getStateFileStore(pFileState); + pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey); + checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur); + return pCur; +} + +int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + if (!pCur) { + return TSDB_CODE_FAILED; + } + + SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState); + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); + if (!ppBuff) { + return TSDB_CODE_FAILED; + } + + SArray* pWinStates = (SArray*)(*ppBuff); + int32_t size = taosArrayGetSize(pWinStates); + if (pCur->buffIndex >= 0) { + if (pCur->buffIndex >= size) { + return TSDB_CODE_FAILED; + } + SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); + if (pVal) { + *pVal = pPos; + } + *pKey = *(SSessionKey*)(pPos->pKey); + } else { + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0) { + transformCursor(pCur->pStreamFileState, &pCur); + if (pCur->buffIndex >= size) { + return TSDB_CODE_FAILED; + } + SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); + if (pVal) { + *pVal = pPos; + } + *pKey = *(SSessionKey*)(pPos->pKey); + } + } + return TSDB_CODE_SUCCESS; +} + +int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { + if (pCur && pCur->buffIndex >= 0) { + pCur->buffIndex++; + } else { + streamStateCurNext_rocksdb(NULL, pCur); + } + return TSDB_CODE_SUCCESS; +} + +int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey) { + SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key); + SSessionKey tmpKey = *key; + int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); + bool hasCurrentPrev = true; + if (code == TSDB_CODE_FAILED) { + pCur = sessionWinStateSeekKeyNext(pFileState, key); + code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); + hasCurrentPrev = false; + } + if (code == TSDB_CODE_FAILED) { + return TSDB_CODE_FAILED; + } + + if (sessionRangeKeyCmpr(key, &tmpKey) == 0) { + *curKey = tmpKey; + return code; + } else if (!hasCurrentPrev) { + return TSDB_CODE_FAILED; + } + + sessionWinStateMoveToNext(pCur); + code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); + if (code == TSDB_CODE_SUCCESS && sessionRangeKeyCmpr(key, &tmpKey) == 0) { + *curKey = tmpKey; + } else { + code = TSDB_CODE_FAILED; + } + + return code; +} + +int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, + state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { + SSessionKey* pWinKey = key; + TSKEY gap = 0; + int32_t code = TSDB_CODE_SUCCESS; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + pWinStates = taosArrayInit(16, POINTER_BYTES); + tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + } + + TSKEY startTs = pWinKey->win.skey; + TSKEY endTs = pWinKey->win.ekey; + + int32_t size = taosArrayGetSize(pWinStates); + if (size == 0) { + (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + code = TSDB_CODE_FAILED; + goto _end; + } + + // find the first position which is smaller than the pWinKey + int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); + SRowBuffPos* pPos = NULL; + int32_t valSize = *pVLen; + + if (index >= 0) { + pPos = taosArrayGetP(pWinStates, index); + void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen); + if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) { + (*pVal) = pPos; + SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + *key = *pDestWinKey; + goto _end; + } + } + + if (index + 1 < size) { + pPos = taosArrayGetP(pWinStates, index + 1); + void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen); + if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) { + (*pVal) = pPos; + SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; + *key = *pDestWinKey; + goto _end; + } + } + + if (index + 1 == 0) { + if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) { + int32_t len = 0; + void* p = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, &len); + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + pNewPos->needFree = true; + + qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); + if (code == TSDB_CODE_SUCCESS) { + memcpy(pNewPos->pRowBuff, p, len); + } + taosMemoryFree(p); + (*pVal) = pNewPos; + goto _end; + } + } + + if (index == size - 1) { + (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); + code = TSDB_CODE_FAILED; + goto _end; + } + (*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1); + code = TSDB_CODE_FAILED; + +_end: + return code; +} diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 44c7b4f2e0..3d924ca73c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -395,9 +395,6 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear(SStreamState* pState) { #ifdef USE_ROCKSDB streamFileStateClear(pState->pFileState); - if (needClearDiskBuff(pState->pFileState)) { - streamStateClear_rocksdb(pState); - } return 0; #else SWinKey key = {.ts = 0, .groupId = 0}; @@ -466,20 +463,24 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* #endif } -int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { +int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) { // todo refactor qDebug("streamStateReleaseBuf"); if (!pVal) { return 0; } #ifdef USE_ROCKSDB - taosMemoryFree(pVal); + streamFileStateReleaseBuff(pState->pFileState, pVal, used); #else streamStateFreeVal(pVal); #endif return 0; } +int32_t streamStateClearBuff(SStreamState* pState, void* pVal) { + return streamFileStateClearBuff(pState->pFileState, pVal); +} + SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) { #ifdef USE_ROCKSDB return streamStateFillGetCur_rocksdb(pState, key); @@ -569,39 +570,6 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v #endif } -int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { -#ifdef USE_ROCKSDB - return streamStateGetFirst_rocksdb(pState, key); -#else - // todo refactor - SWinKey tmp = {.ts = 0, .groupId = 0}; - streamStatePut(pState, &tmp, NULL, 0); - SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp); - int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); - streamStateFreeCur(pCur); - streamStateDel(pState, &tmp); - return code; -#endif -} - -int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { -#ifdef USE_ROCKSDB - rocksdb_iter_seek_to_first(pCur->iter); - return 0; -#else - return tdbTbcMoveToFirst(pCur->pCur); -#endif -} - -int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { -#ifdef USE_ROCKSDB - rocksdb_iter_seek_to_last(pCur->iter); - return 0; -#else - return tdbTbcMoveToLast(pCur->pCur); -#endif -} - SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { #ifdef USE_ROCKSDB return streamStateSeekKeyNext_rocksdb(pState, key); @@ -693,7 +661,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { #ifdef USE_ROCKSDB - return streamStateCurNext_rocksdb(pState, pCur); + return sessionWinStateMoveToNext(pCur); #else if (!pCur) { return -1; @@ -714,7 +682,7 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { #endif } void streamStateFreeCur(SStreamStateCur* pCur) { - if (!pCur) { + if (!pCur || pCur->buffIndex >= 0) { return; } qDebug("streamStateFreeCur"); @@ -734,11 +702,17 @@ void streamStateFreeVal(void* val) { #endif } -int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { +int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { #ifdef USE_ROCKSDB - qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey, - key->groupId); - return streamStateSessionPut_rocksdb(pState, key, value, vLen); + SRowBuffPos* pos = (SRowBuffPos*)value; + if (pos->needFree) { + int32_t code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); + streamStateReleaseBuf(pState, pos, true); + qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, + key->win.ekey, key->groupId, code); + return code; + } + return TSDB_CODE_SUCCESS; #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, @@ -748,6 +722,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB + ASSERT(0); return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen); #else @@ -773,7 +748,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey, key->groupId); - return streamStateSessionDel_rocksdb(pState, key); + return deleteRowBuff(pState->pFileState, key, sizeof(SSessionKey)); #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn); @@ -782,7 +757,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB - return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); + return sessionWinStateSeekKeyCurrentPrev(pState->pFileState, key); #else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -813,7 +788,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB - return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key); + return sessionWinStateSeekKeyCurrentNext(pState->pFileState, key); #else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -845,7 +820,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) { #ifdef USE_ROCKSDB - return streamStateSessionSeekKeyNext_rocksdb(pState, key); + return sessionWinStateSeekKeyNext(pState->pFileState, key); #else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -876,7 +851,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + return sessionWinStateGetKVByCur(pCur, pKey, pVal, pVLen); #else if (!pCur) { return -1; @@ -899,6 +874,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v int32_t streamStateSessionClear(SStreamState* pState) { #ifdef USE_ROCKSDB + sessionWinStateClear(pState->pFileState); return streamStateSessionClear_rocksdb(pState); #else SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; @@ -923,7 +899,7 @@ int32_t streamStateSessionClear(SStreamState* pState) { int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { #ifdef USE_ROCKSDB - return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey); + return sessionWinStateGetKeyByRange(pState->pFileState, key, curKey); #else SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -976,7 +952,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen); + return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen); #else // todo refactor int32_t res = 0; @@ -1032,7 +1008,7 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch // todo refactor #ifdef USE_ROCKSDB - return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen); + return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen); #else int32_t res = 0; SSessionKey tmpKey = *key; @@ -1143,6 +1119,12 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } +SStreamStateCur* createStreamStateCursor() { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + pCur->buffIndex = -1; + return pCur; +} + #if 0 char* streamStateSessionDump(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index be3ad73472..b1318896e0 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -16,7 +16,6 @@ #include "tstreamFileState.h" #include "query.h" -#include "storageapi.h" #include "streamBackendRocksdb.h" #include "taos.h" #include "tcommon.h" @@ -29,29 +28,64 @@ #define MIN_NUM_OF_ROW_BUFF 10240 struct SStreamFileState { - SList* usedBuffs; - SList* freeBuffs; - SSHashObj* rowBuffMap; - void* pFileStore; - int32_t rowSize; - int32_t selectivityRowSize; - int32_t keyLen; - uint64_t preCheckPointVersion; - uint64_t checkPointVersion; - TSKEY maxTs; - TSKEY deleteMark; - TSKEY flushMark; - uint64_t maxRowCount; - uint64_t curRowCount; - GetTsFun getTs; - char* id; + SList* usedBuffs; + SList* freeBuffs; + void* rowStateBuff; + void* pFileStore; + int32_t rowSize; + int32_t selectivityRowSize; + int32_t keyLen; + uint64_t preCheckPointVersion; + uint64_t checkPointVersion; + TSKEY maxTs; + TSKEY deleteMark; + TSKEY flushMark; + uint64_t maxRowCount; + uint64_t curRowCount; + GetTsFun getTs; + char* id; + + _state_buff_cleanup_fn stateBuffCleanupFn; + _state_buff_remove_fn stateBuffRemoveFn; + + _state_file_remove_fn stateFileRemoveFn; + _state_file_get_fn stateFileGetFn; + _state_file_clear_fn stateFileClearFn; }; typedef SRowBuffPos SRowBuffInfo; +int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { + return tSimpleHashRemove(pBuff, pKey, keyLen); +} + +void stateHashBuffClearFn(void* pBuff) { + tSimpleHashClear(pBuff); +} + +void stateHashBuffCleanupFn(void* pBuff) { + tSimpleHashCleanup(pBuff); +} + +int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { + return streamStateDel_rocksdb(pFileState->pFileStore, pKey); +} + +int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) { + return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen); +} + +int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { + return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey); +} + +int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) { + return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen); +} + SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, - int64_t checkpointId) { + int64_t checkpointId, int8_t type) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -69,8 +103,25 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->freeBuffs = tdListNew(POINTER_BYTES); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); - pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn); - if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { + if (type == STREAM_STATE_BUFF_HASH) { + pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); + pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; + pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; + + pFileState->stateFileRemoveFn = intervalFileRemoveFn; + pFileState->stateFileGetFn = intervalFileGetFn; + pFileState->stateFileClearFn = streamStateClear_rocksdb; + } else { + pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); + pFileState->stateBuffCleanupFn = sessionWinStateCleanup; + pFileState->stateBuffRemoveFn = deleteSessionWinStateBuff; + + pFileState->stateFileRemoveFn = sessionFileRemoveFn; + pFileState->stateFileGetFn = sessionFileGetFn; + pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; + } + + if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { goto _error; } @@ -134,10 +185,17 @@ void streamFileStateDestroy(SStreamFileState* pFileState) { taosMemoryFree(pFileState->id); tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr); tdListFreeP(pFileState->freeBuffs, destroyRowBuff); - tSimpleHashCleanup(pFileState->rowBuffMap); + pFileState->stateBuffCleanupFn(pFileState->rowStateBuff); taosMemoryFree(pFileState); } +void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { + if (pPos->pRowBuff) { + tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); + pPos->pRowBuff = NULL; + } +} + void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); @@ -146,11 +204,10 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { while ((pNode = tdListNext(&iter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) { - ASSERT(pPos->pRowBuff != NULL); - tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); - pPos->pRowBuff = NULL; + putFreeBuff(pFileState, pPos); + if (!all) { - tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); } destroyRowBuffPos(pPos); tdListPopNode(pFileState->usedBuffs, pNode); @@ -162,12 +219,19 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { void streamFileStateClear(SStreamFileState* pFileState) { pFileState->flushMark = INT64_MIN; pFileState->maxTs = INT64_MIN; - tSimpleHashClear(pFileState->rowBuffMap); + tSimpleHashClear(pFileState->rowStateBuff); clearExpiredRowBuff(pFileState, 0, true); } bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; } +void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { + if (pPos->needFree) { + putFreeBuff(pFileState, pPos); + } + pPos->beUsed = used; +} + void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; SListIter iter = {0}; @@ -179,10 +243,12 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin if (pPos->beUsed == used) { tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); - tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen); + pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); tdListPopNode(pFileState->usedBuffs, pNode); taosMemoryFreeClear(pNode); - i++; + if (pPos->pRowBuff) { + i++; + } } } @@ -210,9 +276,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { SListNode* pNode = NULL; while ((pNode = tdListNext(&fIter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; - ASSERT(pPos->pRowBuff != NULL); - tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff); - pPos->pRowBuff = NULL; + putFreeBuff(pFileState, pPos); } tdListFreeP(pFlushList, destroyRowBuffPosPtr); @@ -227,7 +291,9 @@ int32_t clearRowBuff(SStreamFileState* pFileState) { return TSDB_CODE_SUCCESS; } -void* getFreeBuff(SList* lists, int32_t buffSize) { +void* getFreeBuff(SStreamFileState* pFileState) { + SList* lists = pFileState->freeBuffs; + int32_t buffSize = pFileState->rowSize; SListNode* pNode = tdListPopHead(lists); if (!pNode) { return NULL; @@ -238,10 +304,18 @@ void* getFreeBuff(SList* lists, int32_t buffSize) { return ptr; } +int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { + if (pPos->pRowBuff) { + memset(pPos->pRowBuff, 0, pFileState->rowSize); + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_FAILED; +} + SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen); - void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + void* pBuff = getFreeBuff(pFileState); if (pBuff) { pPos->pRowBuff = pBuff; goto _end; @@ -258,7 +332,7 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { int32_t code = clearRowBuff(pFileState); ASSERT(code == 0); - pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + pPos->pRowBuff = getFreeBuff(pFileState); _end: tdListAppend(pFileState->usedBuffs, &pPos); @@ -266,9 +340,17 @@ _end: return pPos; } +SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { + SRowBuffPos* newPos = getNewRowPos(pFileState); + newPos->beUsed = true; + newPos->beFlushed = false; + newPos->needFree = false; + return newPos; +} + int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey)); - SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen); + SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen); if (pos) { *pVLen = pFileState->rowSize; *pVal = *pos; @@ -276,14 +358,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi (*pos)->beFlushed = false; return TSDB_CODE_SUCCESS; } - SRowBuffPos* pNewPos = getNewRowPos(pFileState); - pNewPos->beUsed = true; - pNewPos->beFlushed = false; + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, keyLen); TSKEY ts = pFileState->getTs(pKey); - if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) { + if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts)) { int32_t len = 0; void* p = NULL; int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len); @@ -294,7 +374,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi taosMemoryFree(p); } - tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES); + tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES); if (pVal) { *pVLen = pFileState->rowSize; *pVal = pNewPos; @@ -303,9 +383,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi } int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { - int32_t code_buff = tSimpleHashRemove(pFileState->rowBuffMap, pKey, keyLen); - int32_t code_rocks = streamStateDel_rocksdb(pFileState->pFileStore, pKey); - return code_buff == TSDB_CODE_SUCCESS ? code_buff : code_rocks; + int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); + int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); + if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_FAILED; } int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) { @@ -314,17 +397,17 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** return TSDB_CODE_SUCCESS; } - pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + pPos->pRowBuff = getFreeBuff(pFileState); if (!pPos->pRowBuff) { int32_t code = clearRowBuff(pFileState); ASSERT(code == 0); - pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize); + pPos->pRowBuff = getFreeBuff(pFileState); ASSERT(pPos->pRowBuff); } int32_t len = 0; void* pBuff = NULL; - streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len); + pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len); memcpy(pPos->pRowBuff, pBuff, len); taosMemoryFree(pBuff); (*pVal) = pPos->pRowBuff; @@ -333,7 +416,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** } bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { - SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen); + SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen); if (pos) { return true; } @@ -349,13 +432,13 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { return pFileState->usedBuffs; } -void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); } +void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); } -void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) { +void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); (*pVal) = taosMemoryCalloc(1, *pLen); void* buff = *pVal; - taosEncodeFixedI64(&buff, *key); + taosEncodeFixedI64(&buff, *pKey); } int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { @@ -487,6 +570,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return code; } +//todo(liuyao) session需要支持recover,需要修改下面代码,下面只是interval的。 int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; if (pFileState->maxTs != INT64_MIN) { @@ -508,7 +592,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } void* pVal = NULL; int32_t pVLen = 0; - SRowBuffPos* pNewPos = getNewRowPos(pFileState); + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen); if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); @@ -521,7 +605,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { memcpy(pNewPos->pRowBuff, pVal, pVLen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; - code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); + code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos); break; @@ -539,3 +623,23 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { pFileState->flushMark = TMAX(pFileState->flushMark, ts); pFileState->maxTs = TMAX(pFileState->maxTs, ts); } + +void* getRowStateBuff(SStreamFileState* pFileState) { + return pFileState->rowStateBuff; +} + +void* getStateFileStore(SStreamFileState* pFileState) { + return pFileState->pFileStore; +} + +bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { + return ts < (pFileState->maxTs - pFileState->deleteMark); +} + +bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) { + return ts < pFileState->flushMark; +} + +int32_t getRowStateRowSize(SStreamFileState* pFileState) { + return pFileState->rowSize; +} diff --git a/tests/script/tsim/stream/session1.sim b/tests/script/tsim/stream/session1.sim index 3be604a828..cf42159d84 100644 --- a/tests/script/tsim/stream/session1.sim +++ b/tests/script/tsim/stream/session1.sim @@ -132,85 +132,85 @@ sql select * from streamt order by s desc; # row 0 if $data01 != 2 then - print ======$data01 + print =====data01=$data01 goto loop2 endi if $data02 != 29 then - print ======$data02 + print =====data02=$data02 goto loop2 endi if $data03 != 7 then - print ======$data03 + print =====data03=$data03 goto loop2 endi if $data04 != 22 then - print ======$data04 + print =====data04=$data04 goto loop2 endi # row 1 if $data11 != 3 then - print ======$data11 + print =====data11=$data11 goto loop2 endi if $data12 != 33 then - print ======$data12 + print =====data12=$data12 goto loop2 endi if $data13 != 8 then - print ======$data13 + print =====data13=$data13 goto loop2 endi if $data14 != 21 then - print ======$data14 + print =====data14=$data14 goto loop2 endi # row 2 if $data21 != 4 then - print ======$data21 + print =====data21=$data21 goto loop2 endi if $data22 != 25 then - print ======$data22 + print =====data22=$data22 goto loop2 endi if $data23 != 2 then - print ======$data23 + print =====data23=$data23 goto loop2 endi if $data24 != 20 then - print ======$data24 + print =====data24=$data24 goto loop2 endi # row 3 if $data31 != 10 then - print ======$data31 + print =====data31=$data31 goto loop2 endi if $data32 != 54 then - print ======$data32 + print =====data32=$data32 goto loop2 endi if $data33 != 1 then - print ======$data33 + print =====data33=$data33 goto loop2 endi if $data34 != 19 then - print ======$data34 + print =====data34=$data34 goto loop2 endi