opt write snapshot
This commit is contained in:
commit
1e3569367a
|
@ -37,13 +37,14 @@ typedef SList SStreamSnapshot;
|
|||
|
||||
typedef TSKEY (*GetTsFun)(void*);
|
||||
|
||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark);
|
||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark);
|
||||
void streamFileStateDestroy(SStreamFileState* pFileState);
|
||||
void streamFileStateClear(SStreamFileState* pFileState);
|
||||
|
||||
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
|
||||
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
|
||||
void releaseRowBuffPos(SRowBuffPos* pBuff);
|
||||
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
|
||||
|
||||
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
|
||||
|
|
|
@ -828,7 +828,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||
}
|
||||
|
||||
tsDisableStream = cfgGetItem(pCfg, "disableStream")->i64;
|
||||
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
||||
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
|
||||
tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64;
|
||||
|
||||
GRANT_CFG_GET;
|
||||
return 0;
|
||||
|
|
|
@ -2881,7 +2881,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->numOfDatapack = 0;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs,
|
||||
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs,
|
||||
pInfo->pState, pInfo->twAggSup.deleteMark);
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
|
@ -5042,7 +5042,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->numOfDatapack = 0;
|
||||
pInfo->pUpdated = NULL;
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs,
|
||||
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs,
|
||||
pInfo->pState, pInfo->twAggSup.deleteMark);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||
|
|
|
@ -319,7 +319,9 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
|
|||
}
|
||||
|
||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
||||
return getRowBuffByPos(pState->pFileState, pos, pVal);
|
||||
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
||||
releaseRowBuffPos(pos);
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "tsimplehash.h"
|
||||
|
||||
#define FLUSH_RATIO 0.2
|
||||
#define FLUSH_NUM 4
|
||||
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024);
|
||||
|
||||
struct SStreamFileState {
|
||||
|
@ -42,7 +43,8 @@ struct SStreamFileState {
|
|||
|
||||
typedef SRowBuffPos SRowBuffInfo;
|
||||
|
||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) {
|
||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
|
||||
TSKEY delMark) {
|
||||
if (memSize <= 0) {
|
||||
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
|
||||
}
|
||||
|
@ -61,12 +63,13 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFu
|
|||
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
|
||||
goto _error;
|
||||
}
|
||||
pFileState->keyLen = keySize;
|
||||
pFileState->rowSize = rowSize;
|
||||
pFileState->preCheckPointVersion = 0;
|
||||
pFileState->checkPointVersion = 1;
|
||||
pFileState->pFileStore = pFile;
|
||||
pFileState->getTs = fp;
|
||||
pFileState->maxRowCount = memSize / rowSize;
|
||||
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
|
||||
pFileState->curRowCount = 0;
|
||||
pFileState->deleteMark = delMark;
|
||||
pFileState->flushMark = -1;
|
||||
|
@ -89,7 +92,9 @@ void destroyRowBuffPosPtr(void* ptr) {
|
|||
return;
|
||||
}
|
||||
SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
|
||||
destroyRowBuffPos(pPos);
|
||||
if (!pPos->beUsed) {
|
||||
destroyRowBuffPos(pPos);
|
||||
}
|
||||
}
|
||||
|
||||
void destroyRowBuff(void* ptr) {
|
||||
|
@ -117,12 +122,13 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
|
|||
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
|
||||
if (all || (pFileState->getTs(pPos->pKey) < ts)) {
|
||||
tdListPopNode(pFileState->usedBuffs, pNode);
|
||||
taosMemoryFreeClear(pNode);
|
||||
ASSERT(pPos->pRowBuff != NULL);
|
||||
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
|
||||
pPos->pRowBuff = NULL;
|
||||
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
|
||||
destroyRowBuffPos(pPos);
|
||||
tdListPopNode(pFileState->usedBuffs, pNode);
|
||||
taosMemoryFreeClear(pNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -132,27 +138,47 @@ void streamFileStateClear(SStreamFileState* pFileState) {
|
|||
clearExpiredRowBuff(pFileState, 0, true);
|
||||
}
|
||||
|
||||
int32_t flushRowBuff(SStreamFileState* pFileState) {
|
||||
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
|
||||
if (!pFlushList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
|
||||
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
|
||||
uint64_t i = 0;
|
||||
SListIter iter = {0};
|
||||
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
|
||||
|
||||
SListNode* pNode = NULL;
|
||||
while ((pNode = tdListNext(&iter)) != NULL && i < num) {
|
||||
while ((pNode = tdListNext(&iter)) != NULL && i < max) {
|
||||
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
||||
if (!pPos->beUsed) {
|
||||
if (pPos->beUsed == used) {
|
||||
tdListAppend(pFlushList, &pPos);
|
||||
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
|
||||
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
|
||||
tdListPopNode(pFileState->usedBuffs, pNode);
|
||||
taosMemoryFreeClear(pNode);
|
||||
i++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t flushRowBuff(SStreamFileState* pFileState) {
|
||||
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
|
||||
if (!pFlushList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
|
||||
num = TMAX(num, FLUSH_NUM);
|
||||
popUsedBuffs(pFileState, pFlushList, num, false);
|
||||
if (isListEmpty(pFlushList)) {
|
||||
popUsedBuffs(pFileState, pFlushList, num, true);
|
||||
}
|
||||
flushSnapshot(pFileState, pFlushList, false);
|
||||
SListIter fIter = {0};
|
||||
tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
|
||||
SListNode* pNode = NULL;
|
||||
while ((pNode = tdListNext(&fIter)) != NULL) {
|
||||
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
||||
ASSERT(pPos->pRowBuff != NULL);
|
||||
tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
|
||||
pPos->pRowBuff = NULL;
|
||||
}
|
||||
tdListFreeP(pFlushList, destroyRowBuffPosPtr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -177,11 +203,11 @@ void* getFreeBuff(SList* lists, int32_t buffSize) {
|
|||
|
||||
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
|
||||
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
|
||||
tdListAppend(pFileState->usedBuffs, &pPos);
|
||||
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
|
||||
void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
|
||||
if (pBuff) {
|
||||
pPos->pRowBuff = pBuff;
|
||||
return pPos;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (pFileState->curRowCount < pFileState->maxRowCount) {
|
||||
|
@ -189,13 +215,17 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
|
|||
if (pBuff) {
|
||||
pPos->pRowBuff = pBuff;
|
||||
pFileState->curRowCount++;
|
||||
return pPos;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = clearRowBuff(pFileState);
|
||||
ASSERT(code == 0);
|
||||
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
|
||||
|
||||
_end:
|
||||
tdListAppend(pFileState->usedBuffs, &pPos);
|
||||
ASSERT(pPos->pRowBuff != NULL);
|
||||
return pPos;
|
||||
}
|
||||
|
||||
|
@ -203,23 +233,24 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
|
|||
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
|
||||
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
|
||||
if (pos) {
|
||||
if (pVal) {
|
||||
*pVLen = pFileState->rowSize;
|
||||
*pVal = *pos;
|
||||
}
|
||||
*pVLen = pFileState->rowSize;
|
||||
*pVal = *pos;
|
||||
(*pos)->beUsed = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
|
||||
ASSERT(pNewPos); // todo(liuyao) delete
|
||||
pNewPos->pKey = taosMemoryCalloc(1, keyLen);
|
||||
pNewPos->beUsed = true;
|
||||
ASSERT(pNewPos->pRowBuff);
|
||||
memcpy(pNewPos->pKey, pKey, keyLen);
|
||||
|
||||
TSKEY ts = pFileState->getTs(pKey);
|
||||
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
|
||||
int32_t len = 0;
|
||||
void* pVal = NULL;
|
||||
streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len);
|
||||
memcpy(pNewPos->pRowBuff, pVal, len);
|
||||
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
memcpy(pNewPos->pRowBuff, pVal, len);
|
||||
}
|
||||
taosMemoryFree(pVal);
|
||||
}
|
||||
|
||||
|
@ -243,15 +274,21 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = clearRowBuff(pFileState);
|
||||
ASSERT(code == 0);
|
||||
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
|
||||
if (!pPos->pRowBuff) {
|
||||
int32_t code = clearRowBuff(pFileState);
|
||||
ASSERT(code == 0);
|
||||
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
|
||||
ASSERT(pPos->pRowBuff);
|
||||
}
|
||||
|
||||
int32_t len = 0;
|
||||
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, pVal, &len);
|
||||
memcpy(pPos->pRowBuff, pVal, len);
|
||||
taosMemoryFree(pVal);
|
||||
void* pBuff = NULL;
|
||||
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len);
|
||||
memcpy(pPos->pRowBuff, pBuff, len);
|
||||
taosMemoryFree(pBuff);
|
||||
(*pVal) = pPos->pRowBuff;
|
||||
tdListPrepend(pFileState->usedBuffs, &pPos);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -292,6 +329,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
|||
void* batch = streamStateCreateBatch();
|
||||
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
|
||||
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
||||
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
|
||||
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
|
||||
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||
streamStateClearBatch(batch);
|
||||
|
|
|
@ -845,6 +845,8 @@ sql create stream streams7 trigger at_once IGNORE EXPIRED 0 into streamt7 as sel
|
|||
sql insert into ts1 values(1648791211000,1,2,3);
|
||||
sql_error insert into ts1 values(-1648791211000,1,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop18:
|
||||
|
||||
sleep 200
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c debugflag -v 131
|
||||
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
sleep 10000
|
||||
|
||||
sql connect
|
||||
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams1 trigger at_once into streamt as select _wstart, count(*) c1 from t1 interval(1s);
|
||||
|
||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791212001,2,2,3,1.1);
|
||||
sql insert into t1 values(1648791213002,3,2,3,2.1);
|
||||
sql insert into t1 values(1648791214003,4,2,3,3.1);
|
||||
sql insert into t1 values(1648791215003,4,2,3,3.1);
|
||||
sql insert into t1 values(1648791216004,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791217004,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791218004,4,2,3,4.1);
|
||||
|
||||
sql insert into t1 values(1648791221004,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791222004,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791223004,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791224004,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791225005,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791226005,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791227005,4,2,3,4.1);
|
||||
sql insert into t1 values(1648791228005,4,2,3,4.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 1 select * from streamt
|
||||
sql select * from streamt;
|
||||
|
||||
if $rows != 16 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791231004,4,2,3,4.1) (1648791232004,4,2,3,4.1) (1648791233004,4,2,3,4.1) (1648791234004,4,2,3,4.1) (1648791235004,4,2,3,4.1) (1648791236004,4,2,3,4.1) (1648791237004,4,2,3,4.1) (1648791238004,4,2,3,4.1) (1648791239004,4,2,3,4.1) (1648791240004,4,2,3,4.1) (1648791241004,4,2,3,4.1) (1648791242004,4,2,3,4.1) (1648791243004,4,2,3,4.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 select * from streamt
|
||||
sql select * from streamt;
|
||||
|
||||
if $rows != 29 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue