enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info

This commit is contained in:
wangmm0220 2024-10-11 16:26:48 +08:00
parent 52f003e474
commit ddc3676752
1 changed files with 20 additions and 18 deletions

View File

@ -1821,7 +1821,6 @@ static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
int32_t code = 0; int32_t code = 0;
SVCreateTbReq pCreateReq = {0}; SVCreateTbReq pCreateReq = {0};
SDecoder decoderTmp = {0}; SDecoder decoderTmp = {0};
SVCreateTbReq *pCreateReqTmp = NULL;
for (int j = 0; j < rsp->createTableNum; j++) { for (int j = 0; j < rsp->createTableNum; j++) {
void** dataTmp = taosArrayGet(rsp->createTableReq, j); void** dataTmp = taosArrayGet(rsp->createTableReq, j);
@ -1837,9 +1836,7 @@ static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
goto end; goto end;
} }
if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) { if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, &pCreateReqTmp)); RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), pCreateReqTmp, sizeof(SVCreateTbReq)));
pCreateReqTmp = NULL;
} else { } else {
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
pCreateReq = (SVCreateTbReq){0}; pCreateReq = (SVCreateTbReq){0};
@ -1852,12 +1849,11 @@ static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
end: end:
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
tDestroySVCreateTbReq(pCreateReqTmp, TSDB_MSG_FLG_DECODE);
return code; return code;
} }
static threadlocal SHashObj* pVgHash = NULL; static threadlocal SHashObj* pVgHash = NULL;
static threadlocal SHashObj* pCreateTbHash = NULL; //static threadlocal SHashObj* pCreateTbHash = NULL;
static threadlocal SHashObj* pNameHash = NULL; static threadlocal SHashObj* pNameHash = NULL;
static threadlocal SHashObj* pMetaHash = NULL; static threadlocal SHashObj* pMetaHash = NULL;
@ -1912,7 +1908,8 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SMqRspObj rspObj = {0}; SMqRspObj rspObj = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
SRequestObj* pRequest = NULL; SHashObj* pCreateTbHash = NULL;
SRequestObj* pRequest = NULL;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
@ -1947,7 +1944,12 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
conn.requestId = pRequest->requestId; conn.requestId = pRequest->requestId;
conn.requestObjRefId = pRequest->self; conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
if (type == RES_TYPE__TMQ_METADATA) {
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pCreateTbHash);
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
}
int retry = 0; int retry = 0;
while(1){ while(1){
@ -2064,6 +2066,12 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
}else { }else {
tDeleteMqDataRsp(&rspObj.dataRsp); tDeleteMqDataRsp(&rspObj.dataRsp);
} }
void* pIter = taosHashIterate(pCreateTbHash, NULL);
while (pIter) {
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
pIter = taosHashIterate(pCreateTbHash, pIter);
}
taosHashCleanup(pCreateTbHash);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
@ -2273,12 +2281,6 @@ static void tmqFreeMeta(void *data){
void freeHash() { void freeHash() {
taosHashCleanup(pMetaHash); taosHashCleanup(pMetaHash);
taosHashCleanup(pNameHash); taosHashCleanup(pNameHash);
void* pIter = taosHashIterate(pCreateTbHash, NULL);
while (pIter) {
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
pIter = taosHashIterate(pCreateTbHash, pIter);
}
taosHashCleanup(pCreateTbHash);
taosHashCleanup(pVgHash); taosHashCleanup(pVgHash);
} }
@ -2288,10 +2290,10 @@ static int32_t initHash(){
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgHash); RAW_NULL_CHECK(pVgHash);
} }
if (pCreateTbHash == NULL){ // if (pCreateTbHash == NULL){
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); // pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
RAW_NULL_CHECK(pCreateTbHash); // RAW_NULL_CHECK(pCreateTbHash);
} // }
if (pNameHash == NULL){ if (pNameHash == NULL){
pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pNameHash); RAW_NULL_CHECK(pNameHash);