diff --git a/include/common/tmsg.h b/include/common/tmsg.h index eb7a08357b..9058dfe53f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4113,7 +4113,6 @@ typedef struct { SArray* blockData; SArray* blockTbName; SArray* blockSchema; -// SArray* blockSuid; union{ struct{ diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 8501d88be0..6d4b3df041 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -210,7 +210,6 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); -//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo); void* qExtractReaderFromStreamScanner(void* scanner); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 6c54c3aa69..206bc63d19 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1706,116 +1706,6 @@ static void* getRawDataFromRes(void* pRetrieve) { return rawData; } -//static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { -// if (taos == NULL || data == NULL) { -// SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); -// return TSDB_CODE_INVALID_PARA; -// } -// int32_t code = TSDB_CODE_SUCCESS; -// SHashObj* pVgHash = NULL; -// SQuery* pQuery = NULL; -// SMqRspObj rspObj = {0}; -// SDecoder decoder = {0}; -// STableMeta* pTableMeta = NULL; -// -// SRequestObj* pRequest = NULL; -// RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); -// -// uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); -// pRequest->syncQuery = true; -// rspObj.resIter = -1; -// rspObj.resType = RES_TYPE__TMQ; -// -// int8_t dataVersion = *(int8_t*)data; -// if (dataVersion >= MQ_DATA_RSP_VERSION) { -// data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); -// dataLen -= sizeof(int8_t) + sizeof(int32_t); -// } -// tDecoderInit(&decoder, data, dataLen); -// code = tDecodeMqDataRsp(&decoder, &rspObj.dataRsp); -// if (code != 0) { -// SET_ERROR_MSG("decode mq data rsp failed"); -// code = TSDB_CODE_INVALID_MSG; -// goto end; -// } -// -// if (!pRequest->pDb) { -// code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; -// goto end; -// } -// -// struct SCatalog* pCatalog = NULL; -// RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); -// -// SRequestConnInfo conn = {0}; -// conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; -// conn.requestId = pRequest->requestId; -// conn.requestObjRefId = pRequest->self; -// conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); -// -// RAW_RETURN_CHECK(smlInitHandle(&pQuery)); -// pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); -// RAW_NULL_CHECK(pVgHash); -// while (++rspObj.resIter < rspObj.dataRsp.blockNum) { -// void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); -// RAW_NULL_CHECK(pRetrieve); -// if (!rspObj.dataRsp.withSchema) { -// goto end; -// } -// -// const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); -// RAW_NULL_CHECK(tbName); -// -// SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; -// (void)strcpy(pName.dbname, pRequest->pDb); -// (void)strcpy(pName.tname, tbName); -// -// RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); -// -// SVgroupInfo vg = {0}; -// RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); -// -// void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); -// if (hData == NULL) { -// RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); -// } -// -// SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); -// RAW_NULL_CHECK(pSW); -// TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); -// RAW_NULL_CHECK(fields); -// for (int i = 0; i < pSW->nCols; i++) { -// fields[i].type = pSW->pSchema[i].type; -// fields[i].bytes = pSW->pSchema[i].bytes; -// tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); -// } -// void* rawData = getRawDataFromRes(pRetrieve); -// char err[ERR_MSG_LEN] = {0}; -// code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN); -// taosMemoryFree(fields); -// taosMemoryFreeClear(pTableMeta); -// if (code != TSDB_CODE_SUCCESS) { -// SET_ERROR_MSG("table:%s, err:%s", tbName, err); -// goto end; -// } -// } -// -// RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); -// -// launchQueryImpl(pRequest, pQuery, true, NULL); -// code = pRequest->code; -// -// end: -// uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code)); -// tDeleteMqDataRsp(&rspObj.dataRsp); -// tDecoderClear(&decoder); -// qDestroyQuery(pQuery); -// destroyRequest(pRequest); -// taosHashCleanup(pVgHash); -// taosMemoryFreeClear(pTableMeta); -// return code; -//} - static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) { // find schema data info int32_t code = 0; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e0009177e5..9c8544fcd4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10980,42 +10980,13 @@ _exit: return code; } -//int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){ -// for (int32_t i = 0; i < pRsp->blockNum; i++) { -// if (pRsp->withTbName) { -// int64_t* suid = taosArrayGet(pRsp->blockSuid, i); -// if (suid != NULL){ -// TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid)); -// } -// } -// } -// return 0; -//} int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp)); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime)); -// TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp)); return 0; } -//int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){ -// if (!tDecodeIsEnd(pDecoder)) { -// if (pRsp->withTbName) { -// if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) { -// TAOS_CHECK_RETURN(terrno); -// } -// } -// -// for (int32_t i = 0; i < pRsp->blockNum; i++) { -// int64_t suid = 0; -// TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid)); -// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) { -// TAOS_CHECK_RETURN(terrno); -// } -// } -// } -// return 0; -//} + int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { int32_t code = 0; int32_t lino; @@ -11092,9 +11063,6 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (!tDecodeIsEnd(pDecoder)) { TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } -// if (!tDecodeIsEnd(pDecoder)) { -// TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp)); -// } return 0; } @@ -11108,8 +11076,6 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); pRsp->blockTbName = NULL; -// taosArrayDestroy(pRsp->blockSuid); -// pRsp->blockSuid = NULL; tOffsetDestroy(&pRsp->reqOffset); tOffsetDestroy(&pRsp->rspOffset); } @@ -11129,7 +11095,6 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen)); } } -// TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp)); _exit: return code; @@ -11161,9 +11126,6 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } -// if (!tDecodeIsEnd(pDecoder)) { -// TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp)); -// } _exit: return code; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 3e4895378b..0e7e27fcff 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -77,16 +77,6 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i tqError("failed to push tbName to blockTbName:%s", tbName); continue; } -// int64_t suid = 0; -// if(mr.me.type == TSDB_CHILD_TABLE){ -// suid = mr.me.ctbEntry.suid; -// }else{ -// suid = mr.me.uid; -// } -// if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){ -// tqError("failed to push suid to blockSuid:%"PRId64, suid); -// continue; -// } } metaReaderClear(&mr); return 0; @@ -229,11 +219,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); continue; } -// int64_t suid = qExtractSuidFromTask(task); -// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){ -// tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId); -// continue; -// } } if (pRsp->withSchema) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6acfe6b074..3f11937463 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -50,7 +50,6 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); -// pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t)); if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { @@ -74,10 +73,6 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockSchema = NULL; } -// if (pRsp->blockSuid != NULL) { -// taosArrayDestroy(pRsp->blockSuid); -// pRsp->blockSuid = NULL; -// } return terrno; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index c9e65bacaf..e3bb9a1361 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -69,7 +69,6 @@ typedef struct { SVersionRange fillHistoryVer; STimeWindow fillHistoryWindow; SStreamState* pState; -// int64_t suid; // for tmq } SStreamTaskInfo; struct SExecTaskInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4cfa12be5b..b9e103ebca 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1212,11 +1212,6 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.tbName; } -//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) { -// SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; -// return pTaskInfo->streamInfo.suid; -//} - SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return &pTaskInfo->streamInfo.btMetaRsp;