Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
This commit is contained in:
commit
e6b49b45d9
|
@ -1446,8 +1446,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
}
|
}
|
||||||
// close default cf
|
// close default cf
|
||||||
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
|
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
|
||||||
// rocksdb_column_family_handle_destroy(cfHandle[0]);
|
rocksdb_column_family_handle_destroy(cfHandle[0]);
|
||||||
// cfHandle[0] = NULL;
|
cfHandle[0] = NULL;
|
||||||
}
|
}
|
||||||
rocksdb_options_destroy(cfOpts[0]);
|
rocksdb_options_destroy(cfOpts[0]);
|
||||||
|
|
||||||
|
|
|
@ -94,7 +94,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
|
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
|
||||||
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
|
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
|
||||||
pMeta->chkpCap = 8;
|
pMeta->chkpCap = 2;
|
||||||
taosInitRWLatch(&pMeta->chkpDirLock);
|
taosInitRWLatch(&pMeta->chkpDirLock);
|
||||||
|
|
||||||
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
|
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
|
||||||
|
@ -143,11 +143,11 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
|
||||||
pMeta->streamBackendRid = -1;
|
pMeta->streamBackendRid = -1;
|
||||||
pMeta->streamBackend = NULL;
|
pMeta->streamBackend = NULL;
|
||||||
|
|
||||||
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
|
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
|
||||||
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
|
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
|
||||||
taosRemoveDir(defaultPath);
|
taosRemoveDir(defaultPath);
|
||||||
|
|
||||||
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
|
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
|
||||||
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
|
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
|
||||||
|
|
||||||
int32_t code = taosStatFile(newPath, NULL, NULL, NULL);
|
int32_t code = taosStatFile(newPath, NULL, NULL, NULL);
|
||||||
|
@ -156,7 +156,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
|
||||||
code = taosRenameFile(newPath, defaultPath);
|
code = taosRenameFile(newPath, defaultPath);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
qError("vgId:%d failed to rename file, from %s to %s, code:%s", newPath, defaultPath, pMeta->vgId,
|
qError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
|
|
||||||
taosMemoryFree(defaultPath);
|
taosMemoryFree(defaultPath);
|
||||||
|
|
Loading…
Reference in New Issue