opt write snapshot
This commit is contained in:
parent
ad16487180
commit
c082536242
|
@ -54,9 +54,9 @@ typedef struct STdbState {
|
||||||
|
|
||||||
// incremental state storage
|
// incremental state storage
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STdbState* pTdbState;
|
STdbState* pTdbState;
|
||||||
SStreamFileState* pFileState;
|
SStreamFileState* pFileState;
|
||||||
int32_t number;
|
int32_t number;
|
||||||
} SStreamState;
|
} SStreamState;
|
||||||
|
|
||||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
|
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
|
||||||
|
@ -81,7 +81,7 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
|
||||||
|
|
||||||
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
||||||
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||||
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
|
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
||||||
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateClear(SStreamState* pState);
|
int32_t streamStateClear(SStreamState* pState);
|
||||||
|
|
|
@ -77,4 +77,12 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t grou
|
||||||
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
|
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
|
||||||
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
|
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
|
||||||
void streamStateDestroy_rocksdb(SStreamState* pState);
|
void streamStateDestroy_rocksdb(SStreamState* pState);
|
||||||
|
|
||||||
|
void* streamStateCreateBatch();
|
||||||
|
int32_t streamStateGetBatchSize(void* pBatch);
|
||||||
|
void streamStateClearBatch(void* pBatch);
|
||||||
|
void streamStateDestroyBatch(void* pBatch);
|
||||||
|
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
||||||
|
void* val, int32_t vlen);
|
||||||
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
|
||||||
#endif
|
#endif
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "rocksdb/c.h"
|
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
@ -544,6 +543,44 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
|
||||||
STREAM_STATE_PUT_ROCKSDB(pState, "default", &sKey, value, vLen);
|
STREAM_STATE_PUT_ROCKSDB(pState, "default", &sKey, value, vLen);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
|
char* err = NULL;
|
||||||
|
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
|
||||||
|
if (err != NULL) {
|
||||||
|
qError("streamState failed to write batch, err:%s", err);
|
||||||
|
taosMemoryFree(err);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* streamStateCreateBatch() {
|
||||||
|
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
|
||||||
|
return pBatch;
|
||||||
|
}
|
||||||
|
int32_t streamStateGetBatchSize(void* pBatch) {
|
||||||
|
if (pBatch == NULL) return -1;
|
||||||
|
|
||||||
|
return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
|
||||||
|
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
|
||||||
|
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
||||||
|
void* val, int32_t vlen) {
|
||||||
|
int i = streamGetInit(cfName);
|
||||||
|
|
||||||
|
if (i < 0) {
|
||||||
|
qError("streamState failed to put to cf name:%s", cfName);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char buf[128] = {0};
|
||||||
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
|
||||||
|
|
||||||
|
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
|
||||||
|
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
|
|
@ -15,13 +15,12 @@
|
||||||
|
|
||||||
#include "tstreamFileState.h"
|
#include "tstreamFileState.h"
|
||||||
|
|
||||||
|
#include "streamBackendRocksdb.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
#include "streamBackendRocksdb.h"
|
|
||||||
|
|
||||||
|
#define FLUSH_RATIO 0.2
|
||||||
#define FLUSH_RATIO 0.2
|
|
||||||
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024);
|
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024);
|
||||||
|
|
||||||
struct SStreamFileState {
|
struct SStreamFileState {
|
||||||
|
@ -117,7 +116,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
|
||||||
SListNode* pNode = NULL;
|
SListNode* pNode = NULL;
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||||
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
|
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
|
||||||
if (all || (pFileState->getTs(pPos->pKey) <ts) ) {
|
if (all || (pFileState->getTs(pPos->pKey) < ts)) {
|
||||||
tdListPopNode(pFileState->usedBuffs, pNode);
|
tdListPopNode(pFileState->usedBuffs, pNode);
|
||||||
taosMemoryFreeClear(pNode);
|
taosMemoryFreeClear(pNode);
|
||||||
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
|
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
|
||||||
|
@ -138,8 +137,8 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
|
||||||
if (!pFlushList) {
|
if (!pFlushList) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
|
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
|
||||||
uint64_t i = 0;
|
uint64_t i = 0;
|
||||||
SListIter iter = {0};
|
SListIter iter = {0};
|
||||||
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
|
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
|
||||||
|
|
||||||
|
@ -211,14 +210,14 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
|
||||||
ASSERT(pNewPos);// todo(liuyao) delete
|
ASSERT(pNewPos); // todo(liuyao) delete
|
||||||
pNewPos->pKey = taosMemoryCalloc(1, keyLen);
|
pNewPos->pKey = taosMemoryCalloc(1, keyLen);
|
||||||
memcpy(pNewPos->pKey, pKey, keyLen);
|
memcpy(pNewPos->pKey, pKey, keyLen);
|
||||||
|
|
||||||
TSKEY ts = pFileState->getTs(pKey);
|
TSKEY ts = pFileState->getTs(pKey);
|
||||||
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
|
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
void *pVal = NULL;
|
void* pVal = NULL;
|
||||||
streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len);
|
streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len);
|
||||||
memcpy(pNewPos->pRowBuff, pVal, len);
|
memcpy(pNewPos->pRowBuff, pVal, len);
|
||||||
taosMemoryFree(pVal);
|
taosMemoryFree(pVal);
|
||||||
|
@ -264,9 +263,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void releaseRowBuffPos(SRowBuffPos* pBuff) {
|
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
|
||||||
pBuff->beUsed = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
|
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
|
||||||
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
|
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
|
||||||
|
@ -280,25 +277,38 @@ void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t le
|
||||||
void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
|
void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
|
||||||
*pLen = sizeof(TSKEY);
|
*pLen = sizeof(TSKEY);
|
||||||
(*pVal) = taosMemoryCalloc(1, *pLen);
|
(*pVal) = taosMemoryCalloc(1, *pLen);
|
||||||
void* buff = *pVal;
|
void* buff = *pVal;
|
||||||
taosEncodeFixedI64(&buff, pFileState->flushMark);
|
taosEncodeFixedI64(&buff, pFileState->flushMark);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
|
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SListIter iter = {0};
|
SListIter iter = {0};
|
||||||
tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
|
tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
|
||||||
|
|
||||||
SListNode* pNode = NULL;
|
const int32_t BATCH_LIMIT = 128;
|
||||||
|
SListNode* pNode = NULL;
|
||||||
|
|
||||||
|
void* batch = streamStateCreateBatch();
|
||||||
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
|
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
|
||||||
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
||||||
code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
|
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
|
||||||
|
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
|
streamStateClearBatch(batch);
|
||||||
|
}
|
||||||
|
code =
|
||||||
|
streamStatePutBatch(pFileState->pFileStore, "default", batch, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
|
||||||
}
|
}
|
||||||
|
if (streamStateGetBatchSize(batch) > 0) {
|
||||||
|
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
|
}
|
||||||
|
streamStateDestroyBatch(batch);
|
||||||
|
|
||||||
if (flushState) {
|
if (flushState) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
streamFileStateEncode(pFileState, &buff, &len);
|
streamFileStateEncode(pFileState, &buff, &len);
|
||||||
SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao
|
SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao
|
||||||
streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len);
|
streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
|
@ -307,8 +317,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
|
|
||||||
int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao
|
SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao
|
||||||
void* pStVal = NULL;
|
void* pStVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len);
|
code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -317,7 +327,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWinKey key = {.groupId = 0, .ts = 0};
|
SWinKey key = {.groupId = 0, .ts = 0};
|
||||||
SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
|
SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
|
||||||
if (!pCur) {
|
if (!pCur) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -330,10 +340,10 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||||
if (pFileState->curRowCount == pFileState->maxRowCount) {
|
if (pFileState->curRowCount == pFileState->maxRowCount) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t pVLen = 0;
|
int32_t pVLen = 0;
|
||||||
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
|
||||||
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void **)&pVal, &pVLen);
|
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen);
|
||||||
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
|
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
|
||||||
destroyRowBuffPos(pNewPos);
|
destroyRowBuffPos(pNewPos);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue