fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-02-06 16:48:55 +08:00
parent b5edb79d48
commit 9bde3cab92
7 changed files with 18 additions and 19 deletions

View File

@ -1986,7 +1986,6 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_INSERT: {
SDataInserterNode* pSink = (SDataInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
taosMemoryFreeClear(pSink->pData);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {

View File

@ -2839,7 +2839,8 @@ static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlock
pInserter->numOfTables = pBlocks->numOfTables;
pInserter->size = pBlocks->size;
TSWAP(pInserter->pData, pBlocks->pData);
pInserter->pData = pBlocks->pData;
pBlocks->pData = NULL;
*pSink = (SDataSinkNode*)pInserter;
return TSDB_CODE_SUCCESS;

View File

@ -1110,7 +1110,7 @@ _return:
}
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType, void* param) {
int32_t msgSize = 0;
int32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
@ -1136,13 +1136,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_SUBMIT:
case TDMT_VND_COMMIT: {
msgSize = pTask->msgLen;
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(terrno);
}
TAOS_MEMCPY(msg, pTask->msg, msgSize);
msg = pTask->msg;
pTask->msg = NULL;
break;
}

View File

@ -155,7 +155,7 @@ void rpcCloseImpl(void* arg) {
void* rpcMallocCont(int64_t contLen) {
int64_t size = contLen + TRANS_MSG_OVERHEAD;
char* start = taosMemoryCalloc(1, size);
char* start = taosMemoryMalloc(size);
if (start == NULL) {
tError("failed to malloc msg, size:%" PRId64, size);
return NULL;
@ -163,7 +163,8 @@ void* rpcMallocCont(int64_t contLen) {
tTrace("malloc mem:%p size:%" PRId64, start, size);
}
return start + sizeof(STransMsgHead);
memset(start, 0, TRANS_MSG_OVERHEAD);
return start + TRANS_MSG_OVERHEAD;
}
void rpcFreeCont(void* cont) { transFreeMsg(cont); }

View File

@ -61,7 +61,8 @@ ignoreCodes = [
'0x80003107', '0x80003108', '0x80003109', '0x80003110', '0x80003111', '0x80003112', '0x80003250', '0x80004003', '0x80004004', '0x80004005',
'0x80004006', '0x80004007', '0x80004008', '0x80004009', '0x80004010', '0x80004011', '0x80004012', '0x80004013', '0x80004014', '0x80004015',
'0x80004016', '0x80004102', '0x80004103', '0x80004104', '0x80004105', '0x80004106', '0x80004107', '0x80004108', '0x80004109', '0x80005100',
'0x80005101', '0x80006000', '0x80006100', '0x80006101', '0x80006102', '0x80000019', '0x80002639', '0x80002666', '0x80000237']
'0x80005101', '0x80006000', '0x80006100', '0x80006101', '0x80006102', '0x80000019', '0x80002639', '0x80002666', '0x80000237', '0x80004018',
'0x80004019']
class TDTestCase(TBase):

View File

@ -53,7 +53,7 @@
"columns": [
{"type": "TINYINT", "name": "current", "max": 128, "min": 1 },
{ "type": "BOOL", "name": "phaseewe" },
{ "type": "BINARY", "name": "locatin", "len":16374 },
{ "type": "BINARY", "name": "str", "len":1024 },
{ "type": "BIGINT", "name": "cnt", "max" : 2563332323232, "min":1 },
{ "type": "DOUBLE", "name": "phase", "max": 1000, "min": 0 }
],

View File

@ -28,7 +28,7 @@ if __name__ == "__main__":
if not os.path.isdir("taosx-perf"):
os.system("mkdir taosx-perf")
os.system("git clone https://github.com/brendangregg/FlameGraph.git taosx-perf")
# os.system("git clone https://github.com/brendangregg/FlameGraph.git taosx-perf")
os.chdir("taosx-perf")
print(os.getcwd())
@ -43,6 +43,7 @@ if __name__ == "__main__":
tdDnodes2.deploy(1,updatecfgDict2)
tdDnodes2.start(1)
os.system("taos -c ./dnode1/sim/dnode1/cfg -s \"drop topic if exists test\"")
os.system("taos -c ./dnode2/sim/dnode1/cfg -s \"drop database if exists test\"")
os.system("taos -c ./dnode2/sim/dnode1/cfg -s \"create database test vgroups 8\"")
if insertData :
@ -51,12 +52,13 @@ if __name__ == "__main__":
print("create test in dst")
print("start to run taosx")
os.system("taosx run -f \"tmq://root:taosdata@localhost:6030/test?group.id=taosx-new-`date +%s`&timeout=50s&experimental.snapshot.enable=false&auto.offset.reset=earliest&prefer=raw\" -t \"taos://root:taosdata@localhost:7030/test\" > /dev/null 2>&1 &")
time.sleep(10)
os.system("flamegraph -o raw.svg -- taosx run -f \"tmq://root:taosdata@localhost:6030/test?group.id=taosx-new-`date +%s`&timeout=50s&experimental.snapshot.enable=false&auto.offset.reset=earliest&prefer=raw\" -t \"taos://root:taosdata@localhost:7030/test\" > /dev/null 2>&1 &")
# os.system("taosx run -f \"tmq://root:taosdata@localhost:6030/test?group.id=taosx-new-`date +%s`&timeout=50s&experimental.snapshot.enable=false&auto.offset.reset=earliest&prefer=raw\" -t \"taos://root:taosdata@localhost:7030/test\" > /dev/null 2>&1 &")
# time.sleep(10)
print("start to run perf")
# print("start to run perf")
#os.system("perf record -a -g -F 99 -p `pidof taosx` sleep 60")
#os.system("perf script | ./FlameGraph/stackcollapse-perf.pl| ./FlameGraph/flamegraph.pl > flame.svg")
#os.system("perf script | ./stackcollapse-perf.pl| ./flamegraph.pl > flame.svg")
tdLog.info("Procedures for tdengine deployed in")