From e6bf8dcfde498c8e73f3ce723d23230366e88c78 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 9 Oct 2024 18:10:43 +0800 Subject: [PATCH] enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info --- include/common/tmsg.h | 23 +++++----- include/libs/executor/executor.h | 3 +- source/client/src/clientRawBlockWrite.c | 55 +++++++++++++++++++--- source/common/src/tmsg.c | 40 ++++++++++++++++ source/dnode/vnode/src/tq/tqScan.c | 61 ++++++++++++------------- source/dnode/vnode/src/tq/tqUtil.c | 10 +++- source/libs/executor/inc/querytask.h | 1 + source/libs/executor/src/executor.c | 6 +++ 8 files changed, 145 insertions(+), 54 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a10f02c96..fb118de907 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4049,18 +4049,17 @@ void tDeleteMqMetaRsp(SMqMetaRsp* pRsp); #define MQ_DATA_RSP_VERSION 100 typedef struct { - struct { - SMqRspHead head; - STqOffsetVal rspOffset; - STqOffsetVal reqOffset; - int32_t blockNum; - int8_t withTbName; - int8_t withSchema; - SArray* blockDataLen; - SArray* blockData; - SArray* blockTbName; - SArray* blockSchema; - }; + SMqRspHead head; + STqOffsetVal rspOffset; + STqOffsetVal reqOffset; + int32_t blockNum; + int8_t withTbName; + int8_t withSchema; + SArray* blockDataLen; + SArray* blockData; + SArray* blockTbName; + SArray* blockSchema; + SArray* blockSuid; union{ struct{ diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ae26d5f2ae..6b02d8f985 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -209,7 +209,8 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); -const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); +const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); +const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo); void* qExtractReaderFromStreamScanner(void* scanner); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 064c3bdb2e..4b2e9e234d 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1856,6 +1856,42 @@ static threadlocal SHashObj* pCreateTbHash = NULL; static threadlocal SHashObj* pNameHash = NULL; static threadlocal SHashObj* pMetaHash = NULL; +static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){ + char* p = (char*)rawData; + // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each + // column length | + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(uint64_t); + int8_t* fields = p; + + if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) { + return true; + } + for (int i = 0; i < pSW->nCols; i++) { + int j = 0; + for (; j < pTableMeta->tableInfo.numOfColumns; j++) { + SSchema* pColSchema = &pTableMeta->schema[j]; + char* fieldName = pSW->pSchema[i].name; + + if (strcmp(pColSchema->name, fieldName) == 0) { + if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { + return true; + } + break; + } + } + fields += sizeof(int8_t) + sizeof(int32_t); + + if (j == pTableMeta->tableInfo.numOfColumns) + return true; + } + return false; +} + static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); @@ -1905,7 +1941,7 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da RAW_NULL_CHECK(pNameHash); } if (pMetaHash == NULL){ - pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); RAW_NULL_CHECK(pMetaHash); taosHashSetFreeFp(pMetaHash, taosMemoryFree); } @@ -1931,6 +1967,9 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); RAW_NULL_CHECK(tbName); + int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter); + RAW_NULL_CHECK(suid); + uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; (void)strcpy(pName.dbname, pRequest->pDb); @@ -1960,14 +1999,19 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da vgId = vg->vgId; } + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); + RAW_NULL_CHECK(pSW); + void* rawData = getRawDataFromRes(pRetrieve); + RAW_NULL_CHECK(rawData); + STableMeta* pTableMeta = NULL; - STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName)); - if (pTableMetaTmp == NULL) { + STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, suid, POINTER_BYTES); + if (pTableMetaTmp == NULL || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) { if (pCreateReqDst) { // change stable name to get meta (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); } RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); - code = taosHashPut(pMetaHash, tbName, strlen(tbName), &pTableMeta, POINTER_BYTES); + code = taosHashPut(pMetaHash, suid, POINTER_BYTES, &pTableMeta, POINTER_BYTES); if (code != 0){ taosMemoryFree(pTableMeta); goto end; @@ -1981,9 +2025,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da pTableMeta = *pTableMetaTmp; } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); - RAW_NULL_CHECK(pSW); - void* rawData = getRawDataFromRes(pRetrieve); char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 986747fe58..41516d325a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10716,12 +10716,42 @@ _exit: return code; } +int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){ + for (int32_t i = 0; i < pRsp->blockNum; i++) { + if (pRsp->withTbName) { + int64_t* suid = taosArrayGet(pRsp->blockSuid, i); + if (suid != NULL){ + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid)); + } + } + } + return 0; +} int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp)); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime)); + TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp)); + return 0; } +int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){ + if (!tDecodeIsEnd(pDecoder)) { + if (pRsp->withTbName) { + if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) { + TAOS_CHECK_RETURN(terrno); + } + } + for (int32_t i = 0; i < pRsp->blockNum; i++) { + int64_t suid = 0; + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid)); + if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) { + TAOS_CHECK_RETURN(terrno); + } + } + } + return 0; +} int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { int32_t code = 0; int32_t lino; @@ -10798,6 +10828,9 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (!tDecodeIsEnd(pDecoder)) { TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } + if (!tDecodeIsEnd(pDecoder)) { + TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp)); + } return 0; } @@ -10811,6 +10844,8 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); pRsp->blockTbName = NULL; + taosArrayDestroy(pRsp->blockSuid); + pRsp->blockSuid = NULL; tOffsetDestroy(&pRsp->reqOffset); tOffsetDestroy(&pRsp->rspOffset); } @@ -10830,6 +10865,8 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen)); } } + TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp)); + _exit: return code; } @@ -10860,6 +10897,9 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } + if (!tDecodeIsEnd(pDecoder)) { + TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp)); + } _exit: return code; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index dbc1b16cf5..a95437ab0d 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -77,6 +77,14 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i tqError("failed to push tbName to blockTbName:%s", tbName); continue; } + int64_t suid = 0; + if(mr.me.type == TSDB_CHILD_TABLE){ + suid = mr.me.ctbEntry.suid; + } + if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){ + tqError("failed to push suid to blockSuid:%"PRId64, suid); + continue; + } } metaReaderClear(&mr); return 0; @@ -210,36 +218,26 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat if (pDataBlock != NULL && pDataBlock->info.rows > 0) { if (pRsp->withTbName) { - if (pOffset->type == TMQ_OFFSET__LOG) { - int64_t uid = pExec->pTqReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) { - tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); - continue; - } - } else { - char* tbName = taosStrdup(qExtractTbnameFromTask(task)); - if (tbName == NULL) { - tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId); - return terrno; - } - if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){ - tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); - continue; - } + char* tbName = taosStrdup(qExtractTbnameFromTask(task)); + if (tbName == NULL) { + tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId); + return terrno; + } + if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){ + tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); + continue; + } + int64_t suid = qExtractSuidFromTask(task); + if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){ + tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId); + continue; } } if (pRsp->withSchema) { - if (pOffset->type == TMQ_OFFSET__LOG) { - if (tqAddBlockSchemaToRsp(pExec, pRsp) != 0){ - tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); - continue; - } - } else { - SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); - if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ - tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); - continue; - } + SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); + if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ + tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); + continue; } } @@ -249,12 +247,9 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat continue; } pRsp->blockNum++; - if (pOffset->type == TMQ_OFFSET__LOG) { - continue; - } else { - rowCnt += pDataBlock->info.rows; - if (rowCnt <= tmqRowSize) continue; - } + rowCnt += pDataBlock->info.rows; + if (rowCnt <= tmqRowSize) continue; + } // get meta diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b4866b8c65..891e55d91d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -50,8 +50,11 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); + pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t)); - if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { + if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || + pRsp->blockTbName == NULL || pRsp->blockSchema == NULL || + pRsp->blockSuid == NULL) { if (pRsp->blockData != NULL) { taosArrayDestroy(pRsp->blockData); pRsp->blockData = NULL; @@ -71,6 +74,11 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { taosArrayDestroy(pRsp->blockSchema); pRsp->blockSchema = NULL; } + + if (pRsp->blockSuid != NULL) { + taosArrayDestroy(pRsp->blockSuid); + pRsp->blockSuid = NULL; + } return terrno; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index e3bb9a1361..3c52f8080e 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -69,6 +69,7 @@ typedef struct { SVersionRange fillHistoryVer; STimeWindow fillHistoryWindow; SStreamState* pState; + int64_t suid; // for tmq } SStreamTaskInfo; struct SExecTaskInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cbf392f67e..08b7ba0e05 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1212,6 +1212,11 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.tbName; } +const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.suid; +} + SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return &pTaskInfo->streamInfo.btMetaRsp; @@ -1494,6 +1499,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); + pTaskInfo->streamInfo.suid = mtInfo.suid; tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema;