diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b40ab39deb..93b4d3f650 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -545,7 +545,9 @@ void streamBackendCleanup(void* arg) { if (pHandle->db) { char* err = NULL; rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); rocksdb_flush(pHandle->db, flushOpt, &err); + if (err != NULL) { qError("failed to flush db before streamBackend clean up, reason:%s", err); taosMemoryFree(err); @@ -568,7 +570,6 @@ void streamBackendCleanup(void* arg) { taosThreadMutexDestroy(&pHandle->mutex); taosThreadMutexDestroy(&pHandle->cfMutex); - qDebug("destroy stream backend backend:%p", pHandle); taosMemoryFree(pHandle); return; @@ -586,8 +587,7 @@ void streamBackendHandleCleanup(void* arg) { char* err = NULL; if (remove) { for (int i = 0; i < cfLen; i++) { - if (wrapper->pHandle[i] != NULL) - rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err); + if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err); if (err != NULL) { // qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); @@ -595,6 +595,8 @@ void streamBackendHandleCleanup(void* arg) { } } else { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); + for (int i = 0; i < cfLen; i++) { if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err); if (err != NULL) { @@ -767,12 +769,38 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { taosMemoryFree(chkpPath); return 0; } + +int32_t streamBackendGetAllCfHandle(SStreamMeta* pMeta, SArray* pHandle, SArray* refs) { + void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + while (pIter) { + int64_t id = *(int64_t*)pIter; + + SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id); + if (wrapper == NULL) continue; + + taosThreadRwlockRdlock(&wrapper->rwLock); + for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + if (wrapper->pHandle[i]) { + rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; + taosArrayPush(pHandle, &p); + } + } + taosThreadRwlockUnlock(&wrapper->rwLock); + + taosArrayPush(refs, &id); + pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + } + return 0; +} int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SStreamMeta* pMeta = arg; int64_t backendRid = pMeta->streamBackendRid; int64_t st = taosGetTimestampMs(); int32_t code = -1; + SArray* refs = taosArrayInit(16, sizeof(int64_t)); + SArray* pCf = taosArrayInit(16, POINTER_BYTES); + char path[256] = {0}; sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); code = taosMulModeMkDir(path, 0755); @@ -782,20 +810,29 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } char checkpointDir[256] = {0}; - snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint%" PRId64, path, checkpointId); + snprintf(checkpointDir, tListLen(checkpointDir), "%s%scheckpoint%" PRId64, path, TD_DIRSEP, checkpointId); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL) { return -1; } - qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, checkpointDir); + streamBackendGetAllCfHandle(pMeta, pCf, refs); + int32_t nCf = taosArrayGetSize(pCf); + rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + for (int i = 0; i < nCf; i++) { + ppCf[i] = taosArrayGetP(pCf, i); + } + + qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf); if (pHandle->db != NULL) { char* err = NULL; rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - rocksdb_flush(pHandle->db, flushOpt, &err); + rocksdb_flushoptions_set_wait(flushOpt, 1); + + rocksdb_flush_cfs(pHandle->db, flushOpt, ppCf, nCf, &err); if (err != NULL) { qError("failed to flush db before streamBackend clean up, reason:%s", err); taosMemoryFree(err); @@ -821,14 +858,24 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } rocksdb_checkpoint_object_destroy(cp); } + + for (int i = 0; i < taosArrayGetSize(refs); i++) { + int64_t id = *(int64_t*)taosArrayGet(refs, i); + taosReleaseRef(streamBackendCfWrapperId, id); + } taosWLockLatch(&pMeta->chkpDirLock); taosArrayPush(pMeta->chkpSaved, &checkpointId); taosWUnLockLatch(&pMeta->chkpDirLock); + taosArrayDestroy(refs); + taosMemoryFree(ppCf); + delObsoleteCheckpoint(arg, path); _ERROR: taosReleaseRef(streamBackendId, backendRid); + taosArrayDestroy(refs); + taosMemoryFree(ppCf); return code; } @@ -1371,7 +1418,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1492,7 +1539,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - // rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1396b627c2..205141efd6 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -139,7 +139,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz pState->pTdbState->backendCfWrapperId = id; pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); // already exist stream task for - qInfo("already exist stream state for %s", pState->pTdbState->idstr); + qInfo("already exist stream-state for %s", pState->pTdbState->idstr); // taosAcquireRef(streamBackendId, pState->streamBackendRid); } taosThreadMutexUnlock(&pMeta->backendMutex); @@ -431,9 +431,9 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo // code = streamStatePutBatch_rocksdb(pState, batch); // streamStateDestroyBatch(batch); code = streamDefaultPut_rocksdb(pState, pKey, pVal, vLen); - char* Val = NULL; - int32_t len = 0; - code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len); + // char* Val = NULL; + // int32_t len = 0; + // code = streamDefaultGet_rocksdb(pState, pKey, (void**)&Val, &len); return code; #else return 0;