From 865183cda842b9cdad29e8b1b83650de536c6f43 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 26 Sep 2024 17:00:35 +0800 Subject: [PATCH] enh:[TS-5441] cost too long in tmq write meta data --- source/client/src/clientRawBlockWrite.c | 100 +++++++++++++----------- 1 file changed, 56 insertions(+), 44 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 48049f0baf..fd6c3a2162 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1780,6 +1780,42 @@ end: return code; } +static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) { + // find schema data info + int32_t code = 0; + SVCreateTbReq pCreateReq = {0}; + SDecoder decoderTmp = {0}; + + for (int j = 0; j < rsp->createTableNum; j++) { + void** dataTmp = taosArrayGet(rsp->createTableReq, j); + RAW_NULL_CHECK(dataTmp); + int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j); + RAW_NULL_CHECK(dataTmp); + + tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); + RAW_RETURN_CHECK (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq)); + + if (pCreateReq.type != TSDB_CHILD_TABLE) { + code = TSDB_CODE_INVALID_MSG; + goto end; + } + if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){ + RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); + } else{ + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); + pCreateReq = (SVCreateTbReq){0}; + } + + tDecoderClear(&decoderTmp); + } + return 0; + +end: + tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); + return code; +} + static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); @@ -1791,7 +1827,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SMqTaosxRspObj rspObj = {0}; SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; - SVCreateTbReq* pCreateReqDst = NULL; + SHashObj* pCreateTbHash = NULL; SRequestObj* pRequest = NULL; RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); @@ -1832,6 +1868,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) RAW_RETURN_CHECK(smlInitHandle(&pQuery)); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); RAW_NULL_CHECK(pVgHash); + pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pCreateTbHash); + RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash)); uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum); while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) { @@ -1854,40 +1893,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) (void)strcpy(pName.tname, tbName); // find schema data info - for (int j = 0; j < rspObj.rsp.createTableNum; j++) { - void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); - RAW_NULL_CHECK(dataTmp); - int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); - RAW_NULL_CHECK(dataTmp); - - SDecoder decoderTmp = {0}; - SVCreateTbReq pCreateReq = {0}; - tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { - tDecoderClear(&decoderTmp); - tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); - code = TSDB_CODE_TMQ_INVALID_MSG; - SET_ERROR_MSG("decode create table:%s req failed", tbName); - goto end; - } - - if (pCreateReq.type != TSDB_CHILD_TABLE) { - code = TSDB_CODE_TSC_INVALID_VALUE; - tDecoderClear(&decoderTmp); - tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); - SET_ERROR_MSG("create table req type is not child table: %s, type: %d", tbName, pCreateReq.type); - goto end; - } - if (strcmp(tbName, pCreateReq.name) == 0) { - RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, &pCreateReqDst)); - tDecoderClear(&decoderTmp); - tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); - break; - } - tDecoderClear(&decoderTmp); - tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); - } - + SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); SVgroupInfo vg = {0}; RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); if (pCreateReqDst) { // change stable name to get meta @@ -1920,13 +1926,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } void* rawData = getRawDataFromRes(pRetrieve); char err[ERR_MSG_LEN] = {0}; - code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN); + SVCreateTbReq* pCreateReqTmp = NULL; + if (pCreateReqDst){ + RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp)); + } + code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN); + if (pCreateReqTmp != NULL) { + tdDestroySVCreateTbReq(pCreateReqTmp); + taosMemoryFree(pCreateReqTmp); + } taosMemoryFree(fields); taosMemoryFreeClear(pTableMeta); - if (pCreateReqDst) { - tdDestroySVCreateTbReq(pCreateReqDst); - taosMemoryFreeClear(pCreateReqDst); - } if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; @@ -1940,16 +1950,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) end: uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); + void* pIter = taosHashIterate(pCreateTbHash, NULL); + while (pIter) { + tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE); + pIter = taosHashIterate(pCreateTbHash, pIter); + } + taosHashCleanup(pCreateTbHash); tDeleteSTaosxRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); - if (pCreateReqDst) { - tdDestroySVCreateTbReq(pCreateReqDst); - taosMemoryFreeClear(pCreateReqDst); - } return code; }