sync update cache

This commit is contained in:
dapan1121 2022-06-06 13:39:34 +08:00
parent 25e75ccb34
commit 73d4e08ee6
3 changed files with 64 additions and 74 deletions

View File

@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation {
int32_t opId; int32_t opId;
void *data; void *data;
bool syncOp; bool syncOp;
uint64_t seqId; tsem_t rspSem;
} SCtgCacheOperation; } SCtgCacheOperation;
typedef struct SCtgQNode { typedef struct SCtgQNode {
SCtgCacheOperation op; SCtgCacheOperation *op;
struct SCtgQNode *next; struct SCtgQNode *next;
} SCtgQNode; } SCtgQNode;
typedef struct SCtgQueue { typedef struct SCtgQueue {
SRWLatch qlock; SRWLatch qlock;
uint64_t seqId;
uint64_t seqDone;
SCtgQNode *head; SCtgQNode *head;
SCtgQNode *tail; SCtgQNode *tail;
tsem_t reqSem; tsem_t reqSem;
tsem_t rspSem;
uint64_t qRemainNum; uint64_t qRemainNum;
} SCtgQueue; } SCtgQueue;

View File

@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
} }
if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode)); gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == gCtgMgmt.queue.head) { if (NULL == gCtgMgmt.queue.head) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
@ -1191,18 +1186,14 @@ void catalogDestroy(void) {
atomic_store_8((int8_t*)&gCtgMgmt.exit, true); atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
if (tsem_post(&gCtgMgmt.queue.rspSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1); taosUsleep(1);
} }
if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
SCatalog *pCtg = NULL; SCatalog *pCtg = NULL;

View File

@ -501,25 +501,6 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void ctgWaitOpDone(SCtgCacheOperation *action) {
while (true) {
tsem_wait(&gCtgMgmt.queue.rspSem);
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
break;
}
if (gCtgMgmt.queue.seqDone >= action->seqId) {
break;
}
tsem_post(&gCtgMgmt.queue.rspSem);
sched_yield();
}
}
void ctgDequeue(SCtgCacheOperation **op) { void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode *orig = gCtgMgmt.queue.head; SCtgQNode *orig = gCtgMgmt.queue.head;
@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
taosMemoryFreeClear(orig); taosMemoryFreeClear(orig);
*op = &node->op; *op = node->op;
} }
@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0);
}
node->op = *operation; node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
gCtgMgmt.queue.tail->next = node; gCtgMgmt.queue.tail->next = node;
@ -558,7 +541,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
if (operation->syncOp) { if (operation->syncOp) {
ctgWaitOpDone(operation); tsem_wait(&operation->rspSem);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -567,7 +550,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_CACHE;
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
@ -583,21 +568,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId; msg->dbId = dbId;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_VGROUP;
op->syncOp = syncOp;
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
@ -612,15 +600,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
msg->pCtg = pCtg; msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
@ -628,7 +616,10 @@ _return:
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_STB_META;
op->syncOp = syncOp;
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
@ -641,15 +632,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
msg->dbId = dbId; msg->dbId = dbId;
msg->suid = suid; msg->suid = suid;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
@ -657,7 +648,10 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_META;
op->syncOp = syncOp;
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
@ -669,21 +663,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
strncpy(msg->tbName, tbName, sizeof(msg->tbName)); strncpy(msg->tbName, tbName, sizeof(msg->tbName));
msg->dbId = dbId; msg->dbId = dbId;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) { int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VGROUP;
op->syncOp = syncOp;
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
@ -701,22 +698,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
msg->dbId = dbId; msg->dbId = dbId;
msg->dbInfo = dbInfo; msg->dbInfo = dbInfo;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
ctgFreeVgInfo(dbInfo); ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) { int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_META;
op->syncOp = syncOp;
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
@ -731,9 +731,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->output = output; msg->output = output;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -746,7 +746,9 @@ _return:
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VG_EPSET;
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
@ -758,9 +760,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
msg->vgId = vgId; msg->vgId = vgId;
msg->epSet = *pEpSet; msg->epSet = *pEpSet;
operation.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &operation)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -775,7 +777,10 @@ _return:
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) { int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_USER;
op->syncOp = syncOp;
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
@ -785,9 +790,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->userAuth = *pAuth; msg->userAuth = *pAuth;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1622,7 +1627,6 @@ void* ctgUpdateThreadFunc(void* param) {
} }
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
break; break;
} }
@ -1634,10 +1638,8 @@ void* ctgUpdateThreadFunc(void* param) {
(*gCtgCacheOperation[operation->opId].func)(operation); (*gCtgCacheOperation[operation->opId].func)(operation);
gCtgMgmt.queue.seqDone = operation->seqId;
if (operation->syncOp) { if (operation->syncOp) {
tsem_post(&gCtgMgmt.queue.rspSem); tsem_post(&operation->rspSem);
} }
CTG_RT_STAT_INC(qDoneNum, 1); CTG_RT_STAT_INC(qDoneNum, 1);