diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3eb624f932..29dc054d78 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,7 +31,10 @@ extern "C" { typedef struct SStreamTask SStreamTask; -#define SSTREAM_TASK_VER 2 +#define SSTREAM_TASK_VER 3 +#define SSTREAM_TASK_INCOMPATIBLE_VER 1 +#define SSTREAM_TASK_NEED_CONVERT_VER 2 + enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -117,7 +120,7 @@ typedef struct { typedef struct { int8_t type; int64_t ver; - SArray* submits; // SArray + SArray* submits; // SArray } SStreamMergedSubmit; typedef struct { @@ -257,8 +260,9 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t checkpointId; - int64_t checkpointVer; // latest checkpointId version - int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t checkpointVer; // latest checkpointId version + int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t msgVer; } SCheckpointInfo; typedef struct SStreamStatus { @@ -283,8 +287,8 @@ typedef struct SSTaskBasicInfo { int32_t selfChildId; int32_t totalLevel; int8_t taskLevel; - int8_t fillHistory; // is fill history task or not - int64_t triggerParam; // in msec + int8_t fillHistory; // is fill history task or not + int64_t triggerParam; // in msec } SSTaskBasicInfo; typedef struct SDispatchMsgInfo { @@ -301,13 +305,13 @@ typedef struct STaskOutputInfo { } STaskOutputInfo; typedef struct STaskInputInfo { - int8_t status; + int8_t status; SStreamQueue* queue; } STaskInputInfo; typedef struct STaskSchedInfo { - int8_t status; - void* pTimer; + int8_t status; + void* pTimer; } STaskSchedInfo; typedef struct SSinkTaskRecorder { @@ -325,10 +329,10 @@ typedef struct { } STaskTimestamp; typedef struct STokenBucket { - int32_t capacity; // total capacity - int64_t fillTimestamp;// fill timestamp - int32_t numOfToken; // total available tokens - int32_t rate; // number of token per second + int32_t capacity; // total capacity + int64_t fillTimestamp; // fill timestamp + int32_t numOfToken; // total available tokens + int32_t rate; // number of token per second } STokenBucket; struct SStreamTask { @@ -650,7 +654,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); -void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 39854d1824..5bcc29287d 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -42,8 +42,31 @@ typedef struct { TdThreadMutex cfMutex; SHashObj* cfInst; int64_t defaultCfInit; + } SBackendWrapper; +typedef struct { + void* tableOpt; +} RocksdbCfParam; + +typedef struct { + rocksdb_t* db; + rocksdb_writeoptions_t* writeOpts; + rocksdb_readoptions_t* readOpts; + rocksdb_options_t* dbOpt; + rocksdb_env_t* env; + rocksdb_cache_t* cache; + + rocksdb_column_family_handle_t** pCf; + rocksdb_comparator_t** pCompares; + rocksdb_options_t** pCfOpts; + RocksdbCfParam* pCfParams; + + rocksdb_compactionfilterfactory_t* filterFactory; + TdThreadMutex cfMutex; + +} STaskBackendWrapper; + void* streamBackendInit(const char* path, int64_t chkpId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); @@ -51,6 +74,7 @@ int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); +int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 19f4ebbeea..7feb029e81 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -45,9 +45,6 @@ typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; -typedef struct { - void* tableOpt; -} RocksdbCfParam; typedef struct { rocksdb_t* db; rocksdb_column_family_handle_t** pHandle; @@ -1455,6 +1452,181 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { taosMemoryFree(inst); } +STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) { + int32_t code = 0; + + char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); + sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key); + + STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(SBackendWrapper)); + rocksdb_env_t* env = rocksdb_create_default_env(); + + rocksdb_cache_t* cache = rocksdb_cache_create_lru(256); + rocksdb_options_t* opts = rocksdb_options_create(); + // rocksdb_options_set_env(opts, env); + rocksdb_options_set_create_if_missing(opts, 1); + rocksdb_options_set_create_missing_column_families(opts, 1); + // rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); + rocksdb_options_set_recycle_log_file_num(opts, 6); + rocksdb_options_set_max_write_buffer_number(opts, 3); + rocksdb_options_set_info_log_level(opts, 1); + // rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); + // rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); + rocksdb_options_set_atomic_flush(opts, 1); + + pTaskBackend->env = env; + pTaskBackend->dbOpt = opts; + pTaskBackend->cache = cache; + pTaskBackend->filterFactory = rocksdb_compactionfilterfactory_create( + NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); + rocksdb_options_set_compaction_filter_factory(pTaskBackend->dbOpt, pTaskBackend->filterFactory); + + char* err = NULL; + size_t nCf = 0; + + char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err); + if (nCf == 0 || nCf == 1 || err != NULL) { + taosMemoryFreeClear(err); + pTaskBackend->db = rocksdb_open(opts, taskPath, &err); + if (err != NULL) { + qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err); + taosMemoryFreeClear(err); + } + } else { + ASSERT(0); + } + if (cfs != NULL) { + rocksdb_list_column_families_destroy(cfs, nCf); + } + qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend); + + nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); + pTaskBackend->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + pTaskBackend->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); + pTaskBackend->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); + pTaskBackend->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); + + for (int i = 0; i < nCf; i++) { + rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskBackend->dbOpt); + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, pTaskBackend->cache); + rocksdb_block_based_options_set_partition_filters(tableOpt, 1); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt); + + SCfInit* cfPara = &ginitDict[i]; + + rocksdb_comparator_t* compare = + rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); + + pTaskBackend->pCompares[i] = compare; + pTaskBackend->pCfOpts[i] = opt; + pTaskBackend->pCfParams[i].tableOpt = tableOpt; + } + + return pTaskBackend; +} + +int8_t getCfIdx(const char* key) { + int idx = -1; + size_t len = strlen(key); + for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + if (len == ginitDict[i].len && strncmp(key, ginitDict[i].key, strlen(key)) == 0) { + idx = i; + break; + } + } + return idx; +} +rocksdb_column_family_handle_t* taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) { + char* err = NULL; + int8_t idx = getCfIdx(key); + if (idx == -1) return NULL; + + rocksdb_column_family_handle_t* cf = + rocksdb_create_column_family(pBackend->db, pBackend->pCfOpts[idx], ginitDict[idx].key, &err); + if (err != NULL) { + qError("failed to open cf, key:%s, reason: %s", key, err); + taosMemoryFree(err); + return NULL; + } + return cf; +} +int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) { + int32_t WRITE_BATCH = 1024; + char* err = NULL; + int code = 0; + + rocksdb_readoptions_t* pRdOpt = rocksdb_readoptions_create(); + + rocksdb_writebatch_t* wb = rocksdb_writebatch_create(); + rocksdb_iterator_t* pIter = rocksdb_create_iterator_cf(pSrc->db, pRdOpt, pSrc->pHandle[i]); + rocksdb_iter_seek_to_first(pIter); + while (rocksdb_iter_valid(pIter)) { + if (rocksdb_writebatch_count(wb) >= WRITE_BATCH) { + rocksdb_write(pDst->db, pDst->writeOpts, wb, &err); + if (err != NULL) { + code = -1; + goto _EXIT; + } + rocksdb_writebatch_clear(wb); + } + + size_t klen = 0, vlen = 0; + char* key = (char*)rocksdb_iter_key(pIter, &klen); + char* val = (char*)rocksdb_iter_value(pIter, &vlen); + + rocksdb_writebatch_put_cf(wb, pDst->pCf[i], key, klen, val, vlen); + rocksdb_iter_next(pIter); + } + + if (rocksdb_writebatch_count(wb) > 0) { + rocksdb_write(pDst->db, pDst->writeOpts, wb, &err); + if (err != NULL) { + code = -1; + goto _EXIT; + } + } + +_EXIT: + rocksdb_iter_destroy(pIter); + rocksdb_readoptions_destroy(pRdOpt); + taosMemoryFree(err); + + return code; +} + +int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst) { + int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); + + int32_t code = 0; + RocksdbCfInst* CfInst = cfInst; + + STaskBackendWrapper* pTaskBackend = streamStateOpenTaskBackend(path, key); + RocksdbCfInst* pSrcBackend = cfInst; + + for (int i = 0; i < nCf; i++) { + rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; + if (pSrcCf == NULL) continue; + + rocksdb_column_family_handle_t* pDstCf = taskBackendOpenCf(pTaskBackend, ginitDict[i].key); + if (pDstCf == NULL) { + return -1; + } + + code = copyData(pSrcBackend, pTaskBackend, i); + if (code != 0) { + return -1; + } + + pTaskBackend->pCf[i] = pDstCf; + } + return 0; +} int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) { SBackendWrapper* handle = backend; char* err = NULL; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5c64619ca9..f47961d026 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -140,39 +140,78 @@ enum STREAM_STATE_VER { STREAM_STATA_NEED_CONVERT, }; -int32_t streamMetaCheckStateVer(SStreamMeta* pMeta) { - TBC* pCur = NULL; +int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) { + int8_t ret = STREAM_STATA_COMPATIBLE; + TBC* pCur = NULL; + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream - return STREAM_STATA_COMPATIBLE; + return ret; } - - void* pKey = NULL; - int32_t kLen = 0; - void* pVal = NULL; - int32_t vLen = 0; - SDecoder decoder; + void* pKey = NULL; + int32_t kLen = 0; + void* pVal = NULL; + int32_t vLen = 0; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { if (pVal == NULL || vLen == 0) { break; } + SDecoder decoder; SCheckpointInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { continue; } + if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) { + ret = STREAM_STATA_NO_COMPATIBLE; + } else if (info.msgVer == SSTREAM_TASK_NEED_CONVERT_VER) { + ret = STREAM_STATA_NEED_CONVERT; + } else if (info.msgVer == SSTREAM_TASK_VER) { + ret = STREAM_STATA_COMPATIBLE; + } tDecoderClear(&decoder); + break; } tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - - return STREAM_STATA_NEED_CONVERT; + return ret; } -int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { return 0; } +int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) { + int64_t chkpId = streamGetLatestCheckpointId(pMeta); + SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId); + + void* pIter = taosHashIterate(pBackend->cfInst, NULL); + while (pIter) { + size_t len = 0; + void* key = taosHashGetKey(pIter, &len); + + pIter = taosHashIterate(pBackend->cfInst, pIter); + } + + // streamBackendCleanup(); + + return 0; +} +int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { + int8_t compatible = streamMetaCheckStateCompatible(pMeta); + if (compatible == STREAM_STATA_COMPATIBLE) { + return 0; + } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { + qError( + "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " + "manually", + tsDataDir); + return -1; + } else if (compatible == STREAM_STATA_NEED_CONVERT) { + qError("stream state need covert backend format"); + return streamMetaDoStateDataConvertImpl(pMeta); + } + return 0; +} SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 663deca171..837f787dd2 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -226,9 +226,8 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) SEpSet epSet; if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &ver) < 0) return -1; - - if (ver != SSTREAM_TASK_VER) return -1; + if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1; + // if (ver != SSTREAM_TASK_VER) return -1; if (tDecodeI64(pDecoder, &skip64) < 0) return -1; if (tDecodeI32(pDecoder, &skip32) < 0) return -1;