diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 86e4144687..f7c022924a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -133,6 +133,10 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal /***compare func **/ +typedef struct SStateChekpoint { + char* taskName; + int64_t checkpointId; +} SStateChekpoint; // todo refactor typedef struct SStateKey { SWinKey key; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index d9f0dd41a0..a06aef365f 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -85,4 +85,8 @@ void streamStateDestroyBatch(void* pBatch); int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); + +int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); +int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); +int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 9dc84db37c..e700e8b97a 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -16,6 +16,25 @@ #include "streamBackendRocksdb.h" #include "tcommon.h" #include "tlog.h" + +int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { + // + return memcmp(aBuf, bBuf, aLen); +} +int defaultKeyEncode(void* k, char* buf) { + int len = strlen((char*)k); + memcpy(buf, (char*)k, len); + return len; +} +int defaultKeyDecode(void* k, char* buf) { + int len = strlen(buf); + memcpy(k, buf, len); + return len; +} +int defaultKeyToString(void* k, char* buf) { + // just to debug + return sprintf(buf, "key: %s", (char*)k); +} // // SStateKey // |--groupid--|---ts------|--opNum----| @@ -261,38 +280,57 @@ int parKeyToString(void* k, char* buf) { return n; } -const char* cfName[] = {"default", "fill", "sess", "func", "parname", "partag"}; +const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; + typedef int (*EncodeFunc)(void* key, char* buf); typedef int (*DecodeFunc)(void* key, char* buf); typedef int (*ToStringFunc)(void* key, char* buf); -////typedef int32_t (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); -////typedef const char* (*BackendCmpNameFunc)(void* statue); +typedef const char* (*CompareName)(void* statue); +typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +typedef void (*DestroyFunc)(void* state); + +const char* compareDefaultName(void* name); +const char* compareStateName(void* name); +const char* compareWinKeyName(void* name); +const char* compareSessionKeyName(void* name); +const char* compareFuncKeyName(void* name); +const char* compareParKeyName(void* name); +const char* comparePartagKeyName(void* name); typedef struct { - const char* key; - int32_t len; - int idx; - EncodeFunc enFunc; - DecodeFunc deFunc; - ToStringFunc toStrFunc; + const char* key; + int32_t len; + int idx; + BackendCmpFunc cmpFunc; + EncodeFunc enFunc; + DecodeFunc deFunc; + ToStringFunc toStrFunc; + CompareName cmpName; + DestroyFunc detroyFunc; + } SCfInit; SCfInit ginitDict[] = { - {"default", strlen("default"), 0, stateKeyEncode, stateKeyDecode, stateKeyToString}, - {"fill", strlen("fill"), 1, winKeyEncode, winKeyDecode, winKeyToString}, - {"sess", strlen("sess"), 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString}, - {"func", strlen("func"), 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString}, - {"parname", strlen("parname"), 4, parKeyEncode, parKeyDecode, parKeyToString}, - {"partag", strlen("partag"), 5, parKeyEncode, parKeyDecode, parKeyToString}, + {"default", strlen("default"), 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, + compareDefaultName}, + {"state", strlen("state"), 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName}, + {"fill", strlen("fill"), 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName}, + {"sess", strlen("sess"), 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, + stateSessionKeyToString, compareSessionKeyName}, + {"func", strlen("func"), 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName}, + {"parname", strlen("parname"), 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName}, + {"partag", strlen("partag"), 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName}, }; -const char* compareStateName(void* name) { return cfName[0]; } -const char* compareWinKeyName(void* name) { return cfName[1]; } -const char* compareSessionKey(void* name) { return cfName[2]; } -const char* compareFuncKey(void* name) { return cfName[3]; } -const char* compareParKey(void* name) { return cfName[4]; } -const char* comparePartagKey(void* name) { return cfName[5]; } -void destroyFunc(void* stata) { return; } +const char* compareDefaultName(void* name) { return ginitDict[0].key; } +const char* compareStateName(void* name) { return ginitDict[1].key; } +const char* compareWinKeyName(void* name) { return ginitDict[2].key; } +const char* compareSessionKeyName(void* name) { return ginitDict[3].key; } +const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } +const char* compareParKeyName(void* name) { return ginitDict[5].key; } +const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } + +void destroyFunc(void* stata) { return; } int streamInitBackend(SStreamState* pState, char* path) { rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); @@ -309,11 +347,12 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_write_buffer_size(opts, 128 << 20); char* err = NULL; - int cfLen = sizeof(cfName) / sizeof(cfName[0]); + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { cfOpt[i] = rocksdb_options_create_copy(opts); + // refactor later rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); rocksdb_block_based_options_set_block_cache(tableOpt, cache); @@ -328,31 +367,12 @@ int streamInitBackend(SStreamState* pState, char* path) { }; rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); - - rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, destroyFunc, stateKeyDBComp, compareStateName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[0], stateCompare); - pCompare[0] = stateCompare; - - rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, destroyFunc, winKeyDBComp, compareWinKeyName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare); - pCompare[1] = fillCompare; - - rocksdb_comparator_t* sessCompare = - rocksdb_comparator_create(NULL, destroyFunc, stateSessionKeyDBComp, compareSessionKey); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare); - pCompare[2] = sessCompare; - - rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, destroyFunc, tupleKeyDBComp, compareFuncKey); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare); - pCompare[3] = funcCompare; - - rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, compareParKey); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare); - pCompare[4] = parnameCompare; - - rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, comparePartagKey); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare); - pCompare[5] = partagCompare; + for (int i = 0; i < cfLen; i++) { + SCfInit* cf = &ginitDict[i]; + rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); + pCompare[i] = compare; + } rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); rocksdb_t* db = rocksdb_open_column_families(opts, path, cfLen, cfName, cfOpt, cfHandle, &err); @@ -373,7 +393,7 @@ void streamCleanBackend(SStreamState* pState) { qInfo("rocksdb already free"); return; } - int cfLen = sizeof(cfName) / sizeof(cfName[0]); + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); @@ -540,7 +560,7 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_PUT_ROCKSDB(pState, "default", &sKey, value, vLen); + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, value, vLen); return code; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { @@ -581,17 +601,34 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen); return 0; } + +int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); + return code; +} +int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); + return code; +} +int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); + return code; +} + int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_GET_ROCKSDB(pState, "default", &sKey, pVal, pVLen); + STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); return code; } // todo refactor int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_DEL_ROCKSDB(pState, "default", &sKey); + STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey); return code; } @@ -638,7 +675,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { sKeyStr, sLen, eKeyStr, eLen, &err); // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen); if (err != NULL) { - qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); + qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); taosMemoryFree(err); } @@ -653,7 +690,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { // streamStatePut_rocksdb(pState, &key, s, strlen(s)); // rocksdb_readoptions_t* opt = NULL; - // rocksdb_iterator_t* iter = streamStateIterCreate(pState, "default", NULL, &opt); + // rocksdb_iterator_t* iter = streamStateIterCreate(pState, "state", NULL, &opt); // rocksdb_iter_seek(iter, buf, sLen); // char* err = NULL; @@ -804,7 +841,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* if (pCur == NULL) return NULL; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -973,7 +1010,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin } pCur->number = pState->number; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 34d4fa9d5f..dedf2b114f 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -57,11 +57,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ if (!pFileState) { goto _error; } - pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2); + pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - int32_t cap = TMIN(10240, pFileState->maxRowCount); + int32_t cap = TMIN(10240, pFileState->maxRowCount); pFileState->rowBuffMap = tSimpleHashInit(cap, hashFn); if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { goto _error; @@ -342,12 +342,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); + code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); } if (streamStateGetBatchSize(batch) > 0) { code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } - streamStateDestroyBatch(batch); if (flushState) { int32_t len = 0; @@ -357,6 +356,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len); taosMemoryFree(buff); } + + streamStateDestroyBatch(batch); return code; }