Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-24 16:29:09 +08:00
parent 23ae62d268
commit 6416a6153a
4 changed files with 42 additions and 30 deletions

View File

@ -942,11 +942,11 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) {
if (pIter == NULL) break; if (pIter == NULL) break;
maxChkpId = TMAX(maxChkpId, pStream->checkpointId); maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
mError("stream %p checkpoint %" PRId64 "", pStream, pStream->checkpointId); mDebug("stream %p checkpoint %" PRId64 "", pStream, pStream->checkpointId);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
mError("generated checkpoint %" PRId64 "", maxChkpId + 1); mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1);
return maxChkpId + 1; return maxChkpId + 1;
} }

View File

@ -139,7 +139,8 @@ void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
void taskDbDestroy(void* pBackend); void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
@ -217,7 +218,7 @@ int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** p
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
void* streamDefaultIterCreate_rocksdb(SStreamState* pState); void* streamDefaultIterCreate_rocksdb(SStreamState* pState);
bool streamDefaultIterValid_rocksdb(void* iter); bool streamDefaultIterValid_rocksdb(void* iter);
void streamDefaultIterSeek_rocksdb(void* iter, const char* key); void streamDefaultIterSeek_rocksdb(void* iter, const char* key);
void streamDefaultIterNext_rocksdb(void* iter); void streamDefaultIterNext_rocksdb(void* iter);
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
@ -245,8 +246,8 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); // STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
void taskDbDestroy(void* pDb); // void taskDbDestroy(void* pDb, bool flush);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);

View File

@ -190,12 +190,12 @@ int32_t getCfIdx(const char* cfName) {
} }
bool isValidCheckpoint(const char* dir) { bool isValidCheckpoint(const char* dir) {
// return true; return true;
STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir); STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir);
if (pDb == NULL) { if (pDb == NULL) {
return true; return false;
} }
taskDbDestroy(pDb); taskDbDestroy(pDb, false);
return true; return true;
} }
@ -1788,7 +1788,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
return pTaskDb; return pTaskDb;
_EXIT: _EXIT:
taskDbDestroy(pTaskDb); taskDbDestroy(pTaskDb, false);
if (err) taosMemoryFree(err); if (err) taosMemoryFree(err);
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
return NULL; return NULL;
@ -1807,7 +1807,7 @@ STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
return pTaskDb; return pTaskDb;
} }
void taskDbDestroy(void* pDb) { void taskDbDestroy(void* pDb, bool flush) {
STaskDbWrapper* wrapper = pDb; STaskDbWrapper* wrapper = pDb;
qDebug("succ to destroy stream backend:%p", wrapper); qDebug("succ to destroy stream backend:%p", wrapper);
@ -1815,24 +1815,33 @@ void taskDbDestroy(void* pDb) {
if (wrapper == NULL) return; if (wrapper == NULL) return;
if (wrapper->db && wrapper->pCf) { if (flush) {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); if (wrapper->db && wrapper->pCf) {
rocksdb_flushoptions_set_wait(flushOpt, 1); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
char* err = NULL; char* err = NULL;
for (int i = 0; i < nCf; i++) { rocksdb_column_family_handle_t** cfs = taosMemoryCalloc(1, sizeof(rocksdb_column_family_handle_t*) * nCf);
if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); int numOfFlushCf = 0;
if (err != NULL) { for (int i = 0; i < nCf; i++) {
stError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err); if (wrapper->pCf[i] != NULL) {
taosMemoryFreeClear(err); cfs[numOfFlushCf++] = wrapper->pCf[i];
}
} }
if (numOfFlushCf != 0) {
rocksdb_flush_cfs(wrapper->db, flushOpt, cfs, numOfFlushCf, &err);
if (err != NULL) {
stError("failed to flush all cfs, reason:%s", err);
taosMemoryFreeClear(err);
}
}
taosMemoryFree(cfs);
rocksdb_flushoptions_destroy(flushOpt);
} }
rocksdb_flushoptions_destroy(flushOpt); }
for (int i = 0; i < nCf; i++) {
for (int i = 0; i < nCf; i++) { if (wrapper->pCf[i] != NULL) {
if (wrapper->pCf[i] != NULL) { rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
rocksdb_column_family_handle_destroy(wrapper->pCf[i]);
}
} }
} }
rocksdb_options_destroy(wrapper->dbOpt); rocksdb_options_destroy(wrapper->dbOpt);
@ -1869,6 +1878,8 @@ void taskDbDestroy(void* pDb) {
return; return;
} }
void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t code = -1; int32_t code = -1;
@ -2007,7 +2018,7 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
} }
_EXIT: _EXIT:
taskDbDestroy(pTaskDb); taskDbDestroy(pTaskDb, true);
return code; return code;
} }

View File

@ -57,7 +57,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaEnvInit() { static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
taskDbWrapperId = taosOpenRef(64, taskDbDestroy); taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl); streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
@ -1239,8 +1239,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
streamMetaWLock(pMeta); streamMetaWLock(pMeta);