fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-02-17 15:32:18 +08:00
parent 3da00b7c74
commit 420c222d5b
5 changed files with 7 additions and 12 deletions

View File

@ -11513,7 +11513,7 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
if (pRsp->blockDataElementFree){
taosArrayDestroyP(pRsp->blockData)
taosArrayDestroyP(pRsp->blockData, NULL);
} else {
taosArrayDestroy(pRsp->blockData);
}

View File

@ -849,11 +849,6 @@ static int32_t physiDispatchCopy(const SDataDispatcherNode* pSrc, SDataDispatche
return TSDB_CODE_SUCCESS;
}
static int32_t physiInserterCopy(const SDataInserterNode* pSrc, SDataInserterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
return TSDB_CODE_SUCCESS;
}
static int32_t physiQueryInserterCopy(const SQueryInserterNode* pSrc, SQueryInserterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
CLONE_NODE_LIST_FIELD(pCols);
@ -1135,9 +1130,6 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = physiDispatchCopy((const SDataDispatcherNode*)pNode, (SDataDispatcherNode*)pDst);
break;
//case QUERY_NODE_PHYSICAL_PLAN_INSERT:
// code = physiInserterCopy((const SDataInserterNode*)pNode, (SDataInserterNode*)pDst);
// break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
code = physiQueryInserterCopy((const SQueryInserterNode*)pNode, (SQueryInserterNode*)pDst);
break;

View File

@ -1995,6 +1995,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_INSERT: {
SDataInserterNode* pSink = (SDataInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
taosMemFreeClear(pSink->pData);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {

View File

@ -830,6 +830,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
}
static void destroyVgDataBlocks(void* p) {
if (p == NULL) return;
SVgDataBlocks* pVg = p;
taosMemoryFree(pVg->pData);
taosMemoryFree(pVg);
@ -855,7 +856,6 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
@ -863,6 +863,9 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
}
if (TSDB_CODE_SUCCESS != code) {
destroyVgDataBlocks(dst);
}
}
if (append) {

View File

@ -155,7 +155,7 @@ void rpcCloseImpl(void* arg) {
void* rpcMallocCont(int64_t contLen) {
int64_t size = contLen + TRANS_MSG_OVERHEAD;
char* start = taosMemoryMalloc(size);
char* start = taosMemoryCalloc(1, size);
if (start == NULL) {
tError("failed to malloc msg, size:%" PRId64, size);
return NULL;
@ -163,7 +163,6 @@ void* rpcMallocCont(int64_t contLen) {
tTrace("malloc mem:%p size:%" PRId64, start, size);
}
memset(start, 0, TRANS_MSG_OVERHEAD);
return start + TRANS_MSG_OVERHEAD;
}