add cvt func

This commit is contained in:
yihaoDeng 2023-09-21 09:41:30 +00:00
parent ee8c09667e
commit 470849982e
5 changed files with 270 additions and 32 deletions

View File

@ -31,7 +31,10 @@ extern "C" {
typedef struct SStreamTask SStreamTask;
#define SSTREAM_TASK_VER 2
#define SSTREAM_TASK_VER 3
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
@ -117,7 +120,7 @@ typedef struct {
typedef struct {
int8_t type;
int64_t ver;
SArray* submits; // SArray<SPackedSubmit>
SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit;
typedef struct {
@ -257,8 +260,9 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t msgVer;
} SCheckpointInfo;
typedef struct SStreamStatus {
@ -283,8 +287,8 @@ typedef struct SSTaskBasicInfo {
int32_t selfChildId;
int32_t totalLevel;
int8_t taskLevel;
int8_t fillHistory; // is fill history task or not
int64_t triggerParam; // in msec
int8_t fillHistory; // is fill history task or not
int64_t triggerParam; // in msec
} SSTaskBasicInfo;
typedef struct SDispatchMsgInfo {
@ -301,13 +305,13 @@ typedef struct STaskOutputInfo {
} STaskOutputInfo;
typedef struct STaskInputInfo {
int8_t status;
int8_t status;
SStreamQueue* queue;
} STaskInputInfo;
typedef struct STaskSchedInfo {
int8_t status;
void* pTimer;
int8_t status;
void* pTimer;
} STaskSchedInfo;
typedef struct SSinkTaskRecorder {
@ -325,10 +329,10 @@ typedef struct {
} STaskTimestamp;
typedef struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp;// fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
int32_t capacity; // total capacity
int64_t fillTimestamp; // fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
} STokenBucket;
struct SStreamTask {
@ -650,7 +654,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);

View File

@ -42,8 +42,31 @@ typedef struct {
TdThreadMutex cfMutex;
SHashObj* cfInst;
int64_t defaultCfInit;
} SBackendWrapper;
typedef struct {
void* tableOpt;
} RocksdbCfParam;
typedef struct {
rocksdb_t* db;
rocksdb_writeoptions_t* writeOpts;
rocksdb_readoptions_t* readOpts;
rocksdb_options_t* dbOpt;
rocksdb_env_t* env;
rocksdb_cache_t* cache;
rocksdb_column_family_handle_t** pCf;
rocksdb_comparator_t** pCompares;
rocksdb_options_t** pCfOpts;
RocksdbCfParam* pCfParams;
rocksdb_compactionfilterfactory_t* filterFactory;
TdThreadMutex cfMutex;
} STaskBackendWrapper;
void* streamBackendInit(const char* path, int64_t chkpId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
@ -51,6 +74,7 @@ int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);

View File

@ -45,9 +45,6 @@ typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
typedef struct {
void* tableOpt;
} RocksdbCfParam;
typedef struct {
rocksdb_t* db;
rocksdb_column_family_handle_t** pHandle;
@ -1455,6 +1452,181 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
taosMemoryFree(inst);
}
STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) {
int32_t code = 0;
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key);
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(SBackendWrapper));
rocksdb_env_t* env = rocksdb_create_default_env();
rocksdb_cache_t* cache = rocksdb_cache_create_lru(256);
rocksdb_options_t* opts = rocksdb_options_create();
// rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
rocksdb_options_set_info_log_level(opts, 1);
// rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
// rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
rocksdb_options_set_atomic_flush(opts, 1);
pTaskBackend->env = env;
pTaskBackend->dbOpt = opts;
pTaskBackend->cache = cache;
pTaskBackend->filterFactory = rocksdb_compactionfilterfactory_create(
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(pTaskBackend->dbOpt, pTaskBackend->filterFactory);
char* err = NULL;
size_t nCf = 0;
char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err);
pTaskBackend->db = rocksdb_open(opts, taskPath, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err);
taosMemoryFreeClear(err);
}
} else {
ASSERT(0);
}
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend);
nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
pTaskBackend->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
pTaskBackend->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
pTaskBackend->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
pTaskBackend->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
for (int i = 0; i < nCf; i++) {
rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskBackend->dbOpt);
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, pTaskBackend->cache);
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
SCfInit* cfPara = &ginitDict[i];
rocksdb_comparator_t* compare =
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
pTaskBackend->pCompares[i] = compare;
pTaskBackend->pCfOpts[i] = opt;
pTaskBackend->pCfParams[i].tableOpt = tableOpt;
}
return pTaskBackend;
}
int8_t getCfIdx(const char* key) {
int idx = -1;
size_t len = strlen(key);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (len == ginitDict[i].len && strncmp(key, ginitDict[i].key, strlen(key)) == 0) {
idx = i;
break;
}
}
return idx;
}
rocksdb_column_family_handle_t* taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) {
char* err = NULL;
int8_t idx = getCfIdx(key);
if (idx == -1) return NULL;
rocksdb_column_family_handle_t* cf =
rocksdb_create_column_family(pBackend->db, pBackend->pCfOpts[idx], ginitDict[idx].key, &err);
if (err != NULL) {
qError("failed to open cf, key:%s, reason: %s", key, err);
taosMemoryFree(err);
return NULL;
}
return cf;
}
int32_t copyData(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) {
int32_t WRITE_BATCH = 1024;
char* err = NULL;
int code = 0;
rocksdb_readoptions_t* pRdOpt = rocksdb_readoptions_create();
rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
rocksdb_iterator_t* pIter = rocksdb_create_iterator_cf(pSrc->db, pRdOpt, pSrc->pHandle[i]);
rocksdb_iter_seek_to_first(pIter);
while (rocksdb_iter_valid(pIter)) {
if (rocksdb_writebatch_count(wb) >= WRITE_BATCH) {
rocksdb_write(pDst->db, pDst->writeOpts, wb, &err);
if (err != NULL) {
code = -1;
goto _EXIT;
}
rocksdb_writebatch_clear(wb);
}
size_t klen = 0, vlen = 0;
char* key = (char*)rocksdb_iter_key(pIter, &klen);
char* val = (char*)rocksdb_iter_value(pIter, &vlen);
rocksdb_writebatch_put_cf(wb, pDst->pCf[i], key, klen, val, vlen);
rocksdb_iter_next(pIter);
}
if (rocksdb_writebatch_count(wb) > 0) {
rocksdb_write(pDst->db, pDst->writeOpts, wb, &err);
if (err != NULL) {
code = -1;
goto _EXIT;
}
}
_EXIT:
rocksdb_iter_destroy(pIter);
rocksdb_readoptions_destroy(pRdOpt);
taosMemoryFree(err);
return code;
}
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst) {
int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
int32_t code = 0;
RocksdbCfInst* CfInst = cfInst;
STaskBackendWrapper* pTaskBackend = streamStateOpenTaskBackend(path, key);
RocksdbCfInst* pSrcBackend = cfInst;
for (int i = 0; i < nCf; i++) {
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
if (pSrcCf == NULL) continue;
rocksdb_column_family_handle_t* pDstCf = taskBackendOpenCf(pTaskBackend, ginitDict[i].key);
if (pDstCf == NULL) {
return -1;
}
code = copyData(pSrcBackend, pTaskBackend, i);
if (code != 0) {
return -1;
}
pTaskBackend->pCf[i] = pDstCf;
}
return 0;
}
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
SBackendWrapper* handle = backend;
char* err = NULL;

View File

@ -140,39 +140,78 @@ enum STREAM_STATE_VER {
STREAM_STATA_NEED_CONVERT,
};
int32_t streamMetaCheckStateVer(SStreamMeta* pMeta) {
TBC* pCur = NULL;
int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) {
int8_t ret = STREAM_STATA_COMPATIBLE;
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
// no task info, no stream
return STREAM_STATA_COMPATIBLE;
return ret;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
}
SDecoder decoder;
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
continue;
}
if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
ret = STREAM_STATA_NO_COMPATIBLE;
} else if (info.msgVer == SSTREAM_TASK_NEED_CONVERT_VER) {
ret = STREAM_STATA_NEED_CONVERT;
} else if (info.msgVer == SSTREAM_TASK_VER) {
ret = STREAM_STATA_COMPATIBLE;
}
tDecoderClear(&decoder);
break;
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return STREAM_STATA_NEED_CONVERT;
return ret;
}
int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { return 0; }
int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) {
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId);
void* pIter = taosHashIterate(pBackend->cfInst, NULL);
while (pIter) {
size_t len = 0;
void* key = taosHashGetKey(pIter, &len);
pIter = taosHashIterate(pBackend->cfInst, pIter);
}
// streamBackendCleanup();
return 0;
}
int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) {
int8_t compatible = streamMetaCheckStateCompatible(pMeta);
if (compatible == STREAM_STATA_COMPATIBLE) {
return 0;
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);
return -1;
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
qError("stream state need covert backend format");
return streamMetaDoStateDataConvertImpl(pMeta);
}
return 0;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));

View File

@ -226,9 +226,8 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &ver) < 0) return -1;
if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->msgVer) < 0) return -1;
// if (ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;