From 5399bffe993bc462938e44100bef90d5bf321a8d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 Sep 2024 18:47:02 +0800 Subject: [PATCH] enh:[TS-5441] cost too long in tmq write meta data --- source/client/src/clientRawBlockWrite.c | 188 ++++++++++++++---------- 1 file changed, 112 insertions(+), 76 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index a97440eed7..856f4f59c1 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -165,7 +165,7 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche } RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); -end: + end: *pJson = json; } @@ -199,7 +199,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { return code; } -end: + end: return code; } static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) { @@ -340,7 +340,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** break; } -end: + end: tFreeSMAltertbReq(&req); *pJson = json; } @@ -360,7 +360,7 @@ static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) { } buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr, pJson); -end: + end: uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(*pJson)); tDecoderClear(&coder); } @@ -380,7 +380,7 @@ static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) { } buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson); -end: + end: uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(*pJson)); tDecoderClear(&coder); } @@ -485,7 +485,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); } -end: + end: RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); taosArrayDestroy(pTagVals); } @@ -514,7 +514,7 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO } RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList)); -end: + end: *pJson = json; } @@ -542,7 +542,7 @@ static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) { } } -end: + end: uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(*pJson)); tDeleteSVCreateTbBatchReq(&req); tDecoderClear(&decoder); @@ -585,7 +585,7 @@ static void processAutoCreateTable(STaosxRsp* rsp, char** string) { *string = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); -end: + end: uDebug("auto created table return, sql json:%s", *string); for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); @@ -771,7 +771,7 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { break; } -end: + end: uDebug("alter table return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); *pJson = json; @@ -806,7 +806,7 @@ static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_NULL_CHECK(tableName); RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); -end: + end: uDebug("processDropSTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); *pJson = json; @@ -842,7 +842,7 @@ static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_NULL_CHECK(sqlJson); RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlJson)); -end: + end: uDebug("processDeleteTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&coder); *pJson = json; @@ -879,7 +879,7 @@ static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) { } RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableNameList", tableNameList)); -end: + end: uDebug("processDropTable return, json sql:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); *pJson = json; @@ -990,7 +990,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); @@ -1027,9 +1027,9 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { SCatalog* pCatalog = NULL; RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)}; SName pName = {0}; toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName); STableMeta* pTableMeta = NULL; @@ -1092,7 +1092,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tDecoderClear(&coder); @@ -1148,9 +1148,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); RAW_NULL_CHECK(pRequest->tableList); @@ -1243,7 +1243,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteSVCreateTbBatchReq(&req); @@ -1304,9 +1304,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); RAW_NULL_CHECK(pRequest->tableList); // loop to create table @@ -1371,7 +1371,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); @@ -1412,7 +1412,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } taos_free_result(res); -end: + end: uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code)); tDecoderClear(&coder); return code; @@ -1455,9 +1455,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SCatalog* pCatalog = NULL; RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1525,7 +1525,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } -end: + end: uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code)); taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); @@ -1590,7 +1590,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); @@ -1650,7 +1650,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); @@ -1769,7 +1769,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteMqDataRsp(&rspObj.rsp); tDecoderClear(&decoder); @@ -1790,23 +1790,28 @@ static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) { void** dataTmp = taosArrayGet(rsp->createTableReq, j); RAW_NULL_CHECK(dataTmp); int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j); - RAW_NULL_CHECK(dataTmp); + RAW_NULL_CHECK(lenTmp); tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - RAW_RETURN_CHECK (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq)); + RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq)); if (pCreateReq.type != TSDB_CHILD_TABLE) { code = TSDB_CODE_INVALID_MSG; goto end; } - if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){ - RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); - } else{ - tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); - pCreateReq = (SVCreateTbReq){0}; + SVCreateTbReq** pCreateReqDst = taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)); + if (pCreateReqDst == NULL){ + RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, pCreateReqDst)); + code = taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReqDst, POINTER_BYTES); + if (code != 0){ + tdDestroySVCreateTbReq(*pCreateReqDst); + goto end; + } } tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); + pCreateReq = (SVCreateTbReq){0}; } return 0; @@ -1816,19 +1821,20 @@ end: return code; } +static threadlocal SHashObj* pVgHash = NULL; +static threadlocal SHashObj* pCreateTbHash = NULL; +static threadlocal SHashObj* pNameHash = NULL; +static threadlocal SHashObj* pMetaHash = NULL; + static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; - SHashObj* pVgHash = NULL; - SHashObj* pCreateTbHash = NULL; SQuery* pQuery = NULL; SMqTaosxRspObj rspObj = {0}; SDecoder decoder = {0}; - STableMeta* pTableMeta = NULL; - SRequestObj* pRequest = NULL; RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); @@ -1856,6 +1862,23 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } + if (pVgHash == NULL){ + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pVgHash); + } + if (pCreateTbHash == NULL){ + pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + RAW_NULL_CHECK(pCreateTbHash); + } + if (pNameHash == NULL){ + pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + RAW_NULL_CHECK(pNameHash); + } + if (pMetaHash == NULL){ + pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + RAW_NULL_CHECK(pMetaHash); + taosHashSetFreeFp(pMetaHash, taosMemoryFree); + } struct SCatalog* pCatalog = NULL; RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); @@ -1866,11 +1889,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); RAW_RETURN_CHECK(smlInitHandle(&pQuery)); - pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pVgHash); - pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pCreateTbHash); - RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash)); uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum); while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) { @@ -1894,21 +1912,37 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) // find schema data info SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); - SVgroupInfo vg = {0}; - RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); - if (pCreateReqDst) { // change stable name to get meta - (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); + if (pCreateReqDst == NULL) { + RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash)); + pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); } - RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); - if (pCreateReqDst) { - pTableMeta->vgId = vg.vgId; - pTableMeta->uid = pCreateReqDst->uid; - pCreateReqDst->ctb.suid = pTableMeta->suid; + int32_t vgId = 0; + SVgroupInfo* vg = (SVgroupInfo*)taosHashGet(pNameHash, tbName, strlen(tbName)); + if (vg == NULL) { + SVgroupInfo vgTmp = {0}; + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgTmp)); + RAW_RETURN_CHECK(taosHashPut(pNameHash, tbName, strlen(tbName), &vgTmp, sizeof(SVgroupInfo))); + code = taosHashPut(pVgHash, &vgTmp.vgId, sizeof(vgTmp.vgId), &vgTmp, sizeof(SVgroupInfo)); + code = (code == TSDB_CODE_DUP_KEY) ? 0 : code; + RAW_RETURN_CHECK(code); + vgId = vgTmp.vgId; + } else { + vgId = vg->vgId; } - void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); - if (hData == NULL) { - RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); + + STableMeta** pTableMeta = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName)); + if (pTableMeta == NULL) { + if (pCreateReqDst) { // change stable name to get meta + (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); + } + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, pTableMeta)); + RAW_RETURN_CHECK(taosHashPut(pMetaHash, tbName, strlen(tbName), pTableMeta, POINTER_BYTES)); + if (pCreateReqDst) { + (*pTableMeta)->vgId = vgId; + (*pTableMeta)->uid = pCreateReqDst->uid; + pCreateReqDst->ctb.suid = (*pTableMeta)->suid; + } } SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter); @@ -1930,13 +1964,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) if (pCreateReqDst){ RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp)); } - code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN); + code = rawBlockBindData(pQuery, *pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN); if (pCreateReqTmp != NULL) { tdDestroySVCreateTbReq(pCreateReqTmp); taosMemoryFree(pCreateReqTmp); } taosMemoryFree(fields); - taosMemoryFreeClear(pTableMeta); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; @@ -1948,23 +1981,26 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); + tDeleteSTaosxRsp(&rspObj.rsp); + tDecoderClear(&decoder); + qDestroyQuery(pQuery); + destroyRequest(pRequest); + + return code; +} + +void tmqClean() { + taosHashCleanup(pMetaHash); + taosHashCleanup(pNameHash); void* pIter = taosHashIterate(pCreateTbHash, NULL); while (pIter) { tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE); pIter = taosHashIterate(pCreateTbHash, pIter); } taosHashCleanup(pCreateTbHash); - - tDeleteSTaosxRsp(&rspObj.rsp); - tDecoderClear(&decoder); - qDestroyQuery(pQuery); - destroyRequest(pRequest); taosHashCleanup(pVgHash); - taosMemoryFreeClear(pTableMeta); - - return code; } static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { @@ -2027,7 +2063,7 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { *string = fullStr; return; -end: + end: cJSON_Delete(pJson); tDeleteMqBatchMetaRsp(&rsp); } @@ -2113,7 +2149,7 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra raw->raw = buf; raw->raw_len = len; return code; -FAILED: + FAILED: tEncoderClear(&encoder); taosMemoryFree(buf); return code; @@ -2236,7 +2272,7 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen } } -end: + end: tDeleteMqBatchMetaRsp(&rsp); return code; }