diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index eb4eed8b6d..d955fad150 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1724,8 +1724,15 @@ end: return code; } +typedef enum { + WRITE_RAW_INIT_START = 0, + WRITE_RAW_INIT_OK, + WRITE_RAW_INIT_FAIL, +}; + static SHashObj* writeRawCache = NULL; static int8_t initFlag = 0; +static int8_t initedFlag = WRITE_RAW_INIT_START; typedef struct{ SHashObj* pVgHash; @@ -2276,15 +2283,31 @@ void tmq_free_raw(tmq_raw_data raw) { (void)memset(terrMsg, 0, ERR_MSG_LEN); } -static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { - int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1); - if (old == 0) { - int32_t code = initRawCacheHash(); - if (code != 0) { - return code; +static int32_t writeRawInit(){ + while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_OK) { + int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1); + if (old == 0) { + int32_t code = initRawCacheHash(); + if (code != 0) { + uError("tmq writeRawImpl init error:%d", code); + atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL); + return code; + } + atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK); } } + if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL){ + return TSDB_CODE_INTERNAL_ERROR; + } + return 0; +} + +static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { + if (writeRawInit() != 0) { + return TSDB_CODE_INTERNAL_ERROR; + } + if (type == TDMT_VND_CREATE_STB) { return taosCreateStb(taos, buf, len); } else if (type == TDMT_VND_ALTER_STB) {