Merge remote-tracking branch 'origin/enh/refactorBackend' into enh/refactorBackend

# Conflicts:
#	include/libs/stream/tstream.h
This commit is contained in:
Haojun Liao 2023-11-08 15:21:21 +08:00
commit 470f244032
4 changed files with 117 additions and 108 deletions

View File

@ -485,6 +485,7 @@ typedef struct SStreamMeta {
SArray* chkpInUse; SArray* chkpInUse;
SRWLatch chkpDirLock; SRWLatch chkpDirLock;
void* qHandle;
int32_t pauseTaskNum; int32_t pauseTaskNum;
} SStreamMeta; } SStreamMeta;

View File

@ -79,6 +79,53 @@ typedef struct {
} STaskDbWrapper; } STaskDbWrapper;
typedef struct SDbChkp {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
TdThreadRwlock rwLock;
} SDbChkp;
typedef struct {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
SHashObj* pDbChkpTbl;
TdThreadRwlock rwLock;
} SBkdMgt;
void* streamBackendInit(const char* path, int64_t chkpId); void* streamBackendInit(const char* path, int64_t chkpId);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg); void streamBackendHandleCleanup(void* arg);
@ -194,4 +241,15 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap); int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
void taskDbDestroy(void* pDb);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
SBkdMgt* bkdMgtCreate(char* path);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
#endif #endif

View File

@ -20,53 +20,6 @@
#include "tcommon.h" #include "tcommon.h"
#include "tref.h" #include "tref.h"
typedef struct SDbChkp {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
TdThreadRwlock rwLock;
} SDbChkp;
typedef struct {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
SHashObj* pDbChkpTbl;
TdThreadRwlock rwLock;
} SBkdMgt;
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
void* status; void* status;
} SCompactFilteFactory; } SCompactFilteFactory;
@ -223,8 +176,6 @@ SCfInit ginitDict[] = {
valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName},
}; };
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
int32_t getCfIdx(const char* cfName) { int32_t getCfIdx(const char* cfName) {
int idx = -1; int idx = -1;
size_t len = strlen(cfName); size_t len = strlen(cfName);
@ -868,7 +819,6 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId); code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId);
taskDbRemoveRef(pTaskDb); taskDbRemoveRef(pTaskDb);
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
SStreamTask* pTask = pTaskDb->pTask; SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId, SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
@ -876,6 +826,7 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
.chkpId = pTaskDb->chkpId, .chkpId = pTaskDb->chkpId,
.dbPrefixPath = taosStrdup(pTaskDb->path)}; .dbPrefixPath = taosStrdup(pTaskDb->path)};
taosArrayPush(pSnap, &snap); taosArrayPush(pSnap, &snap);
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
} }
return code; return code;
} }

View File

@ -18,6 +18,7 @@
#include "streamInt.h" #include "streamInt.h"
#include "tmisce.h" #include "tmisce.h"
#include "tref.h" #include "tref.h"
#include "tsched.h"
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
@ -320,8 +321,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
pMeta->stage = stage; pMeta->stage = stage;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta); // pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
@ -350,7 +349,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0; pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
return pMeta; return pMeta;
_err: _err:
@ -1143,7 +1142,9 @@ void metaHbToMnode(void* param, void* tmrId) {
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1,}; SRpcMsg msg = {
.info.noResp = 1,
};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1; pMeta->pHbInfo->hbCount += 1;
@ -1250,7 +1251,6 @@ void streamMetaRLock(SStreamMeta* pMeta) {
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);
@ -1260,4 +1260,3 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId); stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} }