enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info

This commit is contained in:
wangmm0220 2024-10-18 18:28:09 +08:00
parent 51c7f8b4f0
commit ccfe524636
1 changed files with 29 additions and 6 deletions

View File

@ -1724,8 +1724,15 @@ end:
return code;
}
typedef enum {
WRITE_RAW_INIT_START = 0,
WRITE_RAW_INIT_OK,
WRITE_RAW_INIT_FAIL,
};
static SHashObj* writeRawCache = NULL;
static int8_t initFlag = 0;
static int8_t initedFlag = WRITE_RAW_INIT_START;
typedef struct{
SHashObj* pVgHash;
@ -2276,15 +2283,31 @@ void tmq_free_raw(tmq_raw_data raw) {
(void)memset(terrMsg, 0, ERR_MSG_LEN);
}
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
if (old == 0) {
int32_t code = initRawCacheHash();
if (code != 0) {
return code;
static int32_t writeRawInit(){
while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_OK) {
int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
if (old == 0) {
int32_t code = initRawCacheHash();
if (code != 0) {
uError("tmq writeRawImpl init error:%d", code);
atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
return code;
}
atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
}
}
if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL){
return TSDB_CODE_INTERNAL_ERROR;
}
return 0;
}
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
if (writeRawInit() != 0) {
return TSDB_CODE_INTERNAL_ERROR;
}
if (type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, buf, len);
} else if (type == TDMT_VND_ALTER_STB) {