Merge pull request #25841 from taosdata/feat/ly_opt_checkpointYh
Feat/ly_opt_checkpointYh
This commit is contained in:
commit
9527b66db1
|
@ -247,6 +247,7 @@ int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
|
||||||
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
|
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
|
||||||
|
|
||||||
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
|
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
|
||||||
|
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
|
||||||
|
|
||||||
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
|
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
|
||||||
|
|
||||||
|
|
|
@ -146,6 +146,9 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter,
|
||||||
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
|
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
|
||||||
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
|
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
|
||||||
|
|
||||||
|
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
|
||||||
|
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
|
||||||
|
|
||||||
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
|
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
|
||||||
int32_t copyFiles(const char* src, const char* dst);
|
int32_t copyFiles(const char* src, const char* dst);
|
||||||
uint32_t nextPow2(uint32_t x);
|
uint32_t nextPow2(uint32_t x);
|
||||||
|
@ -1010,6 +1013,50 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int chkpIdComp(const void* a, const void* b) {
|
||||||
|
int64_t x = *(int64_t*)a;
|
||||||
|
int64_t y = *(int64_t*)b;
|
||||||
|
if (x == y) return 0;
|
||||||
|
|
||||||
|
return x < y ? -1 : 1;
|
||||||
|
}
|
||||||
|
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||||
|
int32_t code = 0;
|
||||||
|
char* pChkpDir = taosMemoryCalloc(1, 256);
|
||||||
|
|
||||||
|
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||||
|
if (!taosIsDir(pChkpDir)) {
|
||||||
|
taosMemoryFree(pChkpDir);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
TdDirPtr pDir = taosOpenDir(pChkpDir);
|
||||||
|
if (pDir == NULL) {
|
||||||
|
taosMemoryFree(pChkpDir);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
TdDirEntryPtr de = NULL;
|
||||||
|
while ((de = taosReadDir(pDir)) != NULL) {
|
||||||
|
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
|
||||||
|
|
||||||
|
if (taosDirEntryIsDir(de)) {
|
||||||
|
char checkpointPrefix[32] = {0};
|
||||||
|
int64_t checkpointId = 0;
|
||||||
|
|
||||||
|
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
|
||||||
|
if (ret == 1) {
|
||||||
|
taosArrayPush(pBackend->chkpSaved, &checkpointId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArraySort(pBackend->chkpSaved, chkpIdComp);
|
||||||
|
|
||||||
|
taosMemoryFree(pChkpDir);
|
||||||
|
taosCloseDir(&pDir);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
|
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
|
||||||
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
|
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
|
||||||
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
||||||
|
@ -1043,8 +1090,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
|
||||||
rocksdb_checkpoint_create(cp, path, 64 << 20, &err);
|
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
stError("failed to do checkpoint at:%s, reason:%s", path, err);
|
stError("failed to do checkpoint at:%s, reason:%s", path, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
@ -1056,7 +1102,6 @@ _ERROR:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
|
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
|
||||||
if (nCf == 0) return 0;
|
|
||||||
int code = 0;
|
int code = 0;
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
|
||||||
|
@ -1109,9 +1154,12 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
|
||||||
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
|
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
|
||||||
taskDbAddRef(pTaskDb);
|
taskDbAddRef(pTaskDb);
|
||||||
|
|
||||||
code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId);
|
int64_t chkpId = pTaskDb->chkpId;
|
||||||
|
code = taskDbDoCheckpoint(pTaskDb, chkpId);
|
||||||
taskDbRemoveRef(pTaskDb);
|
taskDbRemoveRef(pTaskDb);
|
||||||
|
|
||||||
|
taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
|
||||||
|
|
||||||
SStreamTask* pTask = pTaskDb->pTask;
|
SStreamTask* pTask = pTaskDb->pTask;
|
||||||
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
|
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
|
||||||
.taskId = pTask->id.taskId,
|
.taskId = pTask->id.taskId,
|
||||||
|
@ -1124,6 +1172,28 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
|
||||||
|
if (pSnapInfo == NULL) return 0;
|
||||||
|
SStreamMeta* pMeta = arg;
|
||||||
|
int32_t code = 0;
|
||||||
|
taosThreadMutexLock(&pMeta->backendMutex);
|
||||||
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
|
||||||
|
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
|
||||||
|
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
|
||||||
|
STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
|
||||||
|
if (pTaskDb == NULL) {
|
||||||
|
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
memset(buf, 0, sizeof(buf));
|
||||||
|
|
||||||
|
taskDbUnRefChkp(pTaskDb, pSnap->chkpId);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
|
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
|
||||||
// if (arg == NULL) return 0;
|
// if (arg == NULL) return 0;
|
||||||
|
@ -1192,20 +1262,23 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
|
||||||
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
||||||
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
||||||
|
|
||||||
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
|
int64_t written = atomic_load_64(&pTaskDb->dataWritten);
|
||||||
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
|
||||||
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
if (written > 0) {
|
||||||
} else {
|
stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
|
||||||
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
|
||||||
taosGetTimestampMs() - st);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
|
stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
|
||||||
|
}
|
||||||
|
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
||||||
|
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
||||||
|
} else {
|
||||||
|
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
||||||
|
taosGetTimestampMs() - st);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
|
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
|
||||||
pTaskDb->dataWritten = 0;
|
atomic_store_64(&pTaskDb->dataWritten, 0);
|
||||||
|
|
||||||
pTaskDb->chkpId = chkpId;
|
pTaskDb->chkpId = chkpId;
|
||||||
|
|
||||||
_EXIT:
|
_EXIT:
|
||||||
|
@ -1846,7 +1919,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
|
||||||
rocksdb_options_set_create_if_missing(opts, 1);
|
rocksdb_options_set_create_if_missing(opts, 1);
|
||||||
rocksdb_options_set_create_missing_column_families(opts, 1);
|
rocksdb_options_set_create_missing_column_families(opts, 1);
|
||||||
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
|
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
|
||||||
rocksdb_options_set_recycle_log_file_num(opts, 6);
|
// rocksdb_options_set_ecycle_log_file_num(opts, 6);
|
||||||
rocksdb_options_set_max_write_buffer_number(opts, 3);
|
rocksdb_options_set_max_write_buffer_number(opts, 3);
|
||||||
rocksdb_options_set_info_log_level(opts, 1);
|
rocksdb_options_set_info_log_level(opts, 1);
|
||||||
rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
|
rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
|
||||||
|
@ -1900,11 +1973,32 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
|
||||||
pTaskDb->chkpId = -1;
|
pTaskDb->chkpId = -1;
|
||||||
pTaskDb->chkpCap = 4;
|
pTaskDb->chkpCap = 4;
|
||||||
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
|
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
taskDbLoadChkpInfo(pTaskDb);
|
||||||
|
|
||||||
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
|
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
|
taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
|
||||||
|
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
|
||||||
|
taosArrayPush(pTaskDb->chkpInUse, &chkp);
|
||||||
|
taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
|
||||||
|
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
|
||||||
|
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) {
|
||||||
|
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
|
||||||
|
if (*p == chkp) {
|
||||||
|
taosArrayRemove(pTaskDb->chkpInUse, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
|
||||||
|
}
|
||||||
|
|
||||||
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
|
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
|
||||||
taosArrayDestroy(pTaskDb->chkpSaved);
|
taosArrayDestroy(pTaskDb->chkpSaved);
|
||||||
taosArrayDestroy(pTaskDb->chkpInUse);
|
taosArrayDestroy(pTaskDb->chkpInUse);
|
||||||
|
@ -2131,15 +2225,18 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
||||||
|
int32_t code = -1;
|
||||||
STaskDbWrapper* pDb = arg;
|
STaskDbWrapper* pDb = arg;
|
||||||
ECHECKPOINT_BACKUP_TYPE utype = type;
|
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||||
|
|
||||||
|
taskDbRefChkp(pDb, chkpId);
|
||||||
if (utype == DATA_UPLOAD_RSYNC) {
|
if (utype == DATA_UPLOAD_RSYNC) {
|
||||||
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
||||||
} else if (utype == DATA_UPLOAD_S3) {
|
} else if (utype == DATA_UPLOAD_S3) {
|
||||||
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
|
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
|
||||||
}
|
}
|
||||||
return -1;
|
taskDbUnRefChkp(pDb, chkpId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
|
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
|
||||||
|
@ -2563,7 +2660,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
wrapper->dataWritten += 1; \
|
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
|
@ -2640,7 +2737,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
wrapper->dataWritten += 1; \
|
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
|
@ -2681,7 +2778,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
||||||
stDebug("streamStateClear_rocksdb");
|
stDebug("streamStateClear_rocksdb");
|
||||||
|
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
char sKeyStr[128] = {0};
|
char sKeyStr[128] = {0};
|
||||||
char eKeyStr[128] = {0};
|
char eKeyStr[128] = {0};
|
||||||
|
@ -3707,7 +3804,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
|
||||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
||||||
void* val, int32_t vlen, int64_t ttl) {
|
void* val, int32_t vlen, int64_t ttl) {
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
int i = streamStateGetCfIdx(pState, cfKeyName);
|
int i = streamStateGetCfIdx(pState, cfKeyName);
|
||||||
if (i < 0) {
|
if (i < 0) {
|
||||||
|
@ -3741,7 +3838,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
||||||
|
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
|
||||||
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
||||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||||
|
@ -3760,7 +3858,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
|
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
stError("streamState failed to write batch, err:%s", err);
|
stError("streamState failed to write batch, err:%s", err);
|
||||||
|
|
|
@ -74,7 +74,9 @@ struct SStreamSnapHandle {
|
||||||
int32_t currFileIdx;
|
int32_t currFileIdx;
|
||||||
char* metaPath;
|
char* metaPath;
|
||||||
|
|
||||||
|
void* pMeta;
|
||||||
SArray* pDbSnapSet;
|
SArray* pDbSnapSet;
|
||||||
|
SArray* pSnapInfoSet;
|
||||||
int32_t currIdx;
|
int32_t currIdx;
|
||||||
int8_t delFlag; // 0 : not del, 1: del
|
int8_t delFlag; // 0 : not del, 1: del
|
||||||
};
|
};
|
||||||
|
@ -140,7 +142,9 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
|
||||||
return taosOpenFile(fullname, opt);
|
return taosOpenFile(fullname, opt);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
|
int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
|
||||||
|
|
||||||
|
int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
|
||||||
|
|
||||||
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
||||||
if (qDebugFlag & DEBUG_DEBUG) {
|
if (qDebugFlag & DEBUG_DEBUG) {
|
||||||
|
@ -291,30 +295,26 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
|
||||||
}
|
}
|
||||||
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
|
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
|
||||||
// impl later
|
// impl later
|
||||||
SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
|
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
|
||||||
int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet);
|
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
|
||||||
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
|
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
|
||||||
|
|
||||||
SBackendSnapFile2 snapFile = {0};
|
SBackendSnapFile2 snapFile = {0};
|
||||||
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
|
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
taosArrayPush(pDbSnapSet, &snapFile);
|
taosArrayPush(pDbSnapSet, &snapFile);
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
|
|
||||||
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
|
|
||||||
taosMemoryFree(pSnap->dbPrefixPath);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pSnapSet);
|
|
||||||
|
|
||||||
pHandle->pDbSnapSet = pDbSnapSet;
|
pHandle->pDbSnapSet = pDbSnapSet;
|
||||||
|
pHandle->pSnapInfoSet = pSnapInfoSet;
|
||||||
pHandle->currIdx = 0;
|
pHandle->currIdx = 0;
|
||||||
|
pHandle->pMeta = pMeta;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -333,6 +333,14 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(handle->pDbSnapSet);
|
taosArrayDestroy(handle->pDbSnapSet);
|
||||||
}
|
}
|
||||||
|
streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
|
||||||
|
if (handle->pSnapInfoSet) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
|
||||||
|
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);
|
||||||
|
taosMemoryFree(pSnap->dbPrefixPath);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(handle->pSnapInfoSet);
|
||||||
|
}
|
||||||
taosMemoryFree(handle->metaPath);
|
taosMemoryFree(handle->metaPath);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue