diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 01b142e333..b468456cb7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq { char* sql; char* logicalPlan; char* physicalPlan; - SArray* tasks; // SArray + SSubQueryMsg msg; } SMqSetCVgReq; +static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { + int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen; + if (buf == NULL) return tlen; + memcpy(*buf, pMsg, tlen); + *buf = POINTER_SHIFT(*buf, tlen); + return tlen; +} + +static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { + int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen; + memcpy(pMsg, buf, tlen); + return POINTER_SHIFT(buf, tlen); +} + static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pReq->vgId); @@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - pReq->tasks = NULL; + buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 02207c4d1b..1925f0e3bd 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -109,12 +109,71 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; -const SSchema* tGetTbnameColumnSchema(); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +typedef struct SQueryNodeAddr { + int32_t nodeId; // vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) { + pEpSet->inUse = pAddr->inUse; + pEpSet->numOfEps = pAddr->numOfEps; + for (int j = 0; j < TSDB_MAX_REPLICA; j++) { + pEpSet->port[j] = pAddr->epAddr[j].port; + memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN); + } +} + +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +/** + * Asynchronously send message to server, after the response received, the callback will be incured. + * + * @param pTransporter + * @param epSet + * @param pTransporterId + * @param pInfo + * @return + */ +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + void initQueryModuleMsgHandle(); +const SSchema* tGetTbnameColumnSchema(); +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); + extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); + #define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 25e295f980..5afafa08a3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); - -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -typedef int32_t (*__async_exec_fn_t)(void* param); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - -typedef struct SQueryNodeAddr { - int32_t nodeId; // vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); - -int32_t initTaskQueue(); -int32_t cleanupTaskQueue(); - -/** - * - * @param execFn The asynchronously execution function - * @param execParam The parameters of the execFn - * @param code The response code during execution the execFn - * @return - */ -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); - -/** - * Asynchronously send message to server, after the response received, the callback will be incured. - * - * @param pTransporter - * @param epSet - * @param pTransporterId - * @param pInfo - * @return - */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 772b9bf079..aaedf280b5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -327,11 +327,13 @@ typedef struct SMqTopicConsumer { #endif typedef struct SMqConsumerEp { - int32_t vgId; // -1 for unassigned - SEpSet epset; - int64_t consumerId; // -1 for unassigned - int64_t lastConsumerHbTs; - int64_t lastVgHbTs; + int32_t vgId; // -1 for unassigned + SEpSet epset; + int64_t consumerId; // -1 for unassigned + int64_t lastConsumerHbTs; + int64_t lastVgHbTs; + int32_t execLen; + SSubQueryMsg qExec; } SMqConsumerEp; static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { @@ -339,6 +341,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); return tlen; } @@ -346,6 +349,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); + buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); + pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; return buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4cf7505f74..a6634a9f01 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { strcpy(req.sql, pTopic->sql); strcpy(req.logicalPlan, pTopic->logicalPlan); strcpy(req.physicalPlan, pTopic->physicalPlan); + memcpy(&req.msg, &pCEp->qExec, pCEp->execLen); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -143,7 +144,21 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { SMqConsumerEp CEp; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; - int32_t sz; + //convert phyplan to dag + SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); + SArray *pArray; + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { + + } + int32_t sz = taosArrayGetSize(pArray); + //convert dag to msg + for (int32_t i = 0; i < sz; i++) { + STaskInfo* pTaskInfo = taosArrayGet(pArray, i); + int32_t vgId = pTaskInfo->addr.nodeId; + SEpSet epSet; + tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr); + } + /*pTopic->physicalPlan;*/ SVgObj *pVgroup = NULL; SSdb *pSdb = pMnode->pSdb; void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); @@ -156,6 +171,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); } return 0; + qDestroyQueryDag(pDag); } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ead856a06b..cbc948b95e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -681,7 +681,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { // TODO: handle error } - ASSERT(taosArrayGetSize(pArray) == 0); STaskInfo *pInfo = taosArrayGet(pArray, 0); SArray* pTasks; schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); @@ -733,6 +732,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) // TODO: filter out unused column return 0; } + SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); @@ -762,7 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { taosArrayPush(pArray, &colInfo); return pArray; } -/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t - * status) {*/ -/*return 0;*/ -/*}*/