From 4ca39d0f3c4c2224a8b371381f1177d9af89dcd2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 01:45:16 +0000 Subject: [PATCH 01/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 2e9032a47e..f24186c673 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -299,11 +299,12 @@ _EXIT: } void streamBackendCleanup(void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)arg; - RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); + + void* pIter = taosHashIterate(pHandle->cfInst, NULL); while (pIter != NULL) { - RocksdbCfInst* inst = *pIter; + RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; destroyRocksdbCfInst(inst); - taosHashIterate(pHandle->cfInst, pIter); + pIter = taosHashIterate(pHandle->cfInst, pIter); } taosHashCleanup(pHandle->cfInst); @@ -1103,7 +1104,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < nCf; i++) { char* cf = cfs[i]; - if (i == 0) continue; + if (i == 0) continue; // skip default column family, not set opt + char funcname[64] = {0}; if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { char idstr[128] = {0}; @@ -1125,7 +1127,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1136,9 +1137,9 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pHandle[idx] = cfHandle[i]; } } - void** pIter = taosHashIterate(handle->cfInst, NULL); + void* pIter = taosHashIterate(handle->cfInst, NULL); while (pIter) { - RocksdbCfInst* inst = *pIter; + RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; for (int i = 0; i < cfLen; i++) { if (inst->cfOpt[i] == NULL) { @@ -1179,8 +1180,8 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); - taosThreadMutexLock(&handle->cfMutex); + taosThreadMutexLock(&handle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; From 7ef4df87526517993a83c9995a309228a08a8083 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 07:20:05 +0000 Subject: [PATCH 02/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 54 +++++++++---------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f24186c673..ba12c47bc0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -408,6 +408,7 @@ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { int64_t tc = 0; int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); if (sz <= 0) { + taosWUnLockLatch(&pMeta->checkpointDirLock); return -1; } else { tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved); @@ -623,9 +624,9 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, return ret; } } -int streamStateValueIsStale(char* vv) { +int streamStateValueIsStale(char* v) { int64_t ts = 0; - taosDecodeFixedI64(vv, &ts); + taosDecodeFixedI64(v, &ts); return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0; } int iterValueIsStale(rocksdb_iterator_t* iter) { @@ -956,33 +957,23 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { SStreamValue key = {0}; char* p = value; if (streamStateValueIsStale(p)) { - if (dest != NULL) *dest = NULL; - return -1; + goto _EXCEPT; } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { - if (dest != NULL) *dest = NULL; qError("vlen: %d, read len: %d", vlen, key.len); - return -1; + goto _EXCEPT; } + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); - if (key.len == 0) { - key.data = NULL; - } else { - p = taosDecodeBinary(p, (void**)&(key.data), key.len); - } - - if (ttl != NULL) { - int64_t now = taosGetTimestampMs(); - *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; - } - if (dest != NULL) { - *dest = key.data; - } else { - taosMemoryFree(key.data); - } + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len; + +_EXCEPT: + if (dest != NULL) *dest = NULL; + if (ttl != NULL) *ttl = 0; + return -1; } const char* compareDefaultName(void* arg) { @@ -1096,9 +1087,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } else { qDebug("succ to open rocksdb cf"); } - // close default cf + // close default cf and destroy default cfOpts if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]); rocksdb_options_destroy(cfOpts[0]); + handle->db = db; static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -2354,9 +2346,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { } int32_t streamDefaultIterValid_rocksdb(void* iter) { SStreamStateCur* pCur = iter; - bool val = rocksdb_iter_valid(pCur->iter); - - return val ? 1 : 0; + return rocksdb_iter_valid(pCur->iter) ? 1 : 0; } void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { SStreamStateCur* pCur = iter; @@ -2372,13 +2362,16 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { } char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { SStreamStateCur* pCur = iter; - int32_t vlen = 0; - char* dst = NULL; - const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); - if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) { + char* ret = NULL; + + int32_t vlen = 0; + const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); + *len = decodeValueFunc((void*)val, vlen, NULL, &ret); + if (*len < 0) { return NULL; } - return dst; + + return ret; } // batch func void* streamStateCreateBatch() { @@ -2433,6 +2426,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb if (tmpBuf == NULL) { taosMemoryFree(ttlV); } + { char tbuf[256] = {0}; ginitDict[cfIdx].toStrFunc((void*)key, tbuf); From eeb97351e8712c1156ccf78814f9d36f08940fcc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 08:51:01 +0000 Subject: [PATCH 03/22] support reopen stream state --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 3 + source/dnode/vnode/src/tq/tqStreamStateSnap.c | 11 ++- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 6 ++ source/libs/stream/src/streamBackendRocksdb.c | 8 +-- source/libs/stream/src/streamMeta.c | 69 ++++++++++--------- 6 files changed, 60 insertions(+), 38 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 51a31f72ed..9c01b40dce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -612,6 +612,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); +int32_t streamStateReopen(SStreamMeta *pMeta, int64_t chkpId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cb7af681ee..18f8872e03 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -331,6 +331,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId); +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter); + + // SStreamTaskReader ====================================== // SStreamStateWriter ===================================== // SStreamStateReader ===================================== diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 3478928c4f..f1bdc0d6de 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -163,7 +163,16 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) return code; } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { - return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId); + return streamStateReopen(pWriter->pTq->pStreamMeta, chkpId); +} + +int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) { + // impl later + return streamLoadTasks(pMeta, ver); +} +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { + SWal* pWal = pWriter->pTq->pVnode->pWal; + return streamStateLoadTasksImpl(pWriter->pTq->pStreamMeta, walGetCommittedVer(pWal)); } int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 16bd233807..be5fb4785c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -409,6 +409,12 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (pWriter->pStreamStateWriter) { code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); if (code) goto _exit; + + code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0); + if (code) goto _exit; + + code = streamStateLoadTasks(pWriter->pStreamStateWriter); + if (code) goto _exit; } if (pWriter->pRsmaSnapWriter) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ba12c47bc0..c63942c8cc 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -732,8 +732,8 @@ int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); } -int stateSessionKeyEncode(void* ses, char* buf) { - SStateSessionKey* sess = ses; +int stateSessionKeyEncode(void* k, char* buf) { + SStateSessionKey* sess = k; int len = 0; len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); @@ -741,8 +741,8 @@ int stateSessionKeyEncode(void* ses, char* buf) { len += taosEncodeFixedI64((void**)&buf, sess->opNum); return len; } -int stateSessionKeyDecode(void* ses, char* buf) { - SStateSessionKey* sess = ses; +int stateSessionKeyDecode(void* k, char* buf) { + SStateSessionKey* sess = k; int len = 0; char* p = buf; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3cf967a219..e93499ab89 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -128,48 +128,51 @@ _err: return NULL; } -void streamMetaReopen(SStreamMeta** ppMeta) { - SStreamMeta* pMeta = *ppMeta; +void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { + // stop all running tasking and reopen later + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } - SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); - pNewMeta->path = taosStrdup(pMeta->path); - pNewMeta->vgId = pMeta->vgId; - pNewMeta->walScanCounter = 0; - pNewMeta->ahandle = pMeta->ahandle; - pNewMeta->expandFunc = pMeta->expandFunc; + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->schedTimer) { + taosTmrStop(pTask->schedTimer); + pTask->schedTimer = NULL; + } - *ppMeta = pNewMeta; + if (pTask->launchTaskTimer) { + taosTmrStop(pTask->launchTaskTimer); + pTask->launchTaskTimer = NULL; + } - streamMetaClose(pMeta); + tFreeStreamTask(pTask); + } - // tdbAbort(pMeta->db, pMeta->txn); - // tdbTbClose(pMeta->pTaskDb); - // tdbTbClose(pMeta->pCheckpointDb); - // tdbClose(pMeta->db); + // close stream backend + streamBackendCleanup(pMeta->streamBackend); + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); + pMeta->streamBackendRid = -1; + pMeta->streamBackend = NULL; - // void* pIter = NULL; - // while (1) { - // pIter = taosHashIterate(pMeta->pTasks, pIter); - // if (pIter == NULL) { - // break; - // } + pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - // SStreamTask* pTask = *(SStreamTask**)pIter; - // if (pTask->schedTimer) { - // taosTmrStop(pTask->schedTimer); - // pTask->schedTimer = NULL; - // } + taosHashClear(pMeta->pTasks); - // if (pTask->launchTaskTimer) { - // taosTmrStop(pTask->launchTaskTimer); - // pTask->launchTaskTimer = NULL; - // } + taosArrayClear(pMeta->pTaskList); - // tFreeStreamTask(pTask); - // } + taosHashClear(pMeta->pTaskBackendUnique); - // taosHashClear(pMeta->pTasks); - // taosRemoveRef(streamBackendId, pMeta->streamBackendRid); + taosArrayClear(pMeta->checkpointSaved); + + taosArrayClear(pMeta->checkpointInUse); + + // if (streamLoadTasks(pMeta,int64_t ver)) + + return; } void streamMetaClose(SStreamMeta* pMeta) { tdbAbort(pMeta->db, pMeta->txn); From b6c991f896129ff512212f18b5f8c14b6ffd7d41 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 08:59:04 +0000 Subject: [PATCH 04/22] support reopen stream state --- include/libs/stream/tstream.h | 4 ++-- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 3 --- source/libs/stream/src/streamMeta.c | 16 ++++++++-------- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9c01b40dce..4c0005ece7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -611,8 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); -int32_t streamStateReopen(SStreamMeta *pMeta, int64_t chkpId); +// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index f1bdc0d6de..87d174715d 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -163,7 +163,11 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) return code; } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { - return streamStateReopen(pWriter->pTq->pStreamMeta, chkpId); + int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); + if (code == 0) { + code = streamStateLoadTasks(pWriter); + } + return code; } int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index be5fb4785c..00f7150c0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -412,9 +412,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0); if (code) goto _exit; - - code = streamStateLoadTasks(pWriter->pStreamStateWriter); - if (code) goto _exit; } if (pWriter->pRsmaSnapWriter) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e93499ab89..5d0861624e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,14 +36,14 @@ void streamMetaCleanup() { taosCloseRef(streamBackendCfWrapperId); } -int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { - int32_t code = 0; +// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { +// int32_t code = 0; - int32_t nTask = taosHashGetSize(pMeta->pTasks); - assert(nTask == 0); +// int32_t nTask = taosHashGetSize(pMeta->pTasks); +// assert(nTask == 0); - return code; -} +// return code; +// } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -128,7 +128,7 @@ _err: return NULL; } -void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { // stop all running tasking and reopen later void* pIter = NULL; while (1) { @@ -172,7 +172,7 @@ void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { // if (streamLoadTasks(pMeta,int64_t ver)) - return; + return 0; } void streamMetaClose(SStreamMeta* pMeta) { tdbAbort(pMeta->db, pMeta->txn); From 6c8c57554934f6b870654fecfd43105737f3f70f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 10:48:04 +0000 Subject: [PATCH 05/22] support reopen stream state --- source/libs/stream/src/streamMeta.c | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5d0861624e..7e264128d9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -157,7 +157,20 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; - pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); + char* path1 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(path1, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + taosRemoveDir(path1); + + char* path2 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(path2, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); + + if (taosRenameFile(path2, path1) < 0) { + taosMemoryFree(path1); + taosMemoryFree(path2); + return -1; + } + + pMeta->streamBackend = streamBackendInit(pMeta->path, 0); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosHashClear(pMeta->pTasks); @@ -170,8 +183,6 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { taosArrayClear(pMeta->checkpointInUse); - // if (streamLoadTasks(pMeta,int64_t ver)) - return 0; } void streamMetaClose(SStreamMeta* pMeta) { From 6a69c56b32febe446631d86b57860e2559b83d2e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 10:55:42 +0000 Subject: [PATCH 06/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c63942c8cc..54b0e8498e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -487,15 +487,15 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { int32_t code = 0; int32_t len = strlen(pMeta->path) + 30; - char* checkpointPath = taosMemoryCalloc(1, len); - sprintf(checkpointPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); + char* chkpPath = taosMemoryCalloc(1, len); + sprintf(chkpPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); - if (!taosDirExist(checkpointPath)) { + if (!taosDirExist(chkpPath)) { // no checkpoint, nothing to load return 0; } - TdDirPtr pDir = taosOpenDir(checkpointPath); + TdDirPtr pDir = taosOpenDir(chkpPath); if (pDir == NULL) return 0; TdDirEntryPtr de = NULL; @@ -525,7 +525,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { taosArrayDestroy(suffix); taosCloseDir(&pDir); - taosMemoryFree(checkpointPath); + taosMemoryFree(chkpPath); return 0; } int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { From 80e78e054d219e09f8fc55ccaa374cba4e99fb5e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 11:39:32 +0000 Subject: [PATCH 07/22] support reopen stream state --- source/libs/stream/src/streamMeta.c | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7e264128d9..907ff2b48a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -157,20 +157,23 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; - char* path1 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); - sprintf(path1, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); - taosRemoveDir(path1); + char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + taosRemoveDir(defaultPath); - char* path2 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); - sprintf(path2, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); + char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); - if (taosRenameFile(path2, path1) < 0) { - taosMemoryFree(path1); - taosMemoryFree(path2); + if (taosRenameFile(newPath, defaultPath) < 0) { + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); return -1; } pMeta->streamBackend = streamBackendInit(pMeta->path, 0); + if (pMeta->streamBackend == NULL) { + return -1; + } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosHashClear(pMeta->pTasks); From 6b73fc9c06dc74da04d48fd5ddc69894a770cb8c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 13:56:53 +0000 Subject: [PATCH 08/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 54b0e8498e..f926efb94d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,6 +20,17 @@ #include "tcommon.h" #include "tref.h" +typedef struct { + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + char* buf; + int32_t len; +} SBackendManager; + typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -127,6 +138,46 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); +SBackendManager* backendManagerCreate(char* path) { + SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); + p->curChkpId = 0; + p->preCkptId = 0; + p->pSST = taosArrayInit(64, sizeof(void*)); + p->path = taosStrdup(path); + + p->len = strlen(path) + 128; + p->buf = taosMemoryCalloc(1, p->len); + return p; +} + +int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { + memset(bm->buf, 0, bm->len); + sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); + + TdDirPtr pDir = taosOpenDir(bm->buf); + TdDirEntryPtr de = NULL; + + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + + // sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); + // sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); + // if (!taosDirEntryIsDir(de)) { + // code = taosCopyFile(absSrcPath, absDstPath); + // if (code == -1) { + // goto _err; + // } + // } + + // memset(absSrcPath, 0, sLen + 64); + // memset(absDstPath, 0, dLen + 64); + } + + return 0; +} + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc, encodeValueFunc, decodeValueFunc}, @@ -219,6 +270,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } + void* streamBackendInit(const char* streamPath, int64_t chkpId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); From ac11537fc8a1e36c044b4dea839a28c66cb97547 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Aug 2023 07:21:47 +0000 Subject: [PATCH 09/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 97 +++++++++++++++---- 1 file changed, 76 insertions(+), 21 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f926efb94d..d72901a2a3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -21,14 +21,16 @@ #include "tref.h" typedef struct { - char* pCurrent; - char* pManifest; - SArray* pSST; - int64_t preCkptId; - int64_t curChkpId; - char* path; - char* buf; - int32_t len; + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + SHashObj* pSSTable; + char* path; + char* buf; + int32_t len; } SBackendManager; typedef struct SCompactFilteFactory { @@ -144,37 +146,90 @@ SBackendManager* backendManagerCreate(char* path) { p->preCkptId = 0; p->pSST = taosArrayInit(64, sizeof(void*)); p->path = taosStrdup(path); - + p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); return p; } +int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { + int32_t code = 0; + size_t len = 0; + void* pIter = taosHashIterate(p2, NULL); + while (pIter) { + char* name = taosHashGetKey(pIter, &len); + if (!taosHashGet(p1, name, len)) { + char* p = taosStrdup(name); + taosArrayPush(diff, &p); + } + pIter = taosHashIterate(p2, pIter); + } + return code; +} +int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { + int32_t code = 0; + + code = compareHashTableImpl(p1, p2, add); + code = compareHashTableImpl(p2, p1, del); + + return code; +} int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { + const char* pCurrent = "CURRENT"; + int32_t currLen = strlen(pCurrent); + + const char* pManifest = "MANIFEST-"; + int32_t maniLen = strlen(pManifest); + + const char* pSST = ".sst"; + int32_t sstLen = strlen(pSST); + memset(bm->buf, 0, bm->len); sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); + SHashObj* pTable = bm->init == 0 + ? bm->pSSTable + : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + TdDirPtr pDir = taosOpenDir(bm->buf); TdDirEntryPtr de = NULL; - + int8_t dummy = 0; while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { + taosMemoryFreeClear(bm->pCurrent); + bm->pCurrent = taosStrdup(name); + taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { + taosMemoryFreeClear(bm->pManifest); + bm->pManifest = taosStrdup(name); + taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { + char* p = taosStrdup(name); - // sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); - // sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); - // if (!taosDirEntryIsDir(de)) { - // code = taosCopyFile(absSrcPath, absDstPath); - // if (code == -1) { - // goto _err; - // } - // } - - // memset(absSrcPath, 0, sLen + 64); - // memset(absDstPath, 0, dLen + 64); + taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + continue; + } } + if (bm->init == 0) { + bm->preCkptId = chkpId; + bm->curChkpId = chkpId; + bm->init = 1; + } else { + SArray* add = taosArrayInit(64, sizeof(void*)); + SArray* del = taosArrayInit(64, sizeof(void*)); + int32_t code = compareHashTable(bm->pSSTable, pTable, add, del); + + bm->curChkpId = chkpId; + taosHashCleanup(pTable); + } return 0; } From fb7dec00e1ed3039bc9e8a0c764f59c14be0f880 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 01:50:49 +0000 Subject: [PATCH 10/22] support reopen stream state --- include/libs/transport/trpc.h | 2 +- source/client/src/clientEnv.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/libs/transport/inc/transportInt.h | 6 +++--- source/libs/transport/src/trans.c | 2 +- source/libs/transport/src/transCli.c | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 93e4d72ad7..e5955aad54 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -89,7 +89,7 @@ typedef struct SRpcInit { int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval - int64_t retryMaxTimouet; + int64_t retryMaxTimeout; int32_t failFastThreshold; int32_t failFastInterval; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 238b3613f5..40c27bf164 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryMaxInterval = tsRedirectMaxPeriod; - rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 5d6d16ccf8..c0194fe4a9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -298,7 +298,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; rpcInit.retryMaxInterval = tsRedirectMaxPeriod; - rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; rpcInit.failFastInterval = 5000; // interval threshold(ms) rpcInit.failFastThreshold = 3; // failed threshold diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index ca48da690b..cc2c0d4e84 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -46,14 +46,14 @@ typedef struct { int8_t connType; char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID - int32_t compatibilityVer; + int32_t compatibilityVer; int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not - + int32_t retryMinInterval; // retry init interval int32_t retryStepFactor; // retry interval factor int32_t retryMaxInterval; // retry max interval - int32_t retryMaxTimouet; + int32_t retryMaxTimeout; int32_t failFastThreshold; int32_t failFastInterval; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 08b0451982..ed94521df0 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryStepFactor = pInit->retryStepFactor; pRpc->retryMaxInterval = pInit->retryMaxInterval; - pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + pRpc->retryMaxTimeout = pInit->retryMaxTimeout; pRpc->failFastThreshold = pInit->failFastThreshold; pRpc->failFastInterval = pInit->failFastInterval; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8062a0618b..e7afda59ab 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2256,7 +2256,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pCtx->retryMinInterval = pTransInst->retryMinInterval; pCtx->retryMaxInterval = pTransInst->retryMaxInterval; pCtx->retryStepFactor = pTransInst->retryStepFactor; - pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; + pCtx->retryMaxTimeout = pTransInst->retryMaxTimeout; pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryStep = 0; From c43a6b272c59ee3100e6b593f82998988a57a2da Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 02:09:50 +0000 Subject: [PATCH 11/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d72901a2a3..8492410a46 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -221,6 +221,19 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list bm->preCkptId = chkpId; bm->curChkpId = chkpId; bm->init = 1; + + SArray* add = taosArrayInit(64, sizeof(void*)); + + void* pIter = taosHashIterate(pTable, NULL); + while (pIter) { + size_t len; + char* name = taosHashGetKey(pIter, &len); + if (name != NULL && len != 0) { + taosArrayPush(add, &name); + } + pIter = taosHashIterate(pTable, pIter); + } + } else { SArray* add = taosArrayInit(64, sizeof(void*)); SArray* del = taosArrayInit(64, sizeof(void*)); @@ -233,6 +246,20 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list return 0; } +int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { + int32_t code = 0; + char* buf = taosMemoryCalloc(1, strlen(bm->path) + 64); + sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name); + + code = taosMkDir(buf); + if (code != 0) { + return code; + } + + + +} + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc, encodeValueFunc, decodeValueFunc}, From bbcfa9ab038f5e78db2b783ab8e729f6839fbe16 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 06:10:40 +0000 Subject: [PATCH 12/22] support reopen stream state --- include/util/tarray.h | 7 +- source/libs/stream/src/streamBackendRocksdb.c | 67 +++++++++++++++++-- source/util/src/tarray.c | 23 ++++++- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/include/util/tarray.h b/include/util/tarray.h index a93c695370..1b6b4587b7 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -200,13 +200,16 @@ void taosArrayClear(SArray* pArray); * @param pArray * @param fp */ + void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); +void taosArrayClearP(SArray* pArray, void (*fp)(void*)); + void* taosArrayDestroy(SArray* pArray); -void taosArrayDestroyP(SArray* pArray, FDelete fp); +void taosArrayDestroyP(SArray* pArray, FDelete fp); -void taosArrayDestroyEx(SArray* pArray, FDelete fp); +void taosArrayDestroyEx(SArray* pArray, FDelete fp); void taosArraySwap(SArray* a, SArray* b); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8492410a46..0804ad4ede 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -31,6 +31,9 @@ typedef struct { char* path; char* buf; int32_t len; + + SArray* pAdd; + SArray* pDel; } SBackendManager; typedef struct SCompactFilteFactory { @@ -149,8 +152,25 @@ SBackendManager* backendManagerCreate(char* path) { p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); + + p->pAdd = taosArrayInit(64, sizeof(void*)); + p->pDel = taosArrayInit(64, sizeof(void*)); return p; } +void backendManagerDestroy(SBackendManager* bm) { + if (bm == NULL) return; + + taosMemoryFree(bm->buf); + taosMemoryFree(bm->path); + + taosHashCleanup(bm->pSSTable); + + taosArrayDestroyP(bm->pSST, taosMemoryFree); + taosArrayDestroyP(bm->pAdd, taosMemoryFree); + taosArrayDestroyP(bm->pDel, taosMemoryFree); + + taosMemoryFree(bm); +} int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { int32_t code = 0; @@ -191,6 +211,9 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list ? bm->pSSTable : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + TdDirPtr pDir = taosOpenDir(bm->buf); TdDirEntryPtr de = NULL; int8_t dummy = 0; @@ -222,23 +245,23 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list bm->curChkpId = chkpId; bm->init = 1; - SArray* add = taosArrayInit(64, sizeof(void*)); + // SArray* add = taosArrayInit(64, sizeof(void*)); void* pIter = taosHashIterate(pTable, NULL); while (pIter) { size_t len; char* name = taosHashGetKey(pIter, &len); if (name != NULL && len != 0) { - taosArrayPush(add, &name); + taosArrayPush(bm->pAdd, &name); } pIter = taosHashIterate(pTable, pIter); } } else { - SArray* add = taosArrayInit(64, sizeof(void*)); - SArray* del = taosArrayInit(64, sizeof(void*)); + // SArray* add = taosArrayInit(64, sizeof(void*)); + // SArray* del = taosArrayInit(64, sizeof(void*)); - int32_t code = compareHashTable(bm->pSSTable, pTable, add, del); + int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); bm->curChkpId = chkpId; taosHashCleanup(pTable); @@ -248,7 +271,8 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(bm->path) + 64); + int32_t len = bm->len + 64; + char* buf = taosMemoryCalloc(1, len); sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name); code = taosMkDir(buf); @@ -256,8 +280,37 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { return code; } - + // clear current file + memset(buf, 0, len); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pCurrent); + taosRemoveFile(buf); + memset(buf, 0, len); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pManifest); + taosRemoveFile(buf); + + for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) { + memset(buf, 0, len); + + char* filename = taosArrayGetP(bm->pAdd, i); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename); + + char* src = taosMemoryCalloc(1, len); + sprintf(src, "%s%s%s%" PRId64 "%s%s", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId, TD_DIRSEP, filename); + taosCopyFile(src, buf); + } + + for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { + memset(buf, 0, len); + + char* filename = taosArrayGetP(bm->pDel, i); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename); + taosRemoveFile(buf); + } + // clear delta data + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + return code; } SCfInit ginitDict[] = { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 8906391a9a..06beba0655 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -191,7 +191,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) { } if (index >= pArray->size) { - uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity); + uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity); return NULL; } @@ -319,7 +319,7 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { if (NULL == pSrc) { return NULL; } - + if (pSrc->size == 0) { // empty array list return taosArrayInit(8, pSrc->elemSize); } @@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } +void taosArrayClearP(SArray* pArray, void (*fp)(void*)) { + // if (pArray == NULL) return; + // if (fp == NULL) { + // pArray->size = 0; + // return; + // } + + // for (int32_t i = 0; i < pArray->size; ++i) { + // fp(TARRAY_GET_ELEM(pArray, i)); + // } + if (pArray) { + for (int32_t i = 0; i < pArray->size; i++) { + fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + } + } + taosArrayClear(pArray); +} void* taosArrayDestroy(SArray* pArray) { if (pArray) { @@ -492,7 +509,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t // order array void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) { taosqsort(pArray->pData, pArray->size, pArray->elemSize, param, fn); -// taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); + // taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); } void taosArraySwap(SArray* a, SArray* b) { From 92b247aae842d2c92fab6f963a813d16d1a0b1b6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 07:36:00 +0000 Subject: [PATCH 13/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0804ad4ede..024d53adae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -245,8 +245,6 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list bm->curChkpId = chkpId; bm->init = 1; - // SArray* add = taosArrayInit(64, sizeof(void*)); - void* pIter = taosHashIterate(pTable, NULL); while (pIter) { size_t len; @@ -258,9 +256,6 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list } } else { - // SArray* add = taosArrayInit(64, sizeof(void*)); - // SArray* del = taosArrayInit(64, sizeof(void*)); - int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); bm->curChkpId = chkpId; From a619b8f5be035feb0cfb286caa7cff4bff297d58 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 08:58:49 +0000 Subject: [PATCH 14/22] rm duplicate para --- source/libs/stream/src/streamBackendRocksdb.c | 68 +++++++++++++------ 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 024d53adae..529b29153d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -264,47 +264,73 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list return 0; } -int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { +int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { int32_t code = 0; - int32_t len = bm->len + 64; - char* buf = taosMemoryCalloc(1, len); - sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name); + int32_t len = bm->len + 128; - code = taosMkDir(buf); + char* dstBuf = taosMemoryCalloc(1, len); + char* srcBuf = taosMemoryCalloc(1, len); + + char* srcDir = taosMemoryCalloc(1, len); + char* dstDir = taosMemoryCalloc(1, len); + + sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId); + sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname); + + code = taosMkDir(dstDir); if (code != 0) { return code; } // clear current file - memset(buf, 0, len); - sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pCurrent); - taosRemoveFile(buf); + memset(dstBuf, 0, len); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent); + taosRemoveFile(dstBuf); - memset(buf, 0, len); - sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pManifest); - taosRemoveFile(buf); + memset(dstBuf, 0, len); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); + taosRemoveFile(dstBuf); + // add file to $name dir for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) { - memset(buf, 0, len); + memset(dstBuf, 0, len); char* filename = taosArrayGetP(bm->pAdd, i); - sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); - char* src = taosMemoryCalloc(1, len); - sprintf(src, "%s%s%s%" PRId64 "%s%s", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId, TD_DIRSEP, filename); - taosCopyFile(src, buf); + taosCopyFile(srcBuf, dstBuf); } - + // del file in $name for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { - memset(buf, 0, len); - + memset(dstBuf, 0, len); char* filename = taosArrayGetP(bm->pDel, i); - sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename); - taosRemoveFile(buf); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + taosRemoveFile(dstBuf); } + + // copy current file to dst dir + memset(srcBuf, 0, len); + memset(dstBuf, 0, len); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pCurrent); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent); + taosCopyFile(srcBuf, dstBuf); + + // copy manifest file to dst dir + memset(srcBuf, 0, len); + memset(dstBuf, 0, len); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pManifest); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); + taosCopyFile(srcBuf, dstBuf); + // clear delta data taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree); + + taosMemoryFree(srcBuf); + taosMemoryFree(dstBuf); + taosMemoryFree(srcDir); + taosMemoryFree(dstDir); return code; } From a336a7b1a4d3e9da279c0533f96bbd1532b0a437 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 09:06:22 +0000 Subject: [PATCH 15/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 529b29153d..a724e90866 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -277,6 +277,10 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId); sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname); + if (!taosDirExist(srcDir)) { + return 0; + } + code = taosMkDir(dstDir); if (code != 0) { return code; From f3cf907d5af99ae8ffac5aacc63042c359bb1698 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 09:07:13 +0000 Subject: [PATCH 16/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a724e90866..8f8f268d9a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -308,6 +308,7 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { // del file in $name for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { memset(dstBuf, 0, len); + char* filename = taosArrayGetP(bm->pDel, i); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); taosRemoveFile(dstBuf); From 6fdcd82a330e3c48f51563b0b670eb1f72dced32 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 09:19:03 +0000 Subject: [PATCH 17/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8f8f268d9a..d336c4235a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -34,6 +34,8 @@ typedef struct { SArray* pAdd; SArray* pDel; + + int8_t update; } SBackendManager; typedef struct SCompactFilteFactory { @@ -155,6 +157,7 @@ SBackendManager* backendManagerCreate(char* path) { p->pAdd = taosArrayInit(64, sizeof(void*)); p->pDel = taosArrayInit(64, sizeof(void*)); + p->update = 0; return p; } void backendManagerDestroy(SBackendManager* bm) { @@ -254,12 +257,16 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list } pIter = taosHashIterate(pTable, pIter); } + bm->update = 1; } else { int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); bm->curChkpId = chkpId; taosHashCleanup(pTable); + if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) { + bm->update = 0; + } } return 0; } From 1db15da4e18d10c8390b8a81239eef0e61f6c86a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 09:50:49 +0000 Subject: [PATCH 18/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d336c4235a..f6439d7499 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -32,10 +32,11 @@ typedef struct { char* buf; int32_t len; - SArray* pAdd; - SArray* pDel; - - int8_t update; + SHashObj* pSstTbl[2]; + SArray* pAdd; + SArray* pDel; + int8_t idx; + int8_t update; } SBackendManager; typedef struct SCompactFilteFactory { @@ -155,6 +156,10 @@ SBackendManager* backendManagerCreate(char* path) { p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); + p->idx = 0; + p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pAdd = taosArrayInit(64, sizeof(void*)); p->pDel = taosArrayInit(64, sizeof(void*)); p->update = 0; @@ -172,6 +177,8 @@ void backendManagerDestroy(SBackendManager* bm) { taosArrayDestroyP(bm->pAdd, taosMemoryFree); taosArrayDestroyP(bm->pDel, taosMemoryFree); + taosHashCleanup(bm->pSstTbl[0]); + taosHashCleanup(bm->pSstTbl[1]); taosMemoryFree(bm); } @@ -210,10 +217,6 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list memset(bm->buf, 0, bm->len); sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); - SHashObj* pTable = bm->init == 0 - ? bm->pSSTable - : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree); @@ -226,48 +229,50 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { taosMemoryFreeClear(bm->pCurrent); bm->pCurrent = taosStrdup(name); - taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { taosMemoryFreeClear(bm->pManifest); bm->pManifest = taosStrdup(name); - taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { char* p = taosStrdup(name); - - taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } } if (bm->init == 0) { - bm->preCkptId = chkpId; + bm->preCkptId = -1; bm->curChkpId = chkpId; bm->init = 1; - void* pIter = taosHashIterate(pTable, NULL); + void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL); while (pIter) { size_t len; char* name = taosHashGetKey(pIter, &len); if (name != NULL && len != 0) { taosArrayPush(bm->pAdd, &name); } - pIter = taosHashIterate(pTable, pIter); + pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); } bm->update = 1; } else { - int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); + int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); + bm->preCkptId = bm->curChkpId; bm->curChkpId = chkpId; - taosHashCleanup(pTable); if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) { bm->update = 0; } } + taosHashClear(bm->pSstTbl[bm->idx]); + bm->idx = 1 - bm->idx; + return 0; } From 19ac9054d060e11cb210feb5f67c78ebfb1a9884 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 12:47:49 +0000 Subject: [PATCH 19/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f6439d7499..f34b7dc3cb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -21,16 +21,15 @@ #include "tref.h" typedef struct { - int8_t init; - char* pCurrent; - char* pManifest; - SArray* pSST; - int64_t preCkptId; - int64_t curChkpId; - SHashObj* pSSTable; - char* path; - char* buf; - int32_t len; + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + char* buf; + int32_t len; SHashObj* pSstTbl[2]; SArray* pAdd; @@ -146,13 +145,12 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); -SBackendManager* backendManagerCreate(char* path) { +SBackendManager* bkdMgtCreate(char* path) { SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); p->curChkpId = 0; p->preCkptId = 0; p->pSST = taosArrayInit(64, sizeof(void*)); p->path = taosStrdup(path); - p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); @@ -165,14 +163,12 @@ SBackendManager* backendManagerCreate(char* path) { p->update = 0; return p; } -void backendManagerDestroy(SBackendManager* bm) { +void bkdMgtDestroy(SBackendManager* bm) { if (bm == NULL) return; taosMemoryFree(bm->buf); taosMemoryFree(bm->path); - taosHashCleanup(bm->pSSTable); - taosArrayDestroyP(bm->pSST, taosMemoryFree); taosArrayDestroyP(bm->pAdd, taosMemoryFree); taosArrayDestroyP(bm->pDel, taosMemoryFree); @@ -204,7 +200,7 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { return code; } -int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { +int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { const char* pCurrent = "CURRENT"; int32_t currLen = strlen(pCurrent); @@ -259,8 +255,7 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list } pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); } - bm->update = 1; - + if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1; } else { int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); @@ -276,7 +271,7 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list return 0; } -int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { +int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { int32_t code = 0; int32_t len = bm->len + 128; @@ -340,7 +335,7 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); taosCopyFile(srcBuf, dstBuf); - // clear delta data + // clear delta data buf taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree); From a20b299f9b2ca13d4096145adfb3653c8fd45b18 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 13:06:10 +0000 Subject: [PATCH 20/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f34b7dc3cb..85abe203d4 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -305,6 +305,7 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { // add file to $name dir for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) { memset(dstBuf, 0, len); + memset(srcBuf, 0, len); char* filename = taosArrayGetP(bm->pAdd, i); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); @@ -315,6 +316,7 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { // del file in $name for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { memset(dstBuf, 0, len); + memset(srcBuf, 0, len); char* filename = taosArrayGetP(bm->pDel, i); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); From ef247cdb1d1d0559b6d328482b4256c79f3e08db Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Aug 2023 01:49:26 +0000 Subject: [PATCH 21/22] support reopen stream state --- include/libs/stream/tstream.h | 10 +-- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 4 +- source/libs/stream/src/streamBackendRocksdb.c | 84 +++++++++---------- source/libs/stream/src/streamMeta.c | 18 ++-- 4 files changed, 58 insertions(+), 58 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4c0005ece7..16c47024ef 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -369,11 +369,11 @@ typedef struct SStreamMeta { int32_t chkptNotReadyTasks; - int64_t checkpointId; - SArray* checkpointSaved; - SArray* checkpointInUse; - int32_t checkpointCap; - SRWLatch checkpointDirLock; + int64_t chkpId; + SArray* chkpSaved; + SArray* chkpInUse; + int32_t chkpCap; + SRWLatch chkpDirLock; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 87d174715d..5f77ea50e6 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -47,11 +47,11 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pReader->sver = sver; pReader->ever = ever; - int64_t checkpointId = meta ? meta->checkpointId : 0; + int64_t chkpId = meta ? meta->chkpId : 0; SStreamSnapReader* pSnapReader = NULL; - if (streamSnapReaderOpen(pTq, sver, checkpointId, pTq->path, &pSnapReader) == 0) { + if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 85abe203d4..14c94a7996 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -372,8 +372,8 @@ int32_t copyFiles(const char* src, const char* dst) { // opt later, just hard link int32_t sLen = strlen(src); int32_t dLen = strlen(dst); - char* absSrcPath = taosMemoryCalloc(1, sLen + 64); - char* absDstPath = taosMemoryCalloc(1, dLen + 64); + char* srcName = taosMemoryCalloc(1, sLen + 64); + char* dstName = taosMemoryCalloc(1, dLen + 64); TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) return 0; @@ -383,22 +383,22 @@ int32_t copyFiles(const char* src, const char* dst) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); - sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); + sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); if (!taosDirEntryIsDir(de)) { - code = taosCopyFile(absSrcPath, absDstPath); + code = taosCopyFile(srcName, dstName); if (code == -1) { goto _err; } } - memset(absSrcPath, 0, sLen + 64); - memset(absDstPath, 0, dLen + 64); + memset(srcName, 0, sLen + 64); + memset(dstName, 0, dLen + 64); } _err: - taosMemoryFreeClear(absSrcPath); - taosMemoryFreeClear(absDstPath); + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); taosCloseDir(&pDir); return code >= 0 ? 0 : -1; } @@ -626,75 +626,75 @@ void streamBackendHandleCleanup(void* arg) { int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); int64_t tc = 0; - int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); + int32_t sz = taosArrayGetSize(pMeta->chkpSaved); if (sz <= 0) { - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); return -1; } else { - tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved); + tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved); } - taosArrayPush(pMeta->checkpointInUse, &tc); + taosArrayPush(pMeta->chkpInUse, &tc); *checkpoint = tc; - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); return 0; } /* * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| - * checkpointInUse: |--cp2--|--cp4--| - * checkpointInUse is doing translation, cannot del until + * chkpInUse: |--cp2--|--cp4--| + * chkpInUse is doing translation, cannot del until * replication is finished */ int32_t delObsoleteCheckpoint(void* arg, const char* path) { SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); - SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); - SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); + SArray* chkpDel = taosArrayInit(10, sizeof(int64_t)); + SArray* chkpDup = taosArrayInit(10, sizeof(int64_t)); int64_t minId = 0; - if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) { - minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0); + if (taosArrayGetSize(pMeta->chkpInUse) >= 1) { + minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); - for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); + for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); if (id >= minId) { - taosArrayPush(checkpointDup, &id); + taosArrayPush(chkpDup, &id); } else { - taosArrayPush(checkpointDel, &id); + taosArrayPush(chkpDel, &id); } } } else { - int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); - int32_t dsz = sz - pMeta->checkpointCap; // del size + int32_t sz = taosArrayGetSize(pMeta->chkpSaved); + int32_t dsz = sz - pMeta->chkpCap; // del size for (int i = 0; i < dsz; i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - taosArrayPush(checkpointDel, &id); + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); + taosArrayPush(chkpDel, &id); } for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - taosArrayPush(checkpointDup, &id); + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); + taosArrayPush(chkpDup, &id); } } - taosArrayDestroy(pMeta->checkpointSaved); - pMeta->checkpointSaved = checkpointDup; + taosArrayDestroy(pMeta->chkpSaved); + pMeta->chkpSaved = chkpDup; - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); - for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { - int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); + for (int i = 0; i < taosArrayGetSize(chkpDel); i++) { + int64_t id = *(int64_t*)taosArrayGet(chkpDel, i); char tbuf[256] = {0}; sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id); if (taosIsDir(tbuf)) { taosRemoveDir(tbuf); } } - taosArrayDestroy(checkpointDel); + taosArrayDestroy(chkpDel); return 0; } @@ -742,7 +742,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { for (int i = 0; i < taosArrayGetSize(suffix); i++) { int64_t id = *(int64_t*)taosArrayGet(suffix, i); - taosArrayPush(pMeta->checkpointSaved, &id); + taosArrayPush(pMeta->chkpSaved, &id); } taosArrayDestroy(suffix); @@ -794,9 +794,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } rocksdb_checkpoint_object_destroy(cp); } - taosWLockLatch(&pMeta->checkpointDirLock); - taosArrayPush(pMeta->checkpointSaved, &checkpointId); - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); + taosArrayPush(pMeta->chkpSaved, &checkpointId); + taosWUnLockLatch(&pMeta->chkpDirLock); delObsoleteCheckpoint(arg, path); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 907ff2b48a..20a62963cc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -91,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointCap = 8; - taosInitRWLatch(&pMeta->checkpointDirLock); + pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); + pMeta->chkpCap = 8; + taosInitRWLatch(&pMeta->chkpDirLock); int64_t chkpId = streamGetLatestCheckpointId(pMeta); + pMeta->chkpId = chkpId; pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); if (pMeta->streamBackend == NULL) { @@ -109,7 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TAOS_SYSTEM_ERROR(code); goto _err; } - taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); @@ -182,9 +182,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { taosHashClear(pMeta->pTaskBackendUnique); - taosArrayClear(pMeta->checkpointSaved); + taosArrayClear(pMeta->chkpSaved); - taosArrayClear(pMeta->checkpointInUse); + taosArrayClear(pMeta->chkpInUse); return 0; } @@ -222,8 +222,8 @@ void streamMetaClose(SStreamMeta* pMeta) { taosThreadMutexDestroy(&pMeta->backendMutex); taosHashCleanup(pMeta->pTaskBackendUnique); - taosArrayDestroy(pMeta->checkpointSaved); - taosArrayDestroy(pMeta->checkpointInUse); + taosArrayDestroy(pMeta->chkpSaved); + taosArrayDestroy(pMeta->chkpInUse); taosMemoryFree(pMeta); } From 100d2240c35f5e68d47d4e21cc3c5e8075c28fda Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Aug 2023 06:35:25 +0000 Subject: [PATCH 22/22] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 14c94a7996..69ba44b612 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -28,14 +28,17 @@ typedef struct { int64_t preCkptId; int64_t curChkpId; char* path; + char* buf; int32_t len; + // ping-pong buf SHashObj* pSstTbl[2]; - SArray* pAdd; - SArray* pDel; int8_t idx; - int8_t update; + + SArray* pAdd; + SArray* pDel; + int8_t update; } SBackendManager; typedef struct SCompactFilteFactory { @@ -258,6 +261,15 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1; } else { int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); + if (code != 0) { + // dead code + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + taosHashClear(bm->pSstTbl[1 - bm->idx]); + bm->update = 0; + + return code; + } bm->preCkptId = bm->curChkpId; bm->curChkpId = chkpId; @@ -714,11 +726,16 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { if (!taosDirExist(chkpPath)) { // no checkpoint, nothing to load + taosMemoryFree(chkpPath); return 0; } TdDirPtr pDir = taosOpenDir(chkpPath); - if (pDir == NULL) return 0; + + if (pDir == NULL) { + taosMemoryFree(chkpPath); + return 0; + } TdDirEntryPtr de = NULL; SArray* suffix = taosArrayInit(4, sizeof(int64_t));