Merge pull request #22987 from taosdata/feat/TD-26174

session&state operator buff
This commit is contained in:
Haojun Liao 2023-10-09 14:37:20 +08:00 committed by GitHub
commit 5b117199f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1771 additions and 490 deletions

View File

@ -38,6 +38,9 @@ extern "C" {
#define META_READER_NOLOCK 0x1
#define STREAM_STATE_BUFF_HASH 1
#define STREAM_STATE_BUFF_SORT 2
typedef struct SMeta SMeta;
typedef TSKEY (*GetTsFun)(void*);
@ -115,6 +118,7 @@ typedef struct SRowBuffPos {
void* pKey;
bool beFlushed;
bool beUsed;
bool needFree;
} SRowBuffPos;
// tq
@ -333,6 +337,8 @@ typedef struct {
void* db; // rocksdb_t* db;
void* pCur;
int64_t number;
void* pStreamFileState;
int32_t buffIndex;
} SStreamStateCur;
typedef struct SStateStore {
@ -340,7 +346,8 @@ typedef struct SStateStore {
int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal);
int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used);
int32_t (*streamStateClearBuff)(SStreamState* pState, void* pVal);
void (*streamStateFreeVal)(void* val);
int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
@ -371,7 +378,7 @@ typedef struct SStateStore {
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
int32_t (*streamStateSessionClear)(SStreamState* pState);
@ -400,7 +407,7 @@ typedef struct SStateStore {
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark,
const char* id, int64_t ckId);
const char* id, int64_t ckId, int8_t type);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState);

View File

@ -49,26 +49,30 @@ void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
//session window
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionClear(SStreamState* pState);
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
//state window
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used);
int32_t streamStateClearBuff(SStreamState* pState, void* pVal);
void streamStateFreeVal(void* val);
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
@ -76,14 +80,11 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur);
void streamStateResetCur(SStreamStateCur* pCur);
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
@ -91,6 +92,7 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char*
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
SStreamStateCur* createStreamStateCursor();
/***compare func **/

View File

@ -28,20 +28,33 @@ extern "C" {
#endif
typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot;
typedef SList SStreamSnapshot;
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num);
typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey);
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId);
int64_t checkpointId, int8_t type);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used);
int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
void releaseRowBuffPos(SRowBuffPos* pBuff);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
@ -52,6 +65,34 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
void* getRowStateBuff(SStreamFileState* pFileState);
void* getStateFileStore(SStreamFileState* pFileState);
bool isDeteled(SStreamFileState* pFileState, TSKEY ts);
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap);
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState);
int32_t getRowStateRowSize(SStreamFileState* pFileState);
// session window
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen);
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff);
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey);
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey);
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey);
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur);
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey);
// state window
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
#ifdef __cplusplus
}
#endif

View File

@ -29,13 +29,12 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamFileStateInit = streamFileStateInit;
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStatePutParName = streamStatePutParName;
pStore->streamStateGetParName = streamStateGetParName;
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
pStore->streamStateReleaseBuf = streamStateReleaseBuf;
pStore->streamStateClearBuff = streamStateClearBuff;
pStore->streamStateFreeVal = streamStateFreeVal;
pStore->streamStatePut = streamStatePut;
@ -91,8 +90,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
pStore->streamFileStateInit = streamFileStateInit;
pStore->streamFileStateDestroy = streamFileStateDestroy;
pStore->streamFileStateClear = streamFileStateClear;
pStore->needClearDiskBuff = needClearDiskBuff;

View File

@ -137,13 +137,12 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamFileStateInit = streamFileStateInit;
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
pStore->streamStateGetByPos = streamStateGetByPos;
pStore->streamStatePutParName = streamStatePutParName;
pStore->streamStateGetParName = streamStateGetParName;
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
pStore->streamStateReleaseBuf = streamStateReleaseBuf;
pStore->streamStateClearBuff = streamStateClearBuff;
pStore->streamStateFreeVal = streamStateFreeVal;
pStore->streamStatePut = streamStatePut;
@ -199,8 +198,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
pStore->streamFileStateInit = streamFileStateInit;
pStore->streamFileStateDestroy = streamFileStateDestroy;
pStore->streamFileStateClear = streamFileStateClear;
pStore->needClearDiskBuff = needClearDiskBuff;

View File

@ -548,9 +548,9 @@ typedef struct SWindowRowsSup {
} SWindowRowsSup;
typedef struct SResultWindowInfo {
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
SRowBuffPos* pStatePos;
SSessionKey sessionWin;
bool isOutput;
} SResultWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
@ -579,6 +579,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool isHistoryOp;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
bool clearState;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
@ -672,8 +673,6 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
@ -739,12 +738,6 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI);
int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData);

View File

@ -720,38 +720,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
return 0;
}
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pbInfo->pRes;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
false);
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) <
0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAPI->stateStore.streamStateFreeVal(tbname);
}
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -960,109 +928,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
return TSDB_CODE_SUCCESS;
}
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pKey, pResult);
return TSDB_CODE_SUCCESS;
}
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI) {
pAPI->streamStateSessionPut(pState, key, (const void*)buf, size);
releaseOutputBuf(pState, NULL, (SResultRow*)buf, pAPI);
return TSDB_CODE_SUCCESS;
}
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
int32_t size = 0;
void* pVal = NULL;
int32_t code = pAPI->stateStore.streamStateSessionGet(pState, pKey, &pVal, &size);
// ASSERT(code == 0);
if (code == -1) {
// for history
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
pKey->win.skey, pKey->win.ekey, pKey->groupId);
pGroupResInfo->index += 1;
continue;
}
SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
continue;
}
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
&tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAPI->stateStore.streamStateFreeVal(tbname);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) {
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
break;
}
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
break;
}
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
}
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
void streamOpReleaseState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {

View File

@ -1057,7 +1057,7 @@ void appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup
} else {
memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
}
pAPI->streamStateReleaseBuf(pState, NULL, pValue);
pAPI->streamStateFreeVal(pValue);
}
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {

File diff suppressed because it is too large Load Diff

View File

@ -2007,7 +2007,7 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2063,7 +2063,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
qDebug("seek to last:%s", tbuf);
}
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) return NULL;
pCur->number = pState->number;
@ -2090,7 +2090,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
qDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb;
@ -2179,7 +2179,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2219,7 +2219,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2257,7 +2257,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2358,7 +2358,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
if (pCur == NULL) return NULL;
@ -2419,7 +2419,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (!pCur) {
return NULL;
}
@ -2457,7 +2457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2495,7 +2495,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return -1;
}
@ -2765,7 +2765,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
pCur->db = wrapper->rocksdb;

View File

@ -0,0 +1,583 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstreamFileState.h"
#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
#include "thash.h"
#include "tsimplehash.h"
typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos);
int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey;
return sessionWinKeyCmpr(pWin1, pWin2);
}
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
int firstPos = 0, lastPos = num - 1, midPos = -1;
int numOfRows = 0;
if (num <= 0) return -1;
// find the first position which is smaller or equal than the key.
// if all data is bigger than the key return -1
while (1) {
if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;
if (cmpFn(key, keyList, firstPos) == 0) return firstPos;
if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (cmpFn(key, keyList, midPos) < 0) {
lastPos = midPos - 1;
} else if (cmpFn(key, keyList, midPos) > 0) {
firstPos = midPos + 1;
} else {
break;
}
}
return midPos;
}
int64_t getSessionWindowEndkey(void* data, int32_t index) {
SArray* pWinInfos = (SArray*)data;
SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
return pWin->win.ekey;
}
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
return true;
}
return false;
}
static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
taosArrayPush(pWinInfos, &pNewPos);
return pNewPos;
}
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey, int32_t index) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
taosArrayInsert(pWinInfos, index, &pNewPos);
return pNewPos;
}
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
memcpy(pNewPos->pRowBuff, p, *pVLen);
taosMemoryFree(p);
return pNewPos;
}
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
TSKEY startTs = pKey->win.skey;
TSKEY endTs = pKey->win.ekey;
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
void* pFileStore = getStateFileStore(pFileState);
void* p = NULL;
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
code = code_file;
qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
} else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
code = TSDB_CODE_FAILED;
taosMemoryFree(p);
}
goto _end;
}
// find the first position which is smaller than the pKey
int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
SRowBuffPos* pPos = NULL;
if (index >= 0) {
pPos = taosArrayGetP(pWinStates, index);
if (inSessionWindow(pPos->pKey, startTs, gap)) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
*pKey = *pDestWinKey;
goto _end;
}
}
if (index + 1 < size) {
pPos = taosArrayGetP(pWinStates, index + 1);
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
*pKey = *pDestWinKey;
goto _end;
}
}
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
code = code_file;
qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
goto _end;
} else {
taosMemoryFree(p);
}
}
}
if (index == size - 1) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
code = TSDB_CODE_FAILED;
goto _end;
}
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1);
code = TSDB_CODE_FAILED;
_end:
return code;
}
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pKey = pPos->pKey;
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
taosArrayPush(pWinStates, &pPos);
goto _end;
}
// find the first position which is smaller than the pKey
int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
if (index >= 0) {
taosArrayInsert(pWinStates, index, &pPos);
} else {
taosArrayInsert(pWinStates, 0, &pPos);
}
_end:
pPos->needFree = false;
return TSDB_CODE_SUCCESS;
}
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
void* pBuff = NULL;
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
taosMemoryFreeClear(pBuff);
(*pVal) = pNewPos;
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) {
SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
SSessionKey* pWinKey = (SSessionKey*) key;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (index >= 0) {
SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
pPos->beFlushed = true;
taosArrayRemove(pWinStates, index);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pWinKey = (SSessionKey*) pPos->pKey;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (index >= 0) {
SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
if (pItemPos == pPos) {
taosArrayRemove(pWinStates, index);
}
}
return TSDB_CODE_SUCCESS;
}
void sessionWinStateClear(SStreamFileState* pFileState) {
int32_t buffSize = getRowStateRowSize(pFileState);
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
void* pBuff = getRowStateBuff(pFileState);
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = *((void**)pIte);
int32_t size = taosArrayGetSize(pWinStates);
for (int32_t i = 0; i < size; i++) {
SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
memset(pPos->pRowBuff, 0, buffSize);
}
}
}
void sessionWinStateCleanup(void* pBuff) {
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = (SArray*) (*(void**)pIte);
taosArrayDestroy(pWinStates);
}
tSimpleHashCleanup(pBuff);
}
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
SArray** pWins, int32_t* pIndex) {
SStreamStateCur* pCur = NULL;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return NULL;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (pWins) {
(*pWins) = pWinStates;
}
if (index >= 0) {
pCur = createStreamStateCursor();
pCur->buffIndex = index;
pCur->pStreamFileState = pFileState;
if (pIndex) {
*pIndex = index;
}
}
return pCur;
}
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
if (pCur) {
return pCur;
}
void* pFileStore = getStateFileStore(pFileState);
pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey);
if (!pCur) {
return NULL;
}
pCur->buffIndex = -1;
pCur->pStreamFileState = pFileState;
return pCur;
}
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
if (!pCur) {
return;
}
streamStateResetCur(pCur);
pCur->buffIndex = 0;
pCur->pStreamFileState = pFileState;
}
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) {
SSessionKey key = {.groupId = groupId};
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
if ( !(*ppCur) ) {
(*ppCur) = createStreamStateCursor();
}
transformCursor(pFileState, *ppCur);
} else if (*ppCur) {
(*ppCur)->buffIndex = -1;
(*ppCur)->pStreamFileState = pFileState;
}
}
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
SArray* pWinStates = NULL;
int32_t index = -1;
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
if (pCur) {
if (sessionStateKeyCompare(pWinKey, pWinStates, index) > 0) {
sessionWinStateMoveToNext(pCur);
}
return pCur;
}
void* pFileStore = getStateFileStore(pFileState);
pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
return pCur;
}
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
SArray* pWinStates = NULL;
int32_t index = -1;
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
if (pCur) {
sessionWinStateMoveToNext(pCur);
return pCur;
}
void* pFileStore = getStateFileStore(pFileState);
pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
return pCur;
}
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
if (!pCur) {
return TSDB_CODE_FAILED;
}
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_FAILED;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
if (pCur->buffIndex >= 0) {
if (pCur->buffIndex >= size) {
return TSDB_CODE_FAILED;
}
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) {
*pVal = pPos;
}
*pKey = *(SSessionKey*)(pPos->pKey);
} else {
void* pData = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) {
transformCursor(pCur->pStreamFileState, pCur);
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) {
*pVal = pPos;
}
*pKey = *(SSessionKey*)(pPos->pKey);
code = TSDB_CODE_SUCCESS;
} else if (code == TSDB_CODE_SUCCESS && pVal) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
memcpy(pNewPos->pRowBuff, pData, *pVLen);
(*pVal) = pNewPos;
}
taosMemoryFreeClear(pData);
}
return code;
}
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) {
if (pCur && pCur->buffIndex >= 0) {
pCur->buffIndex++;
} else {
streamStateCurNext_rocksdb(NULL, pCur);
}
return TSDB_CODE_SUCCESS;
}
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey) {
SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
SSessionKey tmpKey = *key;
int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
bool hasCurrentPrev = true;
if (code == TSDB_CODE_FAILED) {
streamStateFreeCur(pCur);
pCur = sessionWinStateSeekKeyNext(pFileState, key);
code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
hasCurrentPrev = false;
}
if (code == TSDB_CODE_FAILED) {
code = TSDB_CODE_FAILED;
goto _end;
}
if (sessionRangeKeyCmpr(key, &tmpKey) == 0) {
*curKey = tmpKey;
goto _end;
} else if (!hasCurrentPrev) {
code = TSDB_CODE_FAILED;
goto _end;
}
sessionWinStateMoveToNext(pCur);
code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
if (code == TSDB_CODE_SUCCESS && sessionRangeKeyCmpr(key, &tmpKey) == 0) {
*curKey = tmpKey;
} else {
code = TSDB_CODE_FAILED;
}
_end:
streamStateFreeCur(pCur);
return code;
}
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = key;
TSKEY gap = 0;
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
TSKEY startTs = pWinKey->win.skey;
TSKEY endTs = pWinKey->win.ekey;
int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) {
void* pFileStore = getStateFileStore(pFileState);
void* p = NULL;
int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
} else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
taosMemoryFree(p);
}
goto _end;
}
// find the first position which is smaller than the pWinKey
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
SRowBuffPos* pPos = NULL;
int32_t valSize = *pVLen;
if (index >= 0) {
pPos = taosArrayGetP(pWinStates, index);
void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
*key = *pDestWinKey;
goto _end;
}
}
if (index + 1 < size) {
pPos = taosArrayGetP(pWinStates, index + 1);
void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
*key = *pDestWinKey;
goto _end;
}
}
if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs)) {
void* p = NULL;
void* pFileStore = getStateFileStore(pFileState);
int32_t code_file =
streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
goto _end;
} else {
taosMemoryFree(p);
}
}
}
if (index == size - 1) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
goto _end;
}
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, key, index + 1);
code = TSDB_CODE_FAILED;
_end:
return code;
}

View File

@ -351,7 +351,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
releaseRowBuffPos(pos);
streamFileStateReleaseBuff(pState->pFileState, pos, false);
return code;
}
@ -395,9 +395,6 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
streamFileStateClear(pState->pFileState);
if (needClearDiskBuff(pState->pFileState)) {
streamStateClear_rocksdb(pState);
}
return 0;
#else
SWinKey key = {.ts = 0, .groupId = 0};
@ -422,19 +419,16 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB
int32_t code = 0;
void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
char* cfName = "default";
void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, cfName, batch, pKey, pVal, vLen, 0);
if (code != 0) {
streamStateDestroyBatch(batch);
return code;
}
code = streamStatePutBatch_rocksdb(pState, batch);
streamStateDestroyBatch(batch);
// code = streamDefaultPut_rocksdb(pState, pKey, pVal, vLen);
// char* Val = NULL;
// int32_t len = 0;
// code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len);
return code;
#else
return 0;
@ -466,20 +460,24 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
#endif
}
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
// todo refactor
qDebug("streamStateReleaseBuf");
if (!pVal) {
return 0;
}
#ifdef USE_ROCKSDB
taosMemoryFree(pVal);
streamFileStateReleaseBuff(pState->pFileState, pVal, used);
#else
streamStateFreeVal(pVal);
#endif
return 0;
}
int32_t streamStateClearBuff(SStreamState* pState, void* pVal) {
return streamFileStateClearBuff(pState->pFileState, pVal);
}
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillGetCur_rocksdb(pState, key);
@ -569,39 +567,6 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v
#endif
}
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGetFirst_rocksdb(pState, key);
#else
// todo refactor
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp);
int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel(pState, &tmp);
return code;
#endif
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
rocksdb_iter_seek_to_first(pCur->iter);
return 0;
#else
return tdbTbcMoveToFirst(pCur->pCur);
#endif
}
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
rocksdb_iter_seek_to_last(pCur->iter);
return 0;
#else
return tdbTbcMoveToLast(pCur->pCur);
#endif
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateSeekKeyNext_rocksdb(pState, key);
@ -693,7 +658,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
return streamStateCurNext_rocksdb(pState, pCur);
return sessionWinStateMoveToNext(pCur);
#else
if (!pCur) {
return -1;
@ -713,16 +678,29 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
return tdbTbcMoveToPrev(pCur->pCur);
#endif
}
void streamStateFreeCur(SStreamStateCur* pCur) {
void streamStateResetCur(SStreamStateCur* pCur) {
if (!pCur) {
return;
}
qDebug("streamStateFreeCur");
rocksdb_iter_destroy(pCur->iter);
if (pCur->iter) rocksdb_iter_destroy(pCur->iter);
if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
rocksdb_readoptions_destroy(pCur->readOpt);
if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt);
tdbTbcClose(pCur->pCur);
memset(pCur, 0, sizeof(SStreamStateCur));
pCur->buffIndex = -1;
}
void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur || pCur->buffIndex >= 0) {
taosMemoryFree(pCur);
return;
}
qDebug("streamStateFreeCur");
streamStateResetCur(pCur);
taosMemoryFree(pCur);
}
@ -734,11 +712,25 @@ void streamStateFreeVal(void* val) {
#endif
}
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionPut_rocksdb(pState, key, value, vLen);
int32_t code = TSDB_CODE_SUCCESS;
SRowBuffPos* pos = (SRowBuffPos*)value;
if (pos->needFree) {
if (isFlushedState(pState->pFileState, key->win.ekey, 0)) {
if (!pos->pRowBuff) {
return code;
}
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
streamStateReleaseBuf(pState, pos, true);
putFreeBuff(pState->pFileState, pos);
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
} else {
code = putSessionWinResultBuff(pState->pFileState, value);
}
}
return code;
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
@ -748,7 +740,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen);
#else
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
@ -773,7 +765,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionDel_rocksdb(pState, key);
return deleteRowBuff(pState->pFileState, key, sizeof(SSessionKey));
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
@ -782,7 +774,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
return sessionWinStateSeekKeyCurrentPrev(pState->pFileState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -813,7 +805,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
return sessionWinStateSeekKeyCurrentNext(pState->pFileState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -845,7 +837,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyNext_rocksdb(pState, key);
return sessionWinStateSeekKeyNext(pState->pFileState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -876,7 +868,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
return sessionWinStateGetKVByCur(pCur, pKey, pVal, pVLen);
#else
if (!pCur) {
return -1;
@ -899,6 +891,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v
int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
sessionWinStateClear(pState->pFileState);
return streamStateSessionClear_rocksdb(pState);
#else
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
@ -923,7 +916,7 @@ int32_t streamStateSessionClear(SStreamState* pState) {
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
return sessionWinStateGetKeyByRange(pState->pFileState, key, curKey);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
@ -976,7 +969,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen);
#else
// todo refactor
int32_t res = 0;
@ -1032,7 +1025,7 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
// todo refactor
#ifdef USE_ROCKSDB
return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
int32_t res = 0;
SSessionKey tmpKey = *key;
@ -1143,6 +1136,12 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
SStreamStateCur* createStreamStateCursor() {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->buffIndex = -1;
return pCur;
}
#if 0
char* streamStateSessionDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));

View File

@ -16,7 +16,6 @@
#include "tstreamFileState.h"
#include "query.h"
#include "storageapi.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
@ -29,29 +28,98 @@
#define MIN_NUM_OF_ROW_BUFF 10240
struct SStreamFileState {
SList* usedBuffs;
SList* freeBuffs;
SSHashObj* rowBuffMap;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
GetTsFun getTs;
char* id;
SList* usedBuffs;
SList* freeBuffs;
void* rowStateBuff;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
GetTsFun getTs;
char* id;
char* cfName;
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
_state_buff_remove_by_pos_fn stateBuffRemoveByPosFn;
_state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
_state_file_remove_fn stateFileRemoveFn;
_state_file_get_fn stateFileGetFn;
_state_file_clear_fn stateFileClearFn;
};
typedef SRowBuffPos SRowBuffInfo;
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
if (pos) {
(*pos)->beFlushed = true;
}
return tSimpleHashRemove(pBuff, pKey, keyLen);
}
int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
size_t keyLen = pFileState->keyLen;
SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
if (ppPos) {
if ((*ppPos) == pPos) {
return tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
}
}
return TSDB_CODE_SUCCESS;
}
void stateHashBuffClearFn(void* pBuff) {
tSimpleHashClear(pBuff);
}
void stateHashBuffCleanupFn(void* pBuff) {
tSimpleHashCleanup(pBuff);
}
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
}
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) {
return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
SWinKey* pWinKey = pPos->pKey;
pStateKey->key = *pWinKey;
pStateKey->opNum = num;
return pStateKey;
}
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
}
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) {
return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
SSessionKey* pWinKey = pPos->pKey;
pStateKey->key = *pWinKey;
pStateKey->opNum = num;
return pStateKey;
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId) {
int64_t checkpointId, int8_t type) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
}
@ -69,8 +137,31 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn);
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
if (type == STREAM_STATE_BUFF_HASH) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
pFileState->stateFileRemoveFn = intervalFileRemoveFn;
pFileState->stateFileGetFn = intervalFileGetFn;
pFileState->stateFileClearFn = streamStateClear_rocksdb;
pFileState->cfName = taosStrdup("state");
} else {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
pFileState->stateFileRemoveFn = sessionFileRemoveFn;
pFileState->stateFileGetFn = sessionFileGetFn;
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
pFileState->cfName = taosStrdup("sess");
}
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
goto _error;
}
@ -87,7 +178,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->maxTs = INT64_MIN;
pFileState->id = taosStrdup(taskId);
recoverSnapshot(pFileState, checkpointId);
//todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) {
recoverSnapshot(pFileState, checkpointId);
}
return pFileState;
_error:
@ -132,12 +226,20 @@ void streamFileStateDestroy(SStreamFileState* pFileState) {
}
taosMemoryFree(pFileState->id);
taosMemoryFree(pFileState->cfName);
tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
tSimpleHashCleanup(pFileState->rowBuffMap);
pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
taosMemoryFree(pFileState);
}
void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
if (pPos->pRowBuff) {
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
}
}
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
@ -146,11 +248,10 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
putFreeBuff(pFileState, pPos);
if (!all) {
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
}
destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
@ -159,15 +260,40 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
}
}
void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) {
uint64_t i = 0;
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL && i < max) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
if (pPos->pRowBuff) {
i++;
}
}
}
}
void streamFileStateClear(SStreamFileState* pFileState) {
pFileState->flushMark = INT64_MIN;
pFileState->maxTs = INT64_MIN;
tSimpleHashClear(pFileState->rowBuffMap);
tSimpleHashClear(pFileState->rowStateBuff);
clearExpiredRowBuff(pFileState, 0, true);
}
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
pPos->beUsed = used;
}
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
SListIter iter = {0};
@ -179,10 +305,12 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
if (pPos->beUsed == used) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
i++;
if (pPos->pRowBuff) {
i++;
}
}
}
@ -197,10 +325,13 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
num = TMAX(num, FLUSH_NUM);
popUsedBuffs(pFileState, pFlushList, num, false);
clearFlushedRowBuff(pFileState, pFlushList, num);
if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, true);
popUsedBuffs(pFileState, pFlushList, num, false);
if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, true);
}
}
flushSnapshot(pFileState, pFlushList, false);
@ -210,9 +341,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&fIter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
pPos->pRowBuff = NULL;
putFreeBuff(pFileState, pPos);
}
tdListFreeP(pFlushList, destroyRowBuffPosPtr);
@ -227,7 +356,9 @@ int32_t clearRowBuff(SStreamFileState* pFileState) {
return TSDB_CODE_SUCCESS;
}
void* getFreeBuff(SList* lists, int32_t buffSize) {
void* getFreeBuff(SStreamFileState* pFileState) {
SList* lists = pFileState->freeBuffs;
int32_t buffSize = pFileState->rowSize;
SListNode* pNode = tdListPopHead(lists);
if (!pNode) {
return NULL;
@ -238,10 +369,18 @@ void* getFreeBuff(SList* lists, int32_t buffSize) {
return ptr;
}
int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
if (pPos->pRowBuff) {
memset(pPos->pRowBuff, 0, pFileState->rowSize);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
void* pBuff = getFreeBuff(pFileState);
if (pBuff) {
pPos->pRowBuff = pBuff;
goto _end;
@ -258,7 +397,7 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
pPos->pRowBuff = getFreeBuff(pFileState);
_end:
tdListAppend(pFileState->usedBuffs, &pPos);
@ -266,9 +405,17 @@ _end:
return pPos;
}
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
SRowBuffPos* newPos = getNewRowPos(pFileState);
newPos->beUsed = true;
newPos->beFlushed = false;
newPos->needFree = false;
return newPos;
}
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) {
*pVLen = pFileState->rowSize;
*pVal = *pos;
@ -276,14 +423,12 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
(*pos)->beFlushed = false;
return TSDB_CODE_SUCCESS;
}
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
pNewPos->beUsed = true;
pNewPos->beFlushed = false;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, keyLen);
TSKEY ts = pFileState->getTs(pKey);
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
int32_t len = 0;
void* p = NULL;
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
@ -294,7 +439,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
taosMemoryFree(p);
}
tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES);
tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
if (pVal) {
*pVLen = pFileState->rowSize;
*pVal = pNewPos;
@ -303,45 +448,60 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
}
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = tSimpleHashRemove(pFileState->rowBuffMap, pKey, keyLen);
int32_t code_rocks = streamStateDel_rocksdb(pFileState->pFileStore, pKey);
return code_buff == TSDB_CODE_SUCCESS ? code_buff : code_rocks;
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
static void recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t len = 0;
void* pBuff = NULL;
pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
}
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
if (pPos->pRowBuff) {
if (pPos->needFree) {
recoverSessionRowBuff(pFileState, pPos);
}
(*pVal) = pPos->pRowBuff;
return TSDB_CODE_SUCCESS;
}
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
pPos->pRowBuff = getFreeBuff(pFileState);
if (!pPos->pRowBuff) {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
if (pFileState->curRowCount < pFileState->maxRowCount) {
pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
pFileState->curRowCount++;
} else {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState);
}
ASSERT(pPos->pRowBuff);
}
int32_t len = 0;
void* pBuff = NULL;
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
recoverSessionRowBuff(pFileState, pPos);
(*pVal) = pPos->pRowBuff;
tdListPrepend(pFileState->usedBuffs, &pPos);
if (!pPos->needFree) {
tdListPrepend(pFileState->usedBuffs, &pPos);
}
return TSDB_CODE_SUCCESS;
}
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) {
return true;
}
return false;
}
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
@ -349,13 +509,13 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs;
}
void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); }
void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) {
void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
taosEncodeFixedI64(&buff, *key);
taosEncodeFixedI64(&buff, *pKey);
}
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
@ -369,7 +529,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t numOfElems = listNEles(pSnapshot);
SListNode* pNode = NULL;
int idx = streamStateGetCfIdx(pFileState->pFileStore, "state");
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
char* buf = taosMemoryCalloc(1, len);
@ -377,23 +537,23 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
void* batch = streamStateCreateBatch();
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
if (pPos->beFlushed) {
if (pPos->beFlushed || !pPos->pRowBuff) {
continue;
}
pPos->beFlushed = true;
qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch);
}
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, &sKey, pPos->pRowBuff, pFileState->rowSize,
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
0, buf);
taosMemoryFreeClear(pSKey);
// todo handle failure
memset(buf, 0, len);
// qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
}
taosMemoryFree(buf);
@ -508,7 +668,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
}
void* pVal = NULL;
int32_t pVLen = 0;
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
@ -521,7 +681,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
memcpy(pNewPos->pRowBuff, pVal, pVLen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos);
break;
@ -539,3 +699,23 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
pFileState->maxTs = TMAX(pFileState->maxTs, ts);
}
void* getRowStateBuff(SStreamFileState* pFileState) {
return pFileState->rowStateBuff;
}
void* getStateFileStore(SStreamFileState* pFileState) {
return pFileState->pFileStore;
}
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
return ts <= (pFileState->flushMark + gap);
}
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
return pFileState->rowSize;
}

View File

@ -8,10 +8,12 @@ sleep 500
sql connect
print step1=============
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once into streamt as select _wstart, count(*) c1 from t1 interval(1s);
sql create stream streams0 trigger at_once ignore expired 0 ignore update 0 into streamt as select _wstart, count(*) c1 from t1 interval(1s);
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791212001,2,2,3,1.1);
@ -71,13 +73,13 @@ if $rows != 29 then
goto loop1
endi
print step2=============
sql create database test2 vgroups 10;
sql use test2;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create stream streams2 trigger at_once ignore expired 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s);
sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s);
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791212001,2,2,3,1.1);
@ -137,4 +139,184 @@ if $rows != 29 then
goto loop3
endi
print step3=============
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once ignore expired 0 ignore update 0 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s);
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,1,2,3,1.1);
sql insert into t1 values(1648791215000,1,2,3,1.1);
sql insert into t1 values(1648791217000,1,2,3,1.1);
sql insert into t1 values(1648791219000,1,2,3,1.1);
sql insert into t1 values(1648791221000,1,2,3,1.0);
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791225000,1,2,3,1.0);
sql insert into t1 values(1648791227000,1,2,3,1.0);
sql insert into t1 values(1648791229000,1,2,3,1.0);
sql insert into t1 values(1648791231000,1,2,3,1.0);
sql insert into t1 values(1648791233000,1,2,3,1.1);
sql insert into t1 values(1648791235000,1,2,3,1.1);
sql insert into t1 values(1648791237000,1,2,3,1.1);
sql insert into t1 values(1648791239000,1,2,3,1.1);
sql insert into t1 values(1648791241000,1,2,3,1.0);
sql insert into t1 values(1648791243000,1,2,3,1.0);
sql insert into t1 values(1648791245000,1,2,3,1.0);
sql insert into t1 values(1648791247000,1,2,3,1.0);
sql insert into t1 values(1648791249000,1,2,3,1.0);
sql insert into t1 values(1648791251000,1,2,3,1.0);
sql insert into t1 values(1648791253000,1,2,3,1.1);
sql insert into t1 values(1648791255000,1,2,3,1.1);
sql insert into t1 values(1648791257000,1,2,3,1.1);
sql insert into t1 values(1648791259000,1,2,3,1.1);
sql insert into t1 values(1648791261000,1,2,3,1.0);
sql insert into t1 values(1648791263000,1,2,3,1.0);
sql insert into t1 values(1648791265000,1,2,3,1.0);
sql insert into t1 values(1648791267000,1,2,3,1.0);
sql insert into t1 values(1648791269000,1,2,3,1.0);
$loop_count = 0
loop4:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt1;
sql select * from streamt1;
if $rows != 30 then
print =====rows=$rows
goto loop4
endi
sql insert into t1 values(1648791211001,1,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.1);
sql insert into t1 values(1648791215001,1,2,3,1.1);
sql insert into t1 values(1648791217001,1,2,3,1.1);
sql insert into t1 values(1648791219001,1,2,3,1.1);
sql insert into t1 values(1648791221001,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791225001,1,2,3,1.0);
sql insert into t1 values(1648791227001,1,2,3,1.0);
sql insert into t1 values(1648791229001,1,2,3,1.0);
$loop_count = 0
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt1;
sql select * from streamt1;
if $rows != 30 then
print =====rows=$rows
goto loop5
endi
if $data01 != 2 then
print =====data01=$data01
goto loop5
endi
if $data91 != 2 then
print =====data91=$data91
goto loop5
endi
sql insert into t1 values(1648791231001,1,2,3,1.0);
sql insert into t1 values(1648791233001,1,2,3,1.1);
sql insert into t1 values(1648791235001,1,2,3,1.1);
sql insert into t1 values(1648791237001,1,2,3,1.1);
sql insert into t1 values(1648791239001,1,2,3,1.1);
sql insert into t1 values(1648791241001,1,2,3,1.0);
sql insert into t1 values(1648791243001,1,2,3,1.0);
sql insert into t1 values(1648791245001,1,2,3,1.0);
sql insert into t1 values(1648791247001,1,2,3,1.0);
sql insert into t1 values(1648791249001,1,2,3,1.0);
$loop_count = 0
loop6:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt1;
sql select * from streamt1;
if $rows != 30 then
print =====rows=$rows
goto loop6
endi
if $data[10][1] != 2 then
print =====data[10][1]=$data[10][1]
goto loop6
endi
if $data[19][1] != 2 then
print =====data[19][1]=$data[19][1]
goto loop6
endi
sql insert into t1 values(1648791251001,1,2,3,1.0);
sql insert into t1 values(1648791253001,1,2,3,1.1);
sql insert into t1 values(1648791255001,1,2,3,1.1);
sql insert into t1 values(1648791257001,1,2,3,1.1);
sql insert into t1 values(1648791259001,1,2,3,1.1);
sql insert into t1 values(1648791261001,1,2,3,1.0);
sql insert into t1 values(1648791263001,1,2,3,1.0);
sql insert into t1 values(1648791265001,1,2,3,1.0);
sql insert into t1 values(1648791267001,1,2,3,1.0);
sql insert into t1 values(1648791269001,1,2,3,1.0);
$loop_count = 0
loop7:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt1;
sql select * from streamt1;
if $rows != 30 then
print =====rows=$rows
goto loop7
endi
if $data[20][1] != 2 then
print =====[20][1]=$[20][1]
goto loop7
endi
if $data[29][1] != 2 then
print =====[29][1]=$[29][1]
goto loop7
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,217 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 135
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
print step1=============
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a);
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,2,3,1.1);
sql insert into t1 values(1648791215000,3,2,3,1.1);
sql insert into t1 values(1648791217000,4,2,3,1.1);
sql insert into t1 values(1648791219000,5,2,3,1.1);
sql insert into t1 values(1648791221000,6,2,3,1.0);
sql insert into t1 values(1648791223000,7,2,3,1.0);
sql insert into t1 values(1648791225000,8,2,3,1.0);
sql insert into t1 values(1648791227000,9,2,3,1.0);
sql insert into t1 values(1648791229000,10,2,3,1.0);
sql insert into t1 values(1648791231000,11,2,3,1.0);
sql insert into t1 values(1648791233000,12,2,3,1.1);
sql insert into t1 values(1648791235000,13,2,3,1.1);
sql insert into t1 values(1648791237000,14,2,3,1.1);
sql insert into t1 values(1648791239000,15,2,3,1.1);
sql insert into t1 values(1648791241000,16,2,3,1.0);
sql insert into t1 values(1648791243000,17,2,3,1.0);
sql insert into t1 values(1648791245000,18,2,3,1.0);
sql insert into t1 values(1648791247000,19,2,3,1.0);
sql insert into t1 values(1648791249000,20,2,3,1.0);
sql insert into t1 values(1648791251000,21,2,3,1.0);
sql insert into t1 values(1648791253000,22,2,3,1.1);
sql insert into t1 values(1648791255000,23,2,3,1.1);
sql insert into t1 values(1648791257000,24,2,3,1.1);
sql insert into t1 values(1648791259000,25,2,3,1.1);
sql insert into t1 values(1648791261000,26,2,3,1.0);
sql insert into t1 values(1648791263000,27,2,3,1.0);
sql insert into t1 values(1648791265000,28,2,3,1.0);
sql insert into t1 values(1648791267000,29,2,3,1.0);
sql insert into t1 values(1648791269000,30,2,3,1.0);
$loop_count = 0
loop8:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop8
endi
sql insert into t1 values(1648791211001,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
sql insert into t1 values(1648791215001,3,2,3,1.1);
sql insert into t1 values(1648791217001,4,2,3,1.1);
sql insert into t1 values(1648791219001,5,2,3,1.1);
sql insert into t1 values(1648791221001,6,2,3,1.0);
sql insert into t1 values(1648791223001,7,2,3,1.0);
sql insert into t1 values(1648791225001,8,2,3,1.0);
sql insert into t1 values(1648791227001,9,2,3,1.0);
sql insert into t1 values(1648791229001,10,2,3,1.0);
$loop_count = 0
loop9:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop9
endi
if $data01 != 2 then
print =====data01=$data01
goto loop9
endi
if $data91 != 2 then
print =====data91=$data91
goto loop9
endi
sql insert into t1 values(1648791231001,11,2,3,1.0);
sql insert into t1 values(1648791233001,12,2,3,1.1);
sql insert into t1 values(1648791235001,13,2,3,1.1);
sql insert into t1 values(1648791237001,14,2,3,1.1);
sql insert into t1 values(1648791239001,15,2,3,1.1);
sql insert into t1 values(1648791241001,16,2,3,1.0);
sql insert into t1 values(1648791243001,17,2,3,1.0);
sql insert into t1 values(1648791245001,18,2,3,1.0);
sql insert into t1 values(1648791247001,19,2,3,1.0);
sql insert into t1 values(1648791249001,20,2,3,1.0);
$loop_count = 0
loop10:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop10
endi
if $data[10][1] != 2 then
print =====data[10][1]=$data[10][1]
goto loop10
endi
if $data[19][1] != 2 then
print =====data[19][1]=$data[19][1]
goto loop10
endi
sql insert into t1 values(1648791251001,21,2,3,1.0);
sql insert into t1 values(1648791253001,22,2,3,1.1);
sql insert into t1 values(1648791255001,23,2,3,1.1);
sql insert into t1 values(1648791257001,24,2,3,1.1);
#///////////////////////
$loop_count = 0
loop11:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop11
endi
if $data[20][1] != 2 then
print =====[20][1]=$[20][1]
goto loop11
endi
#///////////////////////
sql insert into t1 values(1648791259001,25,2,3,1.1);
sql insert into t1 values(1648791261001,26,2,3,1.0);
sql insert into t1 values(1648791263001,27,2,3,1.0);
sql insert into t1 values(1648791265001,28,2,3,1.0);
sql insert into t1 values(1648791267001,29,2,3,1.0);
sql insert into t1 values(1648791269001,30,2,3,1.0);
$loop_count = 0
loop11:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop11
endi
if $data[20][1] != 2 then
print =====[20][1]=$[20][1]
goto loop11
endi
if $data[29][1] != 2 then
print =====[29][1]=$[29][1]
goto loop11
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -132,85 +132,85 @@ sql select * from streamt order by s desc;
# row 0
if $data01 != 2 then
print ======$data01
print =====data01=$data01
goto loop2
endi
if $data02 != 29 then
print ======$data02
print =====data02=$data02
goto loop2
endi
if $data03 != 7 then
print ======$data03
print =====data03=$data03
goto loop2
endi
if $data04 != 22 then
print ======$data04
print =====data04=$data04
goto loop2
endi
# row 1
if $data11 != 3 then
print ======$data11
print =====data11=$data11
goto loop2
endi
if $data12 != 33 then
print ======$data12
print =====data12=$data12
goto loop2
endi
if $data13 != 8 then
print ======$data13
print =====data13=$data13
goto loop2
endi
if $data14 != 21 then
print ======$data14
print =====data14=$data14
goto loop2
endi
# row 2
if $data21 != 4 then
print ======$data21
print =====data21=$data21
goto loop2
endi
if $data22 != 25 then
print ======$data22
print =====data22=$data22
goto loop2
endi
if $data23 != 2 then
print ======$data23
print =====data23=$data23
goto loop2
endi
if $data24 != 20 then
print ======$data24
print =====data24=$data24
goto loop2
endi
# row 3
if $data31 != 10 then
print ======$data31
print =====data31=$data31
goto loop2
endi
if $data32 != 54 then
print ======$data32
print =====data32=$data32
goto loop2
endi
if $data33 != 1 then
print ======$data33
print =====data33=$data33
goto loop2
endi
if $data34 != 19 then
print ======$data34
print =====data34=$data34
goto loop2
endi