diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 4a63cfeb5f..62b555d437 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -83,7 +83,7 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -bool streamStateCheck(SStreamState* pState, const SWinKey* key); +bool streamStateCheck(SStreamState* pState, const SWinKey* key); int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 04d8093fa7..d9f0dd41a0 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -77,4 +77,12 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t grou int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); void streamStateDestroy_rocksdb(SStreamState* pState); + +void* streamStateCreateBatch(); +int32_t streamStateGetBatchSize(void* pBatch); +void streamStateClearBatch(void* pBatch); +void streamStateDestroyBatch(void* pBatch); +int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen); +int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 281c5b8ded..9dc84db37c 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "tcommon.h" #include "tlog.h" @@ -544,6 +543,44 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v STREAM_STATE_PUT_ROCKSDB(pState, "default", &sKey, value, vLen); return code; } +int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { + char* err = NULL; + rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); + if (err != NULL) { + qError("streamState failed to write batch, err:%s", err); + taosMemoryFree(err); + return -1; + } + return 0; +} + +void* streamStateCreateBatch() { + rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create(); + return pBatch; +} +int32_t streamStateGetBatchSize(void* pBatch) { + if (pBatch == NULL) return -1; + + return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch); +} + +void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } +void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } +int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen) { + int i = streamGetInit(cfName); + + if (i < 0) { + qError("streamState failed to put to cf name:%s", cfName); + return -1; + } + char buf[128] = {0}; + int32_t klen = ginitDict[i].enFunc((void*)key, buf); + + rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; + rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen); + return 0; +} int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 9949f67321..1ba862e661 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -15,14 +15,13 @@ #include "tstreamFileState.h" +#include "streamBackendRocksdb.h" #include "taos.h" #include "thash.h" #include "tsimplehash.h" -#include "streamBackendRocksdb.h" - -#define FLUSH_RATIO 0.2 -#define FLUSH_NUM 4 +#define FLUSH_RATIO 0.2 +#define FLUSH_NUM 4 #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024); struct SStreamFileState { @@ -44,7 +43,8 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) { +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, + TSKEY delMark) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; } @@ -71,6 +71,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; pFileState->getTs = fp; + pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; pFileState->flushMark = -1; @@ -122,7 +123,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); - if (all || (pFileState->getTs(pPos->pKey) < ts) ) { + if (all || (pFileState->getTs(pPos->pKey) < ts)) { ASSERT(pPos->pRowBuff != NULL); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); pPos->pRowBuff = NULL; @@ -142,7 +143,7 @@ void streamFileStateClear(SStreamFileState* pFileState) { } void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { - uint64_t i = 0; + uint64_t i = 0; SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); @@ -250,7 +251,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi TSKEY ts = pFileState->getTs(pKey); if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) { int32_t len = 0; - void *pVal = NULL; + void* pVal = NULL; int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len); if (code == TSDB_CODE_SUCCESS) { memcpy(pNewPos->pRowBuff, pVal, len); @@ -287,7 +288,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** } int32_t len = 0; - void *pBuff = NULL; + void* pBuff = NULL; streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len); memcpy(pPos->pRowBuff, pBuff, len); taosMemoryFree(pBuff); @@ -304,9 +305,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { return false; } -void releaseRowBuffPos(SRowBuffPos* pBuff) { - pBuff->beUsed = false; -} +void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; } SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); @@ -320,26 +319,39 @@ void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t le void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); (*pVal) = taosMemoryCalloc(1, *pLen); - void* buff = *pVal; + void* buff = *pVal; taosEncodeFixedI64(&buff, pFileState->flushMark); } int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SListIter iter = {0}; tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD); - SListNode* pNode = NULL; + const int32_t BATCH_LIMIT = 128; + SListNode* pNode = NULL; + + void* batch = streamStateCreateBatch(); while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); - code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize); + if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { + code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); + streamStateClearBatch(batch); + } + code = + streamStatePutBatch(pFileState->pFileStore, "default", batch, pPos->pKey, pPos->pRowBuff, pFileState->rowSize); } + if (streamStateGetBatchSize(batch) > 0) { + code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); + } + streamStateDestroyBatch(batch); + if (flushState) { int32_t len = 0; - void* buff = NULL; + void* buff = NULL; streamFileStateEncode(pFileState, &buff, &len); - SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao + SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len); taosMemoryFree(buff); } @@ -348,8 +360,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; - SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao - void* pStVal = NULL; + SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao + void* pStVal = NULL; int32_t len = 0; code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len); if (code == TSDB_CODE_SUCCESS) { @@ -358,7 +370,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { return TSDB_CODE_FAILED; } - SWinKey key = {.groupId = 0, .ts = 0}; + SWinKey key = {.groupId = 0, .ts = 0}; SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); if (!pCur) { return TSDB_CODE_FAILED; @@ -371,10 +383,10 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { if (pFileState->curRowCount == pFileState->maxRowCount) { break; } - void* pVal = NULL; - int32_t pVLen = 0; + void* pVal = NULL; + int32_t pVLen = 0; SRowBuffPos* pNewPos = getNewRowPos(pFileState); - code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void **)&pVal, &pVLen); + code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen); if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); break;