enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info

This commit is contained in:
wangmm0220 2024-10-16 10:19:40 +08:00
parent 10436cacff
commit 0002ede469
1 changed files with 70 additions and 46 deletions

View File

@ -1852,10 +1852,14 @@ end:
return code;
}
static threadlocal SHashObj* pVgHash = NULL;
//static threadlocal SHashObj* pCreateTbHash = NULL;
static threadlocal SHashObj* pNameHash = NULL;
static threadlocal SHashObj* pMetaHash = NULL;
static SHashObj* writeRawCache = NULL;
static int8_t initFlag = 0;
typedef struct{
SHashObj* pVgHash;
SHashObj* pNameHash;
SHashObj* pMetaHash;
}rawCacheInfo;
typedef struct{
SVgroupInfo vgInfo;
@ -1863,6 +1867,29 @@ typedef struct{
int64_t suid;
}tbInfo;
static void tmqFreeMeta(void *data){
STableMeta* pTableMeta = *(STableMeta**)data;
taosMemoryFree(pTableMeta);
}
static void freeRawCache(void *data) {
rawCacheInfo* pRawCache = (rawCacheInfo*)data;
taosHashCleanup(pRawCache->pMetaHash);
taosHashCleanup(pRawCache->pNameHash);
taosHashCleanup(pRawCache->pVgHash);
}
static int32_t initRawCacheHash(){
if (writeRawCache == NULL){
writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (writeRawCache == NULL){
return terrno;
}
taosHashSetFreeFp(writeRawCache, freeRawCache);
}
return 0;
}
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){
char* p = (char*)rawData;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
@ -1899,6 +1926,34 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
return false;
}
static int32_t getRawCache(SHashObj **pVgHash, SHashObj **pNameHash, SHashObj **pMetaHash, void *key) {
int32_t code = 0;
void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
if (cacheInfo == NULL){
*pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(*pVgHash);
*pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(*pNameHash);
*pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(*pMetaHash);
taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
} else {
rawCacheInfo *info = (rawCacheInfo *)cacheInfo;
*pVgHash = info->pVgHash;
*pNameHash = info->pNameHash;
*pMetaHash = info->pMetaHash;
}
return 0;
end:
taosHashCleanup(*pMetaHash);
taosHashCleanup(*pNameHash);
taosHashCleanup(*pVgHash);
return code;
}
static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
@ -1951,6 +2006,10 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
}
SHashObj *pVgHash = NULL;
SHashObj *pNameHash = NULL;
SHashObj *pMetaHash = NULL;
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
int retry = 0;
while(1){
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
@ -1965,9 +2024,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
// int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter);
// RAW_NULL_CHECK(suid);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
(void)strcpy(pName.dbname, pRequest->pDb);
@ -2274,47 +2330,15 @@ void tmq_free_raw(tmq_raw_data raw) {
(void)memset(terrMsg, 0, ERR_MSG_LEN);
}
static void tmqFreeMeta(void *data){
STableMeta* pTableMeta = *(STableMeta**)data;
taosMemoryFree(pTableMeta);
}
void freeHash() {
taosHashCleanup(pMetaHash);
taosHashCleanup(pNameHash);
taosHashCleanup(pVgHash);
}
static int32_t initHash(){
int32_t code = 0;
if (pVgHash == NULL){
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgHash);
}
// if (pCreateTbHash == NULL){
// pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
// RAW_NULL_CHECK(pCreateTbHash);
// }
if (pNameHash == NULL){
pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pNameHash);
}
if (pMetaHash == NULL){
pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pMetaHash);
taosHashSetFreeFp(pMetaHash, tmqFreeMeta);
}
return code;
end:
freeHash();
return code;
}
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
int32_t code = initHash();
if (code != 0) {
return code;
int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
if (old == 0) {
int32_t code = initRawCacheHash();
if (code != 0) {
return code;
}
}
if (type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, buf, len);
} else if (type == TDMT_VND_ALTER_STB) {