enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info

This commit is contained in:
wangmm0220 2024-10-18 09:28:16 +08:00
parent 3ae6f79290
commit bc05289192
8 changed files with 1 additions and 177 deletions

View File

@ -4113,7 +4113,6 @@ typedef struct {
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
// SArray* blockSuid;
union{
struct{

View File

@ -210,7 +210,6 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo);
void* qExtractReaderFromStreamScanner(void* scanner);

View File

@ -1706,116 +1706,6 @@ static void* getRawDataFromRes(void* pRetrieve) {
return rawData;
}
//static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
// if (taos == NULL || data == NULL) {
// SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
// return TSDB_CODE_INVALID_PARA;
// }
// int32_t code = TSDB_CODE_SUCCESS;
// SHashObj* pVgHash = NULL;
// SQuery* pQuery = NULL;
// SMqRspObj rspObj = {0};
// SDecoder decoder = {0};
// STableMeta* pTableMeta = NULL;
//
// SRequestObj* pRequest = NULL;
// RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
//
// uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
// pRequest->syncQuery = true;
// rspObj.resIter = -1;
// rspObj.resType = RES_TYPE__TMQ;
//
// int8_t dataVersion = *(int8_t*)data;
// if (dataVersion >= MQ_DATA_RSP_VERSION) {
// data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
// dataLen -= sizeof(int8_t) + sizeof(int32_t);
// }
// tDecoderInit(&decoder, data, dataLen);
// code = tDecodeMqDataRsp(&decoder, &rspObj.dataRsp);
// if (code != 0) {
// SET_ERROR_MSG("decode mq data rsp failed");
// code = TSDB_CODE_INVALID_MSG;
// goto end;
// }
//
// if (!pRequest->pDb) {
// code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
// goto end;
// }
//
// struct SCatalog* pCatalog = NULL;
// RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
//
// SRequestConnInfo conn = {0};
// conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
// conn.requestId = pRequest->requestId;
// conn.requestObjRefId = pRequest->self;
// conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
//
// RAW_RETURN_CHECK(smlInitHandle(&pQuery));
// pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
// RAW_NULL_CHECK(pVgHash);
// while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
// void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
// RAW_NULL_CHECK(pRetrieve);
// if (!rspObj.dataRsp.withSchema) {
// goto end;
// }
//
// const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
// RAW_NULL_CHECK(tbName);
//
// SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
// (void)strcpy(pName.dbname, pRequest->pDb);
// (void)strcpy(pName.tname, tbName);
//
// RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
//
// SVgroupInfo vg = {0};
// RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg));
//
// void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
// if (hData == NULL) {
// RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
// }
//
// SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
// RAW_NULL_CHECK(pSW);
// TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
// RAW_NULL_CHECK(fields);
// for (int i = 0; i < pSW->nCols; i++) {
// fields[i].type = pSW->pSchema[i].type;
// fields[i].bytes = pSW->pSchema[i].bytes;
// tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
// }
// void* rawData = getRawDataFromRes(pRetrieve);
// char err[ERR_MSG_LEN] = {0};
// code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN);
// taosMemoryFree(fields);
// taosMemoryFreeClear(pTableMeta);
// if (code != TSDB_CODE_SUCCESS) {
// SET_ERROR_MSG("table:%s, err:%s", tbName, err);
// goto end;
// }
// }
//
// RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
//
// launchQueryImpl(pRequest, pQuery, true, NULL);
// code = pRequest->code;
//
// end:
// uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
// tDeleteMqDataRsp(&rspObj.dataRsp);
// tDecoderClear(&decoder);
// qDestroyQuery(pQuery);
// destroyRequest(pRequest);
// taosHashCleanup(pVgHash);
// taosMemoryFreeClear(pTableMeta);
// return code;
//}
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
// find schema data info
int32_t code = 0;

View File

@ -10980,42 +10980,13 @@ _exit:
return code;
}
//int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){
// for (int32_t i = 0; i < pRsp->blockNum; i++) {
// if (pRsp->withTbName) {
// int64_t* suid = taosArrayGet(pRsp->blockSuid, i);
// if (suid != NULL){
// TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid));
// }
// }
// }
// return 0;
//}
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime));
// TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp));
return 0;
}
//int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){
// if (!tDecodeIsEnd(pDecoder)) {
// if (pRsp->withTbName) {
// if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) {
// TAOS_CHECK_RETURN(terrno);
// }
// }
//
// for (int32_t i = 0; i < pRsp->blockNum; i++) {
// int64_t suid = 0;
// TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid));
// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) {
// TAOS_CHECK_RETURN(terrno);
// }
// }
// }
// return 0;
//}
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
@ -11092,9 +11063,6 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
}
// if (!tDecodeIsEnd(pDecoder)) {
// TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp));
// }
return 0;
}
@ -11108,8 +11076,6 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
// taosArrayDestroy(pRsp->blockSuid);
// pRsp->blockSuid = NULL;
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
}
@ -11129,7 +11095,6 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
}
}
// TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp));
_exit:
return code;
@ -11161,9 +11126,6 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
}
}
// if (!tDecodeIsEnd(pDecoder)) {
// TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp));
// }
_exit:
return code;

View File

@ -77,16 +77,6 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
tqError("failed to push tbName to blockTbName:%s", tbName);
continue;
}
// int64_t suid = 0;
// if(mr.me.type == TSDB_CHILD_TABLE){
// suid = mr.me.ctbEntry.suid;
// }else{
// suid = mr.me.uid;
// }
// if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){
// tqError("failed to push suid to blockSuid:%"PRId64, suid);
// continue;
// }
}
metaReaderClear(&mr);
return 0;
@ -229,11 +219,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
continue;
}
// int64_t suid = qExtractSuidFromTask(task);
// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){
// tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId);
// continue;
// }
}
if (pRsp->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));

View File

@ -50,7 +50,6 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
// pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t));
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL ||
pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
@ -74,10 +73,6 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->blockSchema = NULL;
}
// if (pRsp->blockSuid != NULL) {
// taosArrayDestroy(pRsp->blockSuid);
// pRsp->blockSuid = NULL;
// }
return terrno;
}

View File

@ -69,7 +69,6 @@ typedef struct {
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
// int64_t suid; // for tmq
} SStreamTaskInfo;
struct SExecTaskInfo {

View File

@ -1212,11 +1212,6 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.tbName;
}
//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) {
// SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
// return pTaskInfo->streamInfo.suid;
//}
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return &pTaskInfo->streamInfo.btMetaRsp;