From 67fe6091298de92b1bb0876d09e985e2f74cbcbf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 8 Oct 2024 18:24:16 +0800 Subject: [PATCH] enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info --- include/libs/parser/parser.h | 4 +-- source/client/src/clientRawBlockWrite.c | 43 ++++++++++++------------- source/libs/parser/src/parInsertUtil.c | 21 +++++++++--- utils/test/c/tmq_taosx_ci.c | 1 + 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index c98b188d49..832e4f8863 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -176,8 +176,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int32_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); -int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* fields, - int numFields, bool needChangeLength, char* errstr, int32_t errstrLen); +int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields, + int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 17ebd9b558..064c3bdb2e 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1618,7 +1618,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat RAW_NULL_CHECK(pVgHash); RAW_RETURN_CHECK( taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData))); - RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0)); + RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1678,7 +1678,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha RAW_NULL_CHECK(pVgHash); RAW_RETURN_CHECK( taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData))); - RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0)); + RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1960,35 +1960,32 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da vgId = vg->vgId; } - STableMeta** pTableMeta = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName)); - if (pTableMeta == NULL) { + STableMeta* pTableMeta = NULL; + STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName)); + if (pTableMetaTmp == NULL) { if (pCreateReqDst) { // change stable name to get meta (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); } - RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, pTableMeta)); - RAW_RETURN_CHECK(taosHashPut(pMetaHash, tbName, strlen(tbName), pTableMeta, POINTER_BYTES)); - if (pCreateReqDst) { - (*pTableMeta)->vgId = vgId; - (*pTableMeta)->uid = pCreateReqDst->uid; - pCreateReqDst->ctb.suid = (*pTableMeta)->suid; + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); + code = taosHashPut(pMetaHash, tbName, strlen(tbName), &pTableMeta, POINTER_BYTES); + if (code != 0){ + taosMemoryFree(pTableMeta); + goto end; } + if (pCreateReqDst) { + pTableMeta->vgId = vgId; + pTableMeta->uid = pCreateReqDst->uid; + pCreateReqDst->ctb.suid = pTableMeta->suid; + } + }else{ + pTableMeta = *pTableMetaTmp; } -// SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); -// RAW_NULL_CHECK(pSW); -// TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); -// RAW_NULL_CHECK(fields); -// -// for (int i = 0; i < pSW->nCols; i++) { -// fields[i].type = pSW->pSchema[i].type; -// fields[i].bytes = pSW->pSchema[i].bytes; -// tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); -// } + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); + RAW_NULL_CHECK(pSW); void* rawData = getRawDataFromRes(pRetrieve); char err[ERR_MSG_LEN] = {0}; - code = rawBlockBindData(pQuery, *pTableMeta, rawData, pCreateReqDst, NULL, 0, true, err, ERR_MSG_LEN); -// code = rawBlockBindData(pQuery, *pTableMeta, rawData, pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN); -// taosMemoryFree(fields); + code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 838f797394..f29ed79412 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -886,8 +886,8 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) { return false; } -int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, - int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) { +int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields, + int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) { int ret = 0; if(data == NULL) { uError("rawBlockBindData, data is NULL"); @@ -964,11 +964,16 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } if (tFields != NULL && numFields > boundInfo->numOfBound) { - if (errstr != NULL) - snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound); + if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound); ret = TSDB_CODE_INVALID_PARA; goto end; } + if (tFields == NULL && numOfCols != boundInfo->numOfBound) { + if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } + if (tFields == NULL) { for (int j = 0; j < boundInfo->numOfBound; j++) { SSchema* pColSchema = &pSchema[j]; @@ -1006,7 +1011,13 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate for (int i = 0; i < numFields; i++) { for (int j = 0; j < boundInfo->numOfBound; j++) { SSchema* pColSchema = &pSchema[j]; - if (strcmp(pColSchema->name, tFields[i].name) == 0) { + char* fieldName = NULL; + if (raw) { + fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name; + } else { + fieldName = ((TAOS_FIELD*)tFields)[i].name; + } + if (strcmp(pColSchema->name, fieldName) == 0) { if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { if (errstr != NULL) snprintf(errstr, errstrLen, diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 3a79a3763c..d49c7e4ad4 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -79,6 +79,7 @@ static void msg_process(TAOS_RES* msg) { } else { taosFprintfFile(g_fp, result); taosFprintfFile(g_fp, "\n"); + taosFsyncFile(g_fp); } } }