fix checkpoint error

This commit is contained in:
yihaoDeng 2023-08-19 11:58:12 +08:00
parent 8e4411c144
commit fbdea723b0
2 changed files with 59 additions and 12 deletions

View File

@ -545,7 +545,9 @@ void streamBackendCleanup(void* arg) {
if (pHandle->db) { if (pHandle->db) {
char* err = NULL; char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flush(pHandle->db, flushOpt, &err); rocksdb_flush(pHandle->db, flushOpt, &err);
if (err != NULL) { if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err); qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err); taosMemoryFree(err);
@ -568,7 +570,6 @@ void streamBackendCleanup(void* arg) {
taosThreadMutexDestroy(&pHandle->mutex); taosThreadMutexDestroy(&pHandle->mutex);
taosThreadMutexDestroy(&pHandle->cfMutex); taosThreadMutexDestroy(&pHandle->cfMutex);
qDebug("destroy stream backend backend:%p", pHandle); qDebug("destroy stream backend backend:%p", pHandle);
taosMemoryFree(pHandle); taosMemoryFree(pHandle);
return; return;
@ -586,8 +587,7 @@ void streamBackendHandleCleanup(void* arg) {
char* err = NULL; char* err = NULL;
if (remove) { if (remove) {
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
if (wrapper->pHandle[i] != NULL) if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err);
rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err);
if (err != NULL) { if (err != NULL) {
// qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); // qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
@ -595,6 +595,8 @@ void streamBackendHandleCleanup(void* arg) {
} }
} else { } else {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err); if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err);
if (err != NULL) { if (err != NULL) {
@ -767,12 +769,38 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
taosMemoryFree(chkpPath); taosMemoryFree(chkpPath);
return 0; return 0;
} }
/**
 * Gather the column-family handles of every backend wrapper registered in
 * pMeta->pTaskBackendUnique.
 *
 * For each entry the stored int64_t ref id is acquired through
 * streamBackendCfWrapperId. Every non-NULL slot of wrapper->pHandle is pushed
 * into `pHandle`, and the ref id is pushed into `refs`. The acquired refs are
 * NOT released here; the caller must call taosReleaseRef() for every id left in
 * `refs` once it is done with the collected handles.
 *
 * @param pMeta    stream meta whose pTaskBackendUnique hash is iterated
 * @param pHandle  output array of rocksdb_column_family_handle_t* values
 * @param refs     output array of int64_t ref ids that were acquired
 * @return always 0
 */
int32_t streamBackendGetAllCfHandle(SStreamMeta* pMeta, SArray* pHandle, SArray* refs) {
  const int32_t nCf = (int32_t)(sizeof(ginitDict) / sizeof(ginitDict[0]));

  void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
  while (pIter) {
    int64_t            id = *(int64_t*)pIter;
    SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);

    if (wrapper != NULL) {
      // Read the handle table under the wrapper's read lock.
      taosThreadRwlockRdlock(&wrapper->rwLock);
      for (int32_t i = 0; i < nCf; i++) {
        if (wrapper->pHandle[i]) {
          rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
          taosArrayPush(pHandle, &p);
        }
      }
      taosThreadRwlockUnlock(&wrapper->rwLock);

      // Remember the ref so the caller can release it later.
      taosArrayPush(refs, &id);
    }

    // Always advance, including when the ref could not be acquired: skipping
    // the advance would examine the same hash entry again and never terminate.
    pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
  }
  return 0;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;
int64_t backendRid = pMeta->streamBackendRid; int64_t backendRid = pMeta->streamBackendRid;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t code = -1; int32_t code = -1;
SArray* refs = taosArrayInit(16, sizeof(int64_t));
SArray* pCf = taosArrayInit(16, POINTER_BYTES);
char path[256] = {0}; char path[256] = {0};
sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
code = taosMulModeMkDir(path, 0755); code = taosMulModeMkDir(path, 0755);
@ -782,20 +810,29 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
} }
char checkpointDir[256] = {0}; char checkpointDir[256] = {0};
snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint%" PRId64, path, checkpointId); snprintf(checkpointDir, tListLen(checkpointDir), "%s%scheckpoint%" PRId64, path, TD_DIRSEP, checkpointId);
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL) { if (pHandle == NULL) {
return -1; return -1;
} }
qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, checkpointDir); streamBackendGetAllCfHandle(pMeta, pCf, refs);
int32_t nCf = taosArrayGetSize(pCf);
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
ppCf[i] = taosArrayGetP(pCf, i);
}
qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf);
if (pHandle->db != NULL) { if (pHandle->db != NULL) {
char* err = NULL; char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flush(pHandle->db, flushOpt, &err); rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flush_cfs(pHandle->db, flushOpt, ppCf, nCf, &err);
if (err != NULL) { if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err); qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err); taosMemoryFree(err);
@ -821,14 +858,24 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
} }
rocksdb_checkpoint_object_destroy(cp); rocksdb_checkpoint_object_destroy(cp);
} }
for (int i = 0; i < taosArrayGetSize(refs); i++) {
int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id);
}
taosWLockLatch(&pMeta->chkpDirLock); taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpSaved, &checkpointId); taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->chkpDirLock); taosWUnLockLatch(&pMeta->chkpDirLock);
taosArrayDestroy(refs);
taosMemoryFree(ppCf);
delObsoleteCheckpoint(arg, path); delObsoleteCheckpoint(arg, path);
_ERROR: _ERROR:
taosReleaseRef(streamBackendId, backendRid); taosReleaseRef(streamBackendId, backendRid);
taosArrayDestroy(refs);
taosMemoryFree(ppCf);
return code; return code;
} }
@ -1371,7 +1418,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
inst->dbOpt = handle->dbOpt; inst->dbOpt = handle->dbOpt;
// rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else { } else {
inst = *pInst; inst = *pInst;
@ -1492,7 +1539,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);

View File

@ -139,7 +139,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
pState->pTdbState->backendCfWrapperId = id; pState->pTdbState->backendCfWrapperId = id;
pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// already exist stream task for // already exist stream task for
qInfo("already exist stream state for %s", pState->pTdbState->idstr); qInfo("already exist stream-state for %s", pState->pTdbState->idstr);
// taosAcquireRef(streamBackendId, pState->streamBackendRid); // taosAcquireRef(streamBackendId, pState->streamBackendRid);
} }
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
@ -431,9 +431,9 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
// code = streamStatePutBatch_rocksdb(pState, batch); // code = streamStatePutBatch_rocksdb(pState, batch);
// streamStateDestroyBatch(batch); // streamStateDestroyBatch(batch);
code = streamDefaultPut_rocksdb(pState, pKey, pVal, vLen); code = streamDefaultPut_rocksdb(pState, pKey, pVal, vLen);
char* Val = NULL; // char* Val = NULL;
int32_t len = 0; // int32_t len = 0;
code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len); // code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len);
return code; return code;
#else #else
return 0; return 0;