diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 38609c2348..addb84046c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1648,6 +1648,15 @@ typedef struct { int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); +typedef struct { + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + int8_t igNotExists; +} SMDropCgroupReq; + +int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); +int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 677fbb43b4..cc333ae5c8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2627,6 +2627,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } +int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->topic) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->topic) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) { int32_t sqlLen = 0; int32_t astLen = 0; diff --git a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h index 5ee4d48f9a..c05ad46189 100644 --- a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h +++ b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h @@ -36,9 +36,9 @@ typedef struct SBnodeMgmt { // bmHandle.c SArray *bmGetMsgHandles(); -int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); -int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); -int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg); // bmWorker.c int32_t bmStartWorker(SBnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index d2e547078e..9ec445c69c 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -18,7 +18,7 @@ void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} -int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonBmInfo bmInfo = {0}; bmGetMonitorInfo(pMgmt, &bmInfo); dmGetMonitorSystemInfo(&bmInfo.sys); @@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonBmInfo(&bmInfo); return 0; } int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateBnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropBnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 648dc217dc..75e83d6547 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -41,18 +41,18 @@ typedef struct SMnodeMgmt { // mmFile.c int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed); +int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); // mmInt.c -int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); +int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); // mmHandle.c SArray *mmGetMsgHandles(); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); @@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); -int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); -int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); -int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); +int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 0bab05f973..2aa1087770 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -104,7 +104,7 @@ _OVER: return code; } -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { +int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); @@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); + int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica); for (int32_t i = 0; i < replica; ++i) { SReplica *pReplica = &pMgmt->replicas[i]; - if (pReq != NULL) { - pReplica = &pReq->replicas[i]; + if (pMsg != NULL) { + pReplica = &pMsg->replicas[i]; } len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8b2d8f8bad..2ce42d7a5f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { mndGetLoad(pMgmt->pMnode, &pInfo->load); } -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMmInfo mmInfo = {0}; mmGetMonitorInfo(pMgmt, &mmInfo); dmGetMonitorSystemInfo(&mmInfo.sys); @@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonMmInfo(&mmInfo); return 0; } -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMloadInfo mloads = {0}; mmGetMnodeLoads(pMgmt, &mloads); @@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; return 0; } int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateMnodeReq createReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropMnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDAlterMnodeReq alterReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { + if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index a24334550b..4f7fd4a1c0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre return 0; } -int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { +int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { SMnodeOpt option = {0}; - if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) { + if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) { return -1; } @@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { } bool deployed = true; - if (mmWriteFile(pMgmt, pReq, deployed) != 0) { + if (mmWriteFile(pMgmt, pMsg, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; - pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; + pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.mgmt = pMgmt; bool deployed = false; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 9b9960cf3a..cf76d3a167 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); } -int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg qCfg = { diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index dd16eee643..9738fb0c45 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -39,7 +39,7 @@ typedef struct SQnodeMgmt { SArray *qmGetMsgHandles(); int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); // qmWorker.c int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index ac0868d8ef..c4b1ab63e4 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -18,7 +18,7 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonQmInfo qmInfo = {0}; qmGetMonitorInfo(pMgmt, &qmInfo); dmGetMonitorSystemInfo(&qmInfo.sys); @@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonQmInfo(&qmInfo); return 0; } int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateQnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropQnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 68b6ef659e..fbf63dda43 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -40,7 +40,7 @@ typedef struct SSnodeMgmt { SArray *smGetMsgHandles(); int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index d6ff73b132..bf1bb145b7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -18,7 +18,7 @@ void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonSmInfo smInfo = {0}; smGetMonitorInfo(pMgmt, &smInfo); dmGetMonitorSystemInfo(&smInfo.sys); @@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonSmInfo(&smInfo); return 0; } int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateSnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropSnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 022d6204ef..5ec33fe810 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); // vmHandle.c SArray *vmGetMsgHandles(); -int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 602922feeb..3da3d90ae1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonVmInfo vmInfo = {0}; vmGetMonitorInfo(pMgmt, &vmInfo); dmGetMonitorSystemInfo(&vmInfo.sys); @@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonVmInfo(&vmInfo); return 0; } -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonVloadInfo vloads = {0}; vmGetVnodeLoads(pMgmt, &vloads); @@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonVloadInfo(&vloads); return 0; } @@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW } int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; SCreateVnodeReq createReq = {0}; int32_t code = -1; char path[TSDB_FILENAME_LEN] = {0}; - if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -242,9 +241,8 @@ _OVER: } int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; SDropVnodeReq dropReq = {0}; - if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b4c8a0807c..ee9008b198 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { return 0; } -static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { +static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); SEpSet epSet = {0}; dmGetMnodeEpSet(&pDnode->data, &epSet); - dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); + dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { @@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { SRpcMsg rsp = { .code = TSDB_CODE_RPC_REDIRECT, - .info = pReq->info, + .info = pMsg->info, .contLen = len, }; rsp.pCont = rpcMallocCont(len); tSerializeSMEpSet(rsp.pCont, len, &msg); rpcSendResponse(&rsp); + + rpcFreeCont(pMsg->pCont); } static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { @@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { } } -static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { +static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); if (pDnode->status != DND_STAT_RUNNING) { - rpcFreeCont(pReq->pCont); - pReq->pCont = NULL; + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; terrno = TSDB_CODE_NODE_OFFLINE; - dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle); + dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle); return -1; } else { - rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); + rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); return 0; } } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 5fdd2f1842..2a4cf01115 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; - mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config); - tmsgSendReq(&epSet, &rpcMsg); - - return 0; + mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle); + return tmsgSendReq(&epSet, &rpcMsg); } static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { - mInfo("app:%p config rsp from dnode", pRsp->info.ahandle); + mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 90adba6f4d..06c3b29132 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -420,6 +420,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* setQueryTimewindow(pReadHandle, pCond); if (pCond->numOfCols > 0) { + int32_t rowLen = 0; + for(int32_t i = 0; i < pCond->numOfCols; ++i) { + rowLen += pCond->colList[i].bytes; + } + // allocate buffer in order to load data blocks from file pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); if (pReadHandle->suppInfo.pstatis == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fc2308c327..87a7d5e3d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -118,11 +118,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg break; } - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5769e09917..fa840e1cd6 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,8 +14,6 @@ */ #include "executor.h" -#include -#include #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 761834f29c..168589148e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -586,8 +586,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow int32_t numOfRows = pCtx[k].input.numOfRows; int32_t startOffset = pCtx[k].input.startRowIndex; - int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); - pCtx[k].input.startRowIndex = pos; + pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; if (tsCol != NULL) { @@ -758,45 +757,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt } } } - - // setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); - // uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag; - // if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || - // (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) { - - // SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol; - // if (pCtx[i].columnIndex == -1) { - // for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - // SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); - // if (pColData->info.colId == pCol->colId) { - // pCtx[i].columnIndex = j; - // break; - // } - // } - // } - - // uint32_t status = aAggs[pCtx[i].functionId].status; - // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { - // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); - // In case of the top/bottom query again the nest query result, which has no timestamp column - // don't set the ptsList attribute. - // if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - // pCtx[i].ptsList = (int64_t*) tsInfo->pData; - // } else { - // pCtx[i].ptsList = NULL; - // } - // } - // } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { - // SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; - // SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); - // - // pCtx[i].pInput = p->pData; - // assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); - // for(int32_t j = 0; j < pBlock->info.rows; ++j) { - // char* dst = p->pData + j * p->info.bytes; - // taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); - // } - // } } return code; @@ -875,7 +835,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc taosArrayDestroy(pBlockList); return code; } - + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); @@ -2842,9 +2802,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; - idata.info.type = pSchema[i].type; + + idata.info.type = pSchema[i].type; idata.info.bytes = pSchema[i].bytes; idata.info.colId = pSchema[i].colId; + idata.hasNull = true; taosArrayPush(pBlock->pDataBlock, &idata); if (IS_VAR_DATA_TYPE(idata.info.type)) { @@ -5341,4 +5303,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);; return TSDB_CODE_SUCCESS; } - + diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1097e6d84c..f5122a26ef 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -898,11 +898,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan if (upRes) { pInfo->pUpdateRes = upRes; - if (upRes->info.type = STREAM_REPROCESS) { + if (upRes->info.type == STREAM_REPROCESS) { pInfo->updateResIndex = 0; prepareDataScan(pInfo); pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else if (upRes->info.type = STREAM_INVERT) { + } else if (upRes->info.type == STREAM_INVERT) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return upRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 62732aa1f9..3be131e550 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1072,7 +1072,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; - int32_t order = TSDB_ORDER_ASC; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + pInfo->order = TSDB_ORDER_ASC; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1086,11 +1088,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } - // STimeWindow win = {0}; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = NULL; - while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -1103,16 +1103,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (pInfo->invertible) { setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); } + if (pBlock->info.type == STREAM_REPROCESS) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); + qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } - pInfo->order = TSDB_ORDER_ASC; + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index e46cb81561..3b1e66f2ad 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -207,6 +207,9 @@ bool fmIsInvertible(int32_t funcId) { case FUNCTION_TYPE_SUM: case FUNCTION_TYPE_STDDEV: case FUNCTION_TYPE_AVG: + case FUNCTION_TYPE_WSTARTTS: + case FUNCTION_TYPE_WENDTS: + case FUNCTION_TYPE_WDURATION: res = true; break; default: diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index 6410af2a4b..101595e866 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -64,7 +64,11 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte *(double*)(newInterBuf->buf) = sumSquares; newInterBuf->bufLen = sizeof(double); } - newInterBuf->numOfResult = numOutput; + if (interBuf->numOfResult == 0 && numOutput == 0) { + newInterBuf->numOfResult = 0; + } else { + newInterBuf->numOfResult = 1; + } return 0; } diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 0997e313c8..8e6391eb0b 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -137,7 +137,11 @@ endi sql insert into t1 values(1648791223001,12,14,13,11.1); sleep 500 -sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; +sql select * from streamt; + +print count(*) , count(d) , sum(a) , max(b) , min(c) +print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05 +print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15 if $rows != 4 then print ======$rows diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index 7319fadc80..a00bed30e4 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -354,7 +354,7 @@ class TDTestCase: event.wait() tdLog.info("start consume processor") - pollDelay = 5 + pollDelay = 15 showMsg = 1 showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)