diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 6764dbfea5..6c54c3aa69 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1852,10 +1852,14 @@ end: return code; } -static threadlocal SHashObj* pVgHash = NULL; -//static threadlocal SHashObj* pCreateTbHash = NULL; -static threadlocal SHashObj* pNameHash = NULL; -static threadlocal SHashObj* pMetaHash = NULL; +static SHashObj* writeRawCache = NULL; +static int8_t initFlag = 0; + +typedef struct{ + SHashObj* pVgHash; + SHashObj* pNameHash; + SHashObj* pMetaHash; +}rawCacheInfo; typedef struct{ SVgroupInfo vgInfo; @@ -1863,6 +1867,29 @@ typedef struct{ int64_t suid; }tbInfo; +static void tmqFreeMeta(void *data){ + STableMeta* pTableMeta = *(STableMeta**)data; + taosMemoryFree(pTableMeta); +} + +static void freeRawCache(void *data) { + rawCacheInfo* pRawCache = (rawCacheInfo*)data; + taosHashCleanup(pRawCache->pMetaHash); + taosHashCleanup(pRawCache->pNameHash); + taosHashCleanup(pRawCache->pVgHash); +} + +static int32_t initRawCacheHash(){ + if (writeRawCache == NULL){ + writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (writeRawCache == NULL){ + return terrno; + } + taosHashSetFreeFp(writeRawCache, freeRawCache); + } + return 0; +} + static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){ char* p = (char*)rawData; // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each @@ -1899,6 +1926,34 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe return false; } +static int32_t getRawCache(SHashObj **pVgHash, SHashObj **pNameHash, SHashObj **pMetaHash, void *key) { + int32_t code = 0; + void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES); + if (cacheInfo == NULL){ + *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(*pVgHash); + *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + RAW_NULL_CHECK(*pNameHash); + *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(*pMetaHash); + taosHashSetFreeFp(*pMetaHash, tmqFreeMeta); + rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash}; + RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo))); + } else { + rawCacheInfo *info = (rawCacheInfo *)cacheInfo; + *pVgHash = info->pVgHash; + *pNameHash = info->pNameHash; + *pMetaHash = info->pMetaHash; + } + + return 0; +end: + taosHashCleanup(*pMetaHash); + taosHashCleanup(*pNameHash); + taosHashCleanup(*pVgHash); + return code; +} + static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); @@ -1951,6 +2006,10 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash)); } + SHashObj *pVgHash = NULL; + SHashObj *pNameHash = NULL; + SHashObj *pMetaHash = NULL; + RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos)); int retry = 0; while(1){ RAW_RETURN_CHECK(smlInitHandle(&pQuery)); @@ -1965,9 +2024,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); RAW_NULL_CHECK(tbName); -// int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter); -// RAW_NULL_CHECK(suid); - uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; (void)strcpy(pName.dbname, pRequest->pDb); @@ -2274,47 +2330,15 @@ void tmq_free_raw(tmq_raw_data raw) { (void)memset(terrMsg, 0, ERR_MSG_LEN); } -static void tmqFreeMeta(void *data){ - STableMeta* pTableMeta = *(STableMeta**)data; - taosMemoryFree(pTableMeta); -} - -void freeHash() { - taosHashCleanup(pMetaHash); - taosHashCleanup(pNameHash); - taosHashCleanup(pVgHash); -} - -static int32_t initHash(){ - int32_t code = 0; - if (pVgHash == NULL){ - pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pVgHash); - } -// if (pCreateTbHash == NULL){ -// pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); -// RAW_NULL_CHECK(pCreateTbHash); -// } - if (pNameHash == NULL){ - pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pNameHash); - } - if (pMetaHash == NULL){ - pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pMetaHash); - taosHashSetFreeFp(pMetaHash, tmqFreeMeta); - } - return code; -end: - freeHash(); - return code; -} - static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { - int32_t code = initHash(); - if (code != 0) { - return code; + int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1); + if (old == 0) { + int32_t code = initRawCacheHash(); + if (code != 0) { + return code; + } } + if (type == TDMT_VND_CREATE_STB) { return taosCreateStb(taos, buf, len); } else if (type == TDMT_VND_ALTER_STB) {