refact task backend

This commit is contained in:
yihaoDeng 2023-09-28 15:46:12 +08:00
parent d4f87378fd
commit 54e3ac2c1e
7 changed files with 136 additions and 72 deletions

View File

@ -383,6 +383,7 @@ struct SStreamTask {
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
void* pBackend;
int64_t backendRefId;
char reserve[256];
};

View File

@ -736,12 +736,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
if (code != TSDB_CODE_SUCCESS) return code;
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pTask->pBackend == NULL) return -1;
streamTaskOpenAllUpstreamInput(pTask);

View File

@ -65,6 +65,8 @@ typedef struct {
rocksdb_compactionfilterfactory_t* filterFactory;
TdThreadMutex mutex;
int64_t refId;
} STaskBackendWrapper;
void* streamBackendInit(const char* path, int64_t chkpId);
@ -75,7 +77,9 @@ int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointI
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key);
STaskBackendWrapper* taskBackendOpen(char* path, char* key);
void taskBackendAddRef(void* pTaskBackend);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);

View File

@ -629,7 +629,7 @@ void streamBackendHandleCleanup(void* arg) {
taosThreadRwlockDestroy(&wrapper->rwLock);
wrapper->rocksdb = NULL;
taosReleaseRef(streamBackendId, wrapper->backendId);
// taosReleaseRef(streamBackendId, wrapper->backendId);
qDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
taosMemoryFree(wrapper);
@ -776,7 +776,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
int64_t id = *(int64_t*)pIter;
SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
if (wrapper == NULL) continue;
if (wrapper == NULL) {
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
continue;
}
taosThreadRwlockRdlock(&wrapper->rwLock);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
@ -788,7 +791,6 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
taosThreadRwlockUnlock(&wrapper->rwLock);
taosArrayPush(refs, &id);
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
int32_t nCf = taosArrayGetSize(pHandle);
@ -1452,35 +1454,72 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
taosMemoryFree(inst);
}
STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) {
int32_t code = 0;
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key);
if (!taosDirExist(taskPath)) {
code = taosMkDir(taskPath);
if (code != 0) {
qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code));
taosMemoryFree(taskPath);
return NULL;
int32_t getCfIdx(const char* cfName) {
int idx = -1;
size_t len = strlen(cfName);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) {
idx = i;
break;
}
}
return idx;
}
int32_t taskBackendOpenCfs(STaskBackendWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
int32_t code = -1;
char* err = NULL;
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(SBackendWrapper));
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
int32_t idx = getCfIdx(pCfNames[i]);
cfOpts[i] = pTask->pCfOpts[idx];
}
rocksdb_t* db = rocksdb_open_column_families(pTask->dbOpt, path, nCf, (const char* const*)pCfNames,
(const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
if (err != NULL) {
qError("failed to open cf path: %s", err);
taosMemoryFree(err);
goto _EXIT;
}
for (int i = 0; i < nCf; i++) {
int32_t idx = getCfIdx(pCfNames[i]);
pTask->pCf[idx] = cfHandle[i];
}
pTask->db = db;
code = 0;
_EXIT:
taosMemoryFree(cfOpts);
taosMemoryFree(cfHandle);
return code;
}
void taskBackendAddRef(void* pTaskBackend) {
STaskBackendWrapper* pBackend = pTaskBackend;
taosAcquireRef(streamBackendCfWrapperId, pBackend->refId);
return;
}
void taskBackendDestroy(STaskBackendWrapper* wrapper);
void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) {
rocksdb_env_t* env = rocksdb_create_default_env();
rocksdb_cache_t* cache = rocksdb_cache_create_lru(256);
rocksdb_options_t* opts = rocksdb_options_create();
// rocksdb_options_set_env(opts, env);
rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
rocksdb_options_set_info_log_level(opts, 1);
// rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
// rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
rocksdb_options_set_db_write_buffer_size(opts, 64 << 20);
rocksdb_options_set_write_buffer_size(opts, 32 << 20);
rocksdb_options_set_atomic_flush(opts, 1);
pTaskBackend->dbOpt = opts;
@ -1519,30 +1558,68 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) {
pTaskBackend->pCfOpts[i] = opt;
pTaskBackend->pCfParams[i].tableOpt = tableOpt;
}
return;
}
int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) {
int32_t code = 0;
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key);
if (!taosDirExist(taskPath)) {
code = taosMkDir(taskPath);
if (code != 0) {
qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code));
taosMemoryFree(taskPath);
return code;
}
}
*fullPath = taskPath;
return 0;
}
STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
char* taskPath = NULL;
char* err = NULL;
char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err);
int32_t code = taskBackendBuildFullPath(path, key, &taskPath);
if (code != 0) return NULL;
size_t nCf = 0;
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper));
taskBackendInitOpt(pTaskBackend);
char** cfs = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err);
pTaskBackend->db = rocksdb_open(opts, taskPath, &err);
pTaskBackend->db = rocksdb_open(pTaskBackend->dbOpt, taskPath, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err);
taosMemoryFreeClear(err);
code = -1;
goto _EXIT;
}
} else {
code = taskBackendOpenCfs(pTaskBackend, taskPath, cfs, nCf);
if (code != 0) goto _EXIT;
}
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
if (cfs != NULL) rocksdb_list_column_families_destroy(cfs, nCf);
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
taosMemoryFree(taskPath);
qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend);
pTaskBackend->refId = taosAddRef(streamBackendCfWrapperId, pTaskBackend);
return pTaskBackend;
_EXIT:
taskBackendDestroy(pTaskBackend);
return NULL;
}
void streamStateCloseTaskBackend(STaskBackendWrapper* wrapper) {
void taskBackendDestroy(STaskBackendWrapper* wrapper) {
if (wrapper == NULL) return;
if (wrapper->db && wrapper->pCf) {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
@ -1554,13 +1631,15 @@ void streamStateCloseTaskBackend(STaskBackendWrapper* wrapper) {
qError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
rocksdb_flushoptions_destroy(flushOpt);
}
rocksdb_flushoptions_destroy(flushOpt);
for (int i = 0; i < nCf; i++) {
if (wrapper->pCf[i] != NULL) {
rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
}
}
}
rocksdb_options_destroy(wrapper->dbOpt);
rocksdb_readoptions_destroy(wrapper->readOpt);
rocksdb_writeoptions_destroy(wrapper->writeOpt);
@ -1574,24 +1653,13 @@ void streamStateCloseTaskBackend(STaskBackendWrapper* wrapper) {
taosThreadMutexDestroy(&wrapper->mutex);
rocksdb_close(wrapper->db);
if (wrapper->db) rocksdb_close(wrapper->db);
taosMemoryFree(wrapper);
return;
}
int8_t getCfIdx(const char* key) {
int idx = -1;
size_t len = strlen(key);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (len == ginitDict[i].len && strncmp(key, ginitDict[i].key, strlen(key)) == 0) {
idx = i;
break;
}
}
return idx;
}
int32_t taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) {
int32_t taskBackendOpenCfByKey(STaskBackendWrapper* pBackend, const char* key) {
int32_t code = 0;
char* err = NULL;
int8_t idx = getCfIdx(key);
@ -1658,14 +1726,14 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) {
int32_t code = 0;
STaskBackendWrapper* pTaskBackend = streamStateOpenTaskBackend(path, key);
STaskBackendWrapper* pTaskBackend = taskBackendOpen(path, key);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) {
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
if (pSrcCf == NULL) continue;
code = taskBackendOpenCf(pTaskBackend, ginitDict[i].key);
code = taskBackendOpenCfByKey(pTaskBackend, ginitDict[i].key);
if (code != 0) goto _EXIT;
code = copyDataAt(pSrcBackend, pTaskBackend, i);
@ -1673,7 +1741,7 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) {
}
_EXIT:
streamStateCloseTaskBackend(pTaskBackend);
taskBackendDestroy(pTaskBackend);
return code;
}

View File

@ -224,11 +224,12 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) {
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
// add ref later
taskBackendAddRef(*ppBackend);
taosThreadMutexUnlock(&pMeta->backendMutex);
return *ppBackend;
}
void* pBackend = streamStateOpenTaskBackend(pMeta->path, key);
void* pBackend = taskBackendOpen(pMeta->path, key);
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexLock(&pMeta->backendMutex);
return pBackend;

View File

@ -106,20 +106,12 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
}
SStreamTask* pStreamTask = pTask;
// char statePath[1024];
// if (!specPath) {
// sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
// } else {
// memset(statePath, 0, 1024);
// tstrncpy(statePath, path, 1024);
// }
pState->taskId = pStreamTask->id.taskId;
pState->streamId = pStreamTask->id.streamId;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
#ifdef USE_ROCKSDB
// SStreamMeta* pMeta = pStreamTask->pMeta;
SStreamMeta* pMeta = pStreamTask->pMeta;
// pState->streamBackendRid = pMeta->streamBackendRid;
// taosWLockLatch(&pMeta->lock);
// taosThreadMutexLock(&pMeta->backendMutex);

View File

@ -388,7 +388,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);
//pTask->pBackend = streamStateOpenTaskBackend(pMeta->path, (char*)pTask->id.idStr);
// pTask->pBackend = taskBackendOpen(pMeta->path, (char*)pTask->id.idStr);
return TSDB_CODE_SUCCESS;
}