refactor code

This commit is contained in:
yihaoDeng 2023-08-28 16:13:27 +08:00
parent 9bef684677
commit 1687f68156
1 changed files with 39 additions and 25 deletions

View File

@ -31,18 +31,28 @@ int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0; int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0; int32_t streamMetaId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask);
typedef struct { typedef struct {
TdThreadMutex mutex; TdThreadMutex mutex;
SHashObj* pTable; SHashObj* pTable;
} SGStreamMetaMgt; } SMetaRefMgt;
SGStreamMetaMgt gStreamMetaMgt; SMetaRefMgt gMetaRefMgt;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId); void metaRefMgtInit();
static void streamMetaClear(SStreamMeta* pMeta); void metaRefMgtCleanup();
static int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaCloseImpl(void* arg);
static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask); void metaRefMgtInit() {
taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
}
static void streamMetaEnvInit() { static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendId = taosOpenRef(64, streamBackendCleanup);
@ -50,8 +60,7 @@ static void streamMetaEnvInit() {
streamMetaId = taosOpenRef(64, streamMetaCloseImpl); streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
taosThreadMutexInit(&(gStreamMetaMgt.mutex), NULL); metaRefMgtInit();
gStreamMetaMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
} }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
@ -60,9 +69,11 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamBackendCfWrapperId);
taosCloseRef(streamMetaId); taosCloseRef(streamMetaId);
taosThreadMutexDestroy(&gStreamMetaMgt.mutex); metaRefMgtCleanup();
}
void* pIter = taosHashIterate(gStreamMetaMgt.pTable, NULL); void metaRefMgtCleanup() {
void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
while (pIter) { while (pIter) {
SArray* list = *(SArray**)pIter; SArray* list = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(list); i++) { for (int i = 0; i < taosArrayGetSize(list); i++) {
@ -70,23 +81,25 @@ void streamMetaCleanup() {
taosMemoryFree(rid); taosMemoryFree(rid);
} }
taosArrayDestroy(list); taosArrayDestroy(list);
pIter = taosHashIterate(gStreamMetaMgt.pTable, pIter); pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
} }
taosHashCleanup(gStreamMetaMgt.pTable); taosHashCleanup(gMetaRefMgt.pTable);
taosThreadMutexDestroy(&gMetaRefMgt.mutex);
} }
int32_t streamMetaAddRidToGlobalMgt(int64_t vgId, int64_t* rid) { int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
taosThreadMutexLock(&gStreamMetaMgt.mutex); taosThreadMutexLock(&gMetaRefMgt.mutex);
void* p = taosHashGet(gStreamMetaMgt.pTable, &vgId, sizeof(vgId)); void* p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
if (p == NULL) { if (p == NULL) {
SArray* list = taosArrayInit(8, sizeof(void*)); SArray* list = taosArrayInit(8, sizeof(void*));
taosArrayPush(list, &rid); taosArrayPush(list, &rid);
taosHashPut(gStreamMetaMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
} else { } else {
SArray* list = *(SArray**)p; SArray* list = *(SArray**)p;
taosArrayPush(list, &rid); taosArrayPush(list, &rid);
} }
taosThreadMutexUnlock(&gStreamMetaMgt.mutex); taosThreadMutexUnlock(&gMetaRefMgt.mutex);
return 0; return 0;
} }
@ -143,7 +156,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
*pRid = pMeta->rid; *pRid = pMeta->rid;
streamMetaAddRidToGlobalMgt(pMeta->vgId, pRid); metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.tickCounter = 0;
@ -333,7 +346,8 @@ void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) {
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) {
int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn); int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) { if (code != 0) {
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], tstrerror(terrno)); qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1],
tstrerror(terrno));
} else { } else {
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]); qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]);
} }
@ -728,9 +742,9 @@ void metaHbToMnode(void* param, void* tmrId) {
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId}; int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if ((*pTask)->info.fillHistory == 1) { if ((*pTask)->info.fillHistory == 1) {
continue; continue;
@ -834,7 +848,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping // wait for the stream meta hb function stopping
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100); taosMsleep(100);
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
} }