session&state operator buff

This commit is contained in:
liuyao 2023-09-20 15:06:08 +08:00
parent 7f406be8bb
commit 1cebda368a
14 changed files with 954 additions and 411 deletions

View File

@ -38,6 +38,9 @@ extern "C" {
#define META_READER_NOLOCK 0x1
#define STREAM_STATE_BUFF_HASH 1
#define STREAM_STATE_BUFF_SORT 2
typedef struct SMeta SMeta;
typedef TSKEY (*GetTsFun)(void*);
@ -115,6 +118,7 @@ typedef struct SRowBuffPos {
void* pKey;
bool beFlushed;
bool beUsed;
bool needFree;
} SRowBuffPos;
// tq
@ -332,6 +336,8 @@ typedef struct {
void* db; // rocksdb_t* db;
void* pCur;
int64_t number;
void* pStreamFileState;
int32_t buffIndex;
} SStreamStateCur;
typedef struct SStateStore {
@ -339,7 +345,8 @@ typedef struct SStateStore {
int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal);
int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used);
int32_t (*streamStateClearBuff)(SStreamState* pState, void* pVal);
void (*streamStateFreeVal)(void* val);
int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
@ -370,7 +377,7 @@ typedef struct SStateStore {
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
int32_t (*streamStateSessionClear)(SStreamState* pState);
@ -399,7 +406,7 @@ typedef struct SStateStore {
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark,
const char* id, int64_t ckId);
const char* id, int64_t ckId, int8_t type);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState);

View File

@ -49,26 +49,30 @@ void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
//session window
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionClear(SStreamState* pState);
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
//state window
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used);
int32_t streamStateClearBuff(SStreamState* pState, void* pVal);
void streamStateFreeVal(void* val);
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
@ -80,10 +84,6 @@ void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
@ -91,6 +91,7 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char*
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
SStreamStateCur* createStreamStateCursor();
/***compare func **/

View File

@ -28,14 +28,25 @@ extern "C" {
#endif
typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot;
typedef SList SStreamSnapshot;
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey);
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId);
int64_t checkpointId, int8_t type);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used);
int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
@ -52,6 +63,31 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
void* getRowStateBuff(SStreamFileState* pFileState);
void* getStateFileStore(SStreamFileState* pFileState);
bool isDeteled(SStreamFileState* pFileState, TSKEY ts);
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts);
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState);
int32_t getRowStateRowSize(SStreamFileState* pFileState);
// session window
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen);
void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff);
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey);
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey);
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey);
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur);
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey);
// state window
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
#ifdef __cplusplus
}
#endif

View File

@ -29,13 +29,12 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamFileStateInit = streamFileStateInit;
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStatePutParName = streamStatePutParName;
pStore->streamStateGetParName = streamStateGetParName;
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
pStore->streamStateReleaseBuf = streamStateReleaseBuf;
pStore->streamStateClearBuff = streamStateClearBuff;
pStore->streamStateFreeVal = streamStateFreeVal;
pStore->streamStatePut = streamStatePut;
@ -91,8 +90,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
pStore->streamFileStateInit = streamFileStateInit;
pStore->streamFileStateDestroy = streamFileStateDestroy;
pStore->streamFileStateClear = streamFileStateClear;
pStore->needClearDiskBuff = needClearDiskBuff;

View File

@ -137,13 +137,12 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamFileStateInit = streamFileStateInit;
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStatePutParName = streamStatePutParName;
pStore->streamStateGetParName = streamStateGetParName;
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
pStore->streamStateReleaseBuf = streamStateReleaseBuf;
pStore->streamStateClearBuff = streamStateClearBuff;
pStore->streamStateFreeVal = streamStateFreeVal;
pStore->streamStatePut = streamStatePut;
@ -199,8 +198,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
pStore->streamFileStateInit = streamFileStateInit;
pStore->streamFileStateDestroy = streamFileStateDestroy;
pStore->streamFileStateClear = streamFileStateClear;
pStore->needClearDiskBuff = needClearDiskBuff;

View File

@ -548,9 +548,9 @@ typedef struct SWindowRowsSup {
} SWindowRowsSup;
typedef struct SResultWindowInfo {
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
SRowBuffPos* pStatePos;
SSessionKey sessionWin;
bool isOutput;
} SResultWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
@ -579,6 +579,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool isHistoryOp;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
bool clearState;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
@ -672,8 +673,6 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
@ -739,12 +738,6 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI);
int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData);

View File

@ -720,38 +720,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
return 0;
}
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pbInfo->pRes;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
false);
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) <
0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAPI->stateStore.streamStateFreeVal(tbname);
}
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -960,109 +928,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
return TSDB_CODE_SUCCESS;
}
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pKey, pResult);
return TSDB_CODE_SUCCESS;
}
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI) {
pAPI->streamStateSessionPut(pState, key, (const void*)buf, size);
releaseOutputBuf(pState, NULL, (SResultRow*)buf, pAPI);
return TSDB_CODE_SUCCESS;
}
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
int32_t size = 0;
void* pVal = NULL;
int32_t code = pAPI->stateStore.streamStateSessionGet(pState, pKey, &pVal, &size);
// ASSERT(code == 0);
if (code == -1) {
// for history
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
pKey->win.skey, pKey->win.ekey, pKey->groupId);
pGroupResInfo->index += 1;
continue;
}
SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
continue;
}
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
&tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAPI->stateStore.streamStateFreeVal(tbname);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) {
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
break;
}
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
break;
}
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
}
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
void streamOpReleaseState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {

View File

@ -1057,7 +1057,7 @@ void appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup
} else {
memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
}
pAPI->streamStateReleaseBuf(pState, NULL, pValue);
pAPI->streamStateFreeVal(pValue);
}
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {

View File

@ -152,8 +152,7 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
}
static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
return TSDB_CODE_SUCCESS;
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
}
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
@ -696,7 +695,6 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
@ -931,17 +929,6 @@ void* decodeSWinKey(void* buf, SWinKey* key) {
return buf;
}
int32_t encodeSRowBuffPos(void** buf, SRowBuffPos* pos) {
int32_t tlen = 0;
tlen += encodeSWinKey(buf, pos->pKey);
return tlen;
}
void* decodeSRowBuffPos(void* buf, SRowBuffPos* pos) {
buf = decodeSWinKey(buf, pos->pKey);
return buf;
}
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs);
@ -1155,6 +1142,18 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
return NULL;
}
static int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosArraySort(pUpdated, compar);
tSimpleHashCleanup(*ppWinUpdated);
*ppWinUpdated = NULL;
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -1311,15 +1310,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
taosArrayPush(pInfo->pUpdated, pIte);
}
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
@ -1480,7 +1471,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
pInfo->dataVersion = 0;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
@ -1607,10 +1598,11 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
pScanInfo->twAggSup = *pTwSup;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, SStorageAPI* pApi) {
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi) {
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap;
pSup->stateKeySize = keySize;
@ -1622,10 +1614,14 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
pSup->stateStore = *pStore;
initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput);
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pSup->pState) = *pState;
pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, compareTs, pSup->pState,
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pSup->pResultRows = tSimpleHashInit(32, hashFn);
@ -1648,7 +1644,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
}
pSup->pSessionAPI = pApi;
@ -1687,33 +1683,40 @@ bool inWinRange(STimeWindow* range, STimeWindow* cur) {
return false;
}
int32_t clearOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
return pAPI->streamStateClearBuff(pState, pPos);
}
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
SResultWindowInfo* pCurWin) {
pCurWin->sessionWin.groupId = groupId;
pCurWin->sessionWin.win.skey = startTs;
pCurWin->sessionWin.win.ekey = endTs;
int32_t size = pAggSup->resultRowSize;
int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin,
pAggSup->gap, &pCurWin->pOutputBuf, &size);
int32_t code = pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin,
pAggSup->gap, (void**)&pCurWin->pStatePos, &size);
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->pOutputBuf = taosMemoryCalloc(1, size);
clearOutputBuf(pAggSup->pState, pCurWin->pStatePos, &pAggSup->pSessionAPI->stateStore);
}
if (code == TSDB_CODE_SUCCESS) {
pCurWin->isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin);
if (pCurWin->pStatePos->needFree) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin);
}
} else {
pCurWin->sessionWin.win.skey = startTs;
pCurWin->sessionWin.win.ekey = endTs;
}
qDebug("===stream===set session window buff .start:%" PRId64 ",end:%" PRId64 ",groupid:%" PRIu64,
pCurWin->sessionWin.win.skey, pCurWin->sessionWin.win.ekey, pCurWin->sessionWin.groupId);
}
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
int32_t size = 0;
int32_t code =
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin,
(void**)&pWinInfo->pStatePos, &size);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1774,6 +1777,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
if (pEndTs) {
pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pEndTs[i]);
}
memcpy(pWinInfo->pStatePos->pKey, &pWinInfo->sessionWin, sizeof(SSessionKey));
}
return rows - start;
}
@ -1781,7 +1785,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey);
*pResult = (SResultRow*)pWinInfo->pOutputBuf;
*pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff;
// set time window for current result
(*pResult)->win = pWinInfo->sessionWin.win;
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
@ -1819,20 +1823,24 @@ static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo*
return TSDB_CODE_SUCCESS;
}
SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pPos, false);
return TSDB_CODE_SUCCESS;
}
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
SResultWindowInfo* pNextWin) {
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);
pNextWin->isOutput = true;
setSessionWinOutputInfo(pStUpdated, pNextWin);
int32_t size = 0;
pNextWin->sessionWin = pCurWin->sessionWin;
int32_t code =
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin,
(void**)&pNextWin->pStatePos, &size);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pNextWin->pOutputBuf);
SET_SESSION_WIN_INVALID(*pNextWin);
}
return pCur;
pAggSup->stateStore.streamStateFreeCur(pCur);
}
static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
@ -1850,16 +1858,16 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
// Just look for the window behind StartIndex
while (1) {
SResultWindowInfo winInfo = {0};
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
taosMemoryFree(winInfo.pOutputBuf);
pAPI->stateStore.streamStateFreeCur(pCur);
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
break;
}
SResultRow* pWinResult = NULL;
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey));
int64_t winDelta = 0;
if (addGap) {
winDelta = pAggSup->gap;
@ -1872,8 +1880,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
}
removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
pAPI->stateStore.streamStateFreeCur(pCur);
taosMemoryFree(winInfo.pOutputBuf);
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
winNum++;
}
return winNum;
@ -1890,25 +1897,21 @@ static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo
// Just look for the window behind StartIndex
while (1) {
SResultWindowInfo winInfo = {0};
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo);
getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
taosMemoryFree(winInfo.pOutputBuf);
pAPI->stateStore.streamStateFreeCur(pCur);
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
break;
}
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey));
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
pAPI->stateStore.streamStateFreeCur(pCur);
taosMemoryFree(winInfo.pOutputBuf);
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
}
}
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
&pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL;
return TSDB_CODE_SUCCESS;
return pAggSup->stateStore.streamStateSessionPut(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pStatePos, pAggSup->resultRowSize);
}
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
@ -1946,13 +1949,13 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, startTsCols[i], endTsCols[i], groupId, &winInfo);
// coverity scan error
if (!winInfo.pStatePos) {
continue;
}
setSessionWinOutputInfo(pStUpdated, &winInfo);
winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
pAggSup->pResultRows, pStUpdated, pStDeleted);
// coverity scan error
if (!winInfo.pOutputBuf) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
int64_t winDelta = 0;
if (addGap) {
@ -2023,17 +2026,6 @@ static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2)
return 0;
}
static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, NULL);
taosArrayPush(pUpdated, key);
}
taosArraySort(pUpdated, sessionKeyCompareAsc);
return TSDB_CODE_SUCCESS;
}
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI;
@ -2111,16 +2103,18 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore);
continue;
}
if (code == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) {
if (num == 0) {
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
parentWin.sessionWin = childWin.sessionWin;
memcpy(parentWin.pStatePos->pKey, &parentWin.sessionWin, sizeof(SSessionKey));
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore);
break;
}
}
@ -2131,9 +2125,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore);
} else {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, childWin.pStatePos, &pAggSup->stateStore);
break;
}
}
@ -2203,6 +2197,97 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->freeItem = false;
}
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SRowBuffPos* pPos = *(SRowBuffPos**) taosArrayGet(pGroupResInfo->pRows, i);
SResultRow* pRow = NULL;
int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
SSessionKey* pKey = (SSessionKey*) pPos->pKey;
if (code == -1) {
// for history
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
pKey->win.skey, pKey->win.ekey, pKey->groupId);
pGroupResInfo->index += 1;
continue;
}
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
continue;
}
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
&tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAPI->stateStore.streamStateFreeVal(tbname);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) {
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
break;
}
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
break;
}
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
}
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// set output datablock version
@ -2232,6 +2317,7 @@ static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@ -2283,7 +2369,7 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
buf = taosDecodeFixedBool(buf, &key->isOutput);
key->pOutputBuf = NULL;
key->pStatePos->pRowBuff = NULL;
buf = decodeSSessionKey(buf, &key->sessionWin);
return buf;
}
@ -2488,10 +2574,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated);
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
@ -2619,18 +2703,18 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error;
}
}
SExprSupp* pSup = &pOperator->exprSupp;
SExprSupp* pExpSup = &pOperator->exprSupp;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -2670,6 +2754,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->clearState = false;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
// for stream
void* buff = NULL;
@ -2727,11 +2812,18 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return opRes;
}
if (pInfo->clearState) {
clearFunctionContext(&pOperator->exprSupp);
// semi session operator clear disk buffer
clearStreamSessionOperator(pInfo);
}
if (pOperator->status == OP_RES_TO_RETURN) {
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
// semi session operator clear disk buffer
clearStreamSessionOperator(pInfo);
setOperatorCompleted(pOperator);
pInfo->clearState = false;
return NULL;
}
}
@ -2760,6 +2852,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
removeSessionResults(pInfo->pStUpdated, pWins);
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
taosArrayDestroy(pWins);
pInfo->clearState = true;
break;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated);
@ -2787,10 +2880,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated);
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
if(pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
@ -2806,7 +2897,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
// semi session operator clear disk buffer
clearStreamSessionOperator(pInfo);
setOperatorCompleted(pOperator);
return NULL;
@ -2836,7 +2927,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, NULL);
SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle);
if (pChildOp == NULL) {
goto _error;
}
@ -2924,9 +3015,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->winInfo.sessionWin.win.ekey = ts;
int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
pKeyData, pAggSup->stateKeySize, compareStateKey,
&pCurWin->winInfo.pOutputBuf, &size);
(void**)&pCurWin->winInfo.pStatePos, &size);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
(SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
@ -2934,11 +3025,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf,
&pAggSup->pSessionAPI->stateStore);
pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size);
clearOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
(SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
@ -2952,7 +3041,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS) {
pCurWin->winInfo.isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
if (pCurWin->winInfo.pStatePos->needFree) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
}
} else if (pKeyData) {
if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
varDataCopy(pCurWin->pStateKey->pData, pKeyData);
@ -2966,12 +3057,12 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
int32_t nextSize = pAggSup->resultRowSize;
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin,
&pNextWin->winInfo.pOutputBuf, &nextSize);
(void**)&pNextWin->winInfo.pStatePos, &nextSize);
if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
} else {
pNextWin->pStateKey =
(SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
(SStateKeys*)((char*)pNextWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pNextWin->pStateKey->type = pAggSup->stateKeyType;
pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys);
@ -3008,6 +3099,7 @@ int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNex
pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
}
pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
if (!isEqualStateKey(pWinInfo, pKeyData)) {
*allEqual = false;
}
@ -3057,7 +3149,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStateWindowInfo nextWin = {0};
setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
if (IS_VALID_SESSION_WIN(nextWin.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextWin.winInfo.pOutputBuf, &pAPI->stateStore);
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
}
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
@ -3068,7 +3160,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin);
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curWin.winInfo.pOutputBuf, &pAPI->stateStore);
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
continue;
}
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
@ -3109,7 +3201,7 @@ int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
void* key = tSimpleHashGetKey(pIte, &keyLen);
tlen += encodeSSessionKey(buf, key);
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
}
@ -3285,10 +3377,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
copyUpdateResult(pInfo->pSeUpdated, pInfo->pUpdated);
copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pSeUpdated);
pInfo->pSeUpdated = NULL;
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
@ -3333,6 +3423,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey);
memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey));
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
@ -3344,7 +3435,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
}
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
taosMemoryFree(pNextWin->pOutputBuf);
releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore);
}
void streamStateReloadState(SOperatorInfo* pOperator) {
@ -3394,8 +3485,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
}
} else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf,
&pAggSup->pSessionAPI->stateStore);
releaseOutputBuf(pAggSup->pState, nextInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
}
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
@ -3444,18 +3534,19 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
SExprSupp* pSup = &pOperator->exprSupp;
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
int16_t type = pColNode->node.resType.type;
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI);
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -3734,7 +3825,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);

View File

@ -2006,7 +2006,7 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2062,7 +2062,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
qDebug("seek to last:%s", tbuf);
}
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) return NULL;
pCur->number = pState->number;
@ -2089,7 +2089,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
qDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb;
@ -2178,7 +2178,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2218,7 +2218,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2256,7 +2256,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2357,7 +2357,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
if (pCur == NULL) return NULL;
@ -2418,7 +2418,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (!pCur) {
return NULL;
}
@ -2456,7 +2456,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2494,7 +2494,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return -1;
}
@ -2764,7 +2764,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
pCur->db = wrapper->rocksdb;

View File

@ -0,0 +1,470 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstreamFileState.h"
#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
#include "thash.h"
#include "tsimplehash.h"
typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos);
int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey;
return sessionWinKeyCmpr(pWin1, pWin2);
}
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
int firstPos = 0, lastPos = num - 1, midPos = -1;
int numOfRows = 0;
if (num <= 0) return -1;
// find the first position which is smaller or equal than the key.
// if all data is bigger than the key return -1
while (1) {
if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;
if (cmpFn(key, keyList, firstPos) == 0) return firstPos;
if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (cmpFn(key, keyList, midPos) < 0) {
lastPos = midPos - 1;
} else if (cmpFn(key, keyList, midPos) > 0) {
firstPos = midPos + 1;
} else {
break;
}
}
return midPos;
}
int64_t getSessionWindowEndkey(void* data, int32_t index) {
SArray* pWinInfos = (SArray*)data;
SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
return pWin->win.ekey;
}
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
return true;
}
return false;
}
static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
taosArrayPush(pWinInfos, &pNewPos);
return pNewPos;
}
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey, int32_t index) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
taosArrayInsert(pWinInfos, index, &pNewPos);
return pNewPos;
}
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = key;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
TSKEY startTs = pWinKey->win.skey;
TSKEY endTs = pWinKey->win.ekey;
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
goto _end;
}
// find the first position which is smaller than the pWinKey
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
SRowBuffPos* pPos = NULL;
if (index >= 0) {
pPos = taosArrayGetP(pWinStates, index);
if (inSessionWindow(pPos->pKey, startTs, gap)) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
*key = *pDestWinKey;
goto _end;
}
}
if (index + 1 < size) {
pPos = taosArrayGetP(pWinStates, index + 1);
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
*key = *pDestWinKey;
goto _end;
}
}
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
int32_t len = 0;
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pWinKey, gap, &p, &len);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, len);
}
taosMemoryFree(p);
(*pVal) = pNewPos;
goto _end;
}
}
if (index == size - 1) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
goto _end;
}
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1);
_end:
return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) {
SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
SSessionKey* pWinKey = (SSessionKey*) key;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (index >= 0) {
SRowBuffPos** ppos = taosArrayGet(pWinStates, index);
if (inSessionWindow((*ppos)->pKey, pWinKey->win.skey, gap)) {
taosArrayRemove(pWinStates, index);
}
}
return TSDB_CODE_SUCCESS;
}
void sessionWinStateClear(SStreamFileState* pFileState) {
int32_t buffSize = getRowStateRowSize(pFileState);
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
void* pBuff = getRowStateBuff(pFileState);
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = *((void**)pIte);
int32_t size = taosArrayGetSize(pWinStates);
for (int32_t i = 0; i < size; i++) {
SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
memset(pPos->pRowBuff, 0, buffSize);
}
}
}
void sessionWinStateCleanup(void* pBuff) {
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = (SArray*) pIte;
taosArrayDestroy(pWinStates);
}
tSimpleHashCleanup(pBuff);
}
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
SArray** pWins, int32_t* pIndex) {
SStreamStateCur* pCur = NULL;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return NULL;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (pWins) {
(*pWins) = pWinStates;
}
if (index >= 0) {
pCur = createStreamStateCursor();
pCur->buffIndex = index;
pCur->pStreamFileState = pFileState;
if (pIndex) {
*pIndex = index;
}
}
return pCur;
}
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
if (pCur) {
return pCur;
}
void* pFileStore = getStateFileStore(pFileState);
pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey);
if (!pCur) {
return NULL;
}
pCur->buffIndex = -1;
pCur->pStreamFileState = pFileState;
return pCur;
}
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur** ppCur) {
SStreamStateCur* pCur = *ppCur;
streamStateFreeCur(pCur);
pCur = createStreamStateCursor();
(*ppCur) = pCur;
pCur->buffIndex = 0;
pCur->pStreamFileState = pFileState;
}
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) {
SSessionKey key = {.groupId = groupId};
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0) {
transformCursor(pFileState, ppCur);
} else {
SStreamStateCur* pCur = *ppCur;
pCur->buffIndex = -1;
pCur->pStreamFileState = pFileState;
}
}
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
SArray* pWinStates = NULL;
int32_t index = -1;
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
if (pCur) {
if (sessionStateKeyCompare(pWinKey, pWinStates, index) > 0) {
sessionWinStateMoveToNext(pCur);
}
return pCur;
}
void* pFileStore = getStateFileStore(pFileState);
pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
return pCur;
}
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
SArray* pWinStates = NULL;
int32_t index = -1;
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
if (pCur) {
sessionWinStateMoveToNext(pCur);
return pCur;
}
void* pFileStore = getStateFileStore(pFileState);
pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
return pCur;
}
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
if (!pCur) {
return TSDB_CODE_FAILED;
}
SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_FAILED;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
if (pCur->buffIndex >= 0) {
if (pCur->buffIndex >= size) {
return TSDB_CODE_FAILED;
}
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) {
*pVal = pPos;
}
*pKey = *(SSessionKey*)(pPos->pKey);
} else {
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0) {
transformCursor(pCur->pStreamFileState, &pCur);
if (pCur->buffIndex >= size) {
return TSDB_CODE_FAILED;
}
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) {
*pVal = pPos;
}
*pKey = *(SSessionKey*)(pPos->pKey);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) {
if (pCur && pCur->buffIndex >= 0) {
pCur->buffIndex++;
} else {
streamStateCurNext_rocksdb(NULL, pCur);
}
return TSDB_CODE_SUCCESS;
}
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey) {
SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
SSessionKey tmpKey = *key;
int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
bool hasCurrentPrev = true;
if (code == TSDB_CODE_FAILED) {
pCur = sessionWinStateSeekKeyNext(pFileState, key);
code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
hasCurrentPrev = false;
}
if (code == TSDB_CODE_FAILED) {
return TSDB_CODE_FAILED;
}
if (sessionRangeKeyCmpr(key, &tmpKey) == 0) {
*curKey = tmpKey;
return code;
} else if (!hasCurrentPrev) {
return TSDB_CODE_FAILED;
}
sessionWinStateMoveToNext(pCur);
code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
if (code == TSDB_CODE_SUCCESS && sessionRangeKeyCmpr(key, &tmpKey) == 0) {
*curKey = tmpKey;
} else {
code = TSDB_CODE_FAILED;
}
return code;
}
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = key;
TSKEY gap = 0;
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
TSKEY startTs = pWinKey->win.skey;
TSKEY endTs = pWinKey->win.ekey;
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
goto _end;
}
// find the first position which is smaller than the pWinKey
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
SRowBuffPos* pPos = NULL;
int32_t valSize = *pVLen;
if (index >= 0) {
pPos = taosArrayGetP(pWinStates, index);
void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
*key = *pDestWinKey;
goto _end;
}
}
if (index + 1 < size) {
pPos = taosArrayGetP(pWinStates, index + 1);
void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
*key = *pDestWinKey;
goto _end;
}
}
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
int32_t len = 0;
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, &len);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
pNewPos->needFree = true;
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, len);
}
taosMemoryFree(p);
(*pVal) = pNewPos;
goto _end;
}
}
if (index == size - 1) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
goto _end;
}
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1);
code = TSDB_CODE_FAILED;
_end:
return code;
}

View File

@ -395,9 +395,6 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
streamFileStateClear(pState->pFileState);
if (needClearDiskBuff(pState->pFileState)) {
streamStateClear_rocksdb(pState);
}
return 0;
#else
SWinKey key = {.ts = 0, .groupId = 0};
@ -466,20 +463,24 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
#endif
}
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
// todo refactor
qDebug("streamStateReleaseBuf");
if (!pVal) {
return 0;
}
#ifdef USE_ROCKSDB
taosMemoryFree(pVal);
streamFileStateReleaseBuff(pState->pFileState, pVal, used);
#else
streamStateFreeVal(pVal);
#endif
return 0;
}
int32_t streamStateClearBuff(SStreamState* pState, void* pVal) {
return streamFileStateClearBuff(pState->pFileState, pVal);
}
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillGetCur_rocksdb(pState, key);
@ -569,39 +570,6 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v
#endif
}
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGetFirst_rocksdb(pState, key);
#else
// todo refactor
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel(pState, &tmp);
return code;
#endif
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
rocksdb_iter_seek_to_first(pCur->iter);
return 0;
#else
return tdbTbcMoveToFirst(pCur->pCur);
#endif
}
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
rocksdb_iter_seek_to_last(pCur->iter);
return 0;
#else
return tdbTbcMoveToLast(pCur->pCur);
#endif
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateSeekKeyNext_rocksdb(pState, key);
@ -693,7 +661,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
return streamStateCurNext_rocksdb(pState, pCur);
return sessionWinStateMoveToNext(pCur);
#else
if (!pCur) {
return -1;
@ -714,7 +682,7 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#endif
}
void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) {
if (!pCur || pCur->buffIndex >= 0) {
return;
}
qDebug("streamStateFreeCur");
@ -734,11 +702,17 @@ void streamStateFreeVal(void* val) {
#endif
}
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionPut_rocksdb(pState, key, value, vLen);
SRowBuffPos* pos = (SRowBuffPos*)value;
if (pos->needFree) {
int32_t code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
streamStateReleaseBuf(pState, pos, true);
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
return code;
}
return TSDB_CODE_SUCCESS;
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
@ -748,6 +722,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
ASSERT(0);
return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else
@ -773,7 +748,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionDel_rocksdb(pState, key);
return deleteRowBuff(pState->pFileState, key, sizeof(SSessionKey));
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
@ -782,7 +757,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
return sessionWinStateSeekKeyCurrentPrev(pState->pFileState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -813,7 +788,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
return sessionWinStateSeekKeyCurrentNext(pState->pFileState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -845,7 +820,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyNext_rocksdb(pState, key);
return sessionWinStateSeekKeyNext(pState->pFileState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -876,7 +851,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
return sessionWinStateGetKVByCur(pCur, pKey, pVal, pVLen);
#else
if (!pCur) {
return -1;
@ -899,6 +874,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
sessionWinStateClear(pState->pFileState);
return streamStateSessionClear_rocksdb(pState);
#else
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
@ -923,7 +899,7 @@ int32_t streamStateSessionClear(SStreamState* pState) {
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
return sessionWinStateGetKeyByRange(pState->pFileState, key, curKey);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -976,7 +952,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen);
#else
// todo refactor
int32_t res = 0;
@ -1032,7 +1008,7 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
// todo refactor
#ifdef USE_ROCKSDB
return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
int32_t res = 0;
SSessionKey tmpKey = *key;
@ -1143,6 +1119,12 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
SStreamStateCur* createStreamStateCursor() {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->buffIndex = -1;
return pCur;
}
#if 0
char* streamStateSessionDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));

View File

@ -16,7 +16,6 @@
#include "tstreamFileState.h"
#include "query.h"
#include "storageapi.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
@ -29,29 +28,64 @@
#define MIN_NUM_OF_ROW_BUFF 10240
struct SStreamFileState {
SList* usedBuffs;
SList* freeBuffs;
SSHashObj* rowBuffMap;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
GetTsFun getTs;
char* id;
SList* usedBuffs;
SList* freeBuffs;
void* rowStateBuff;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
GetTsFun getTs;
char* id;
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
_state_file_remove_fn stateFileRemoveFn;
_state_file_get_fn stateFileGetFn;
_state_file_clear_fn stateFileClearFn;
};
typedef SRowBuffPos SRowBuffInfo;
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
return tSimpleHashRemove(pBuff, pKey, keyLen);
}
void stateHashBuffClearFn(void* pBuff) {
tSimpleHashClear(pBuff);
}
void stateHashBuffCleanupFn(void* pBuff) {
tSimpleHashCleanup(pBuff);
}
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
}
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) {
return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
}
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) {
return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId) {
int64_t checkpointId, int8_t type) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
}
@ -69,8 +103,25 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn);
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
if (type == STREAM_STATE_BUFF_HASH) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateFileRemoveFn = intervalFileRemoveFn;
pFileState->stateFileGetFn = intervalFileGetFn;
pFileState->stateFileClearFn = streamStateClear_rocksdb;
} else {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
pFileState->stateBuffRemoveFn = deleteSessionWinStateBuff;
pFileState->stateFileRemoveFn = sessionFileRemoveFn;
pFileState->stateFileGetFn = sessionFileGetFn;
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
}
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
goto _error;
}
@ -134,10 +185,17 @@ void streamFileStateDestroy(SStreamFileState* pFileState) {
taosMemoryFree(pFileState->id);
tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
tSimpleHashCleanup(pFileState->rowBuffMap);
pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
taosMemoryFree(pFileState);
}
void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
if (pPos->pRowBuff) {
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
}
}
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
@ -146,11 +204,10 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
putFreeBuff(pFileState, pPos);
if (!all) {
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen);
}
destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
@ -162,12 +219,19 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
void streamFileStateClear(SStreamFileState* pFileState) {
pFileState->flushMark = INT64_MIN;
pFileState->maxTs = INT64_MIN;
tSimpleHashClear(pFileState->rowBuffMap);
tSimpleHashClear(pFileState->rowStateBuff);
clearExpiredRowBuff(pFileState, 0, true);
}
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
if (pPos->needFree) {
putFreeBuff(pFileState, pPos);
}
pPos->beUsed = used;
}
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
SListIter iter = {0};
@ -179,10 +243,12 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
if (pPos->beUsed == used) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
i++;
if (pPos->pRowBuff) {
i++;
}
}
}
@ -210,9 +276,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&fIter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
pPos->pRowBuff = NULL;
putFreeBuff(pFileState, pPos);
}
tdListFreeP(pFlushList, destroyRowBuffPosPtr);
@ -227,7 +291,9 @@ int32_t clearRowBuff(SStreamFileState* pFileState) {
return TSDB_CODE_SUCCESS;
}
void* getFreeBuff(SList* lists, int32_t buffSize) {
void* getFreeBuff(SStreamFileState* pFileState) {
SList* lists = pFileState->freeBuffs;
int32_t buffSize = pFileState->rowSize;
SListNode* pNode = tdListPopHead(lists);
if (!pNode) {
return NULL;
@ -238,10 +304,18 @@ void* getFreeBuff(SList* lists, int32_t buffSize) {
return ptr;
}
int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
if (pPos->pRowBuff) {
memset(pPos->pRowBuff, 0, pFileState->rowSize);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
void* pBuff = getFreeBuff(pFileState);
if (pBuff) {
pPos->pRowBuff = pBuff;
goto _end;
@ -258,7 +332,7 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
pPos->pRowBuff = getFreeBuff(pFileState);
_end:
tdListAppend(pFileState->usedBuffs, &pPos);
@ -266,9 +340,17 @@ _end:
return pPos;
}
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
SRowBuffPos* newPos = getNewRowPos(pFileState);
newPos->beUsed = true;
newPos->beFlushed = false;
newPos->needFree = false;
return newPos;
}
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) {
*pVLen = pFileState->rowSize;
*pVal = *pos;
@ -276,14 +358,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
(*pos)->beFlushed = false;
return TSDB_CODE_SUCCESS;
}
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
pNewPos->beUsed = true;
pNewPos->beFlushed = false;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, keyLen);
TSKEY ts = pFileState->getTs(pKey);
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts)) {
int32_t len = 0;
void* p = NULL;
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
@ -294,7 +374,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
taosMemoryFree(p);
}
tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES);
tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
if (pVal) {
*pVLen = pFileState->rowSize;
*pVal = pNewPos;
@ -303,9 +383,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
}
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = tSimpleHashRemove(pFileState->rowBuffMap, pKey, keyLen);
int32_t code_rocks = streamStateDel_rocksdb(pFileState->pFileStore, pKey);
return code_buff == TSDB_CODE_SUCCESS ? code_buff : code_rocks;
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
@ -314,17 +397,17 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
return TSDB_CODE_SUCCESS;
}
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
pPos->pRowBuff = getFreeBuff(pFileState);
if (!pPos->pRowBuff) {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
pPos->pRowBuff = getFreeBuff(pFileState);
ASSERT(pPos->pRowBuff);
}
int32_t len = 0;
void* pBuff = NULL;
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len);
pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
(*pVal) = pPos->pRowBuff;
@ -333,7 +416,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
}
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) {
return true;
}
@ -349,13 +432,13 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs;
}
void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); }
void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) {
void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
taosEncodeFixedI64(&buff, *key);
taosEncodeFixedI64(&buff, *pKey);
}
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
@ -487,6 +570,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code;
}
//todo(liuyao) session需要支持recover需要修改下面代码下面只是interval的。
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) {
@ -508,7 +592,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
}
void* pVal = NULL;
int32_t pVLen = 0;
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
@ -521,7 +605,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
memcpy(pNewPos->pRowBuff, pVal, pVLen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos);
break;
@ -539,3 +623,23 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
pFileState->maxTs = TMAX(pFileState->maxTs, ts);
}
void* getRowStateBuff(SStreamFileState* pFileState) {
return pFileState->rowStateBuff;
}
void* getStateFileStore(SStreamFileState* pFileState) {
return pFileState->pFileStore;
}
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) {
return ts < pFileState->flushMark;
}
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
return pFileState->rowSize;
}

View File

@ -132,85 +132,85 @@ sql select * from streamt order by s desc;
# row 0
if $data01 != 2 then
print ======$data01
print =====data01=$data01
goto loop2
endi
if $data02 != 29 then
print ======$data02
print =====data02=$data02
goto loop2
endi
if $data03 != 7 then
print ======$data03
print =====data03=$data03
goto loop2
endi
if $data04 != 22 then
print ======$data04
print =====data04=$data04
goto loop2
endi
# row 1
if $data11 != 3 then
print ======$data11
print =====data11=$data11
goto loop2
endi
if $data12 != 33 then
print ======$data12
print =====data12=$data12
goto loop2
endi
if $data13 != 8 then
print ======$data13
print =====data13=$data13
goto loop2
endi
if $data14 != 21 then
print ======$data14
print =====data14=$data14
goto loop2
endi
# row 2
if $data21 != 4 then
print ======$data21
print =====data21=$data21
goto loop2
endi
if $data22 != 25 then
print ======$data22
print =====data22=$data22
goto loop2
endi
if $data23 != 2 then
print ======$data23
print =====data23=$data23
goto loop2
endi
if $data24 != 20 then
print ======$data24
print =====data24=$data24
goto loop2
endi
# row 3
if $data31 != 10 then
print ======$data31
print =====data31=$data31
goto loop2
endi
if $data32 != 54 then
print ======$data32
print =====data32=$data32
goto loop2
endi
if $data33 != 1 then
print ======$data33
print =====data33=$data33
goto loop2
endi
if $data34 != 19 then
print ======$data34
print =====data34=$data34
goto loop2
endi