fix tq
This commit is contained in:
parent
f3aa81399c
commit
a9bd009ba1
File diff suppressed because it is too large
Load Diff
|
@ -43,7 +43,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
|||
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic);
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
|
||||
|
||||
int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
|
||||
|
@ -184,7 +184,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
}
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) {
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp* pCEp) {
|
||||
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
|
||||
|
@ -199,6 +199,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
|||
req.sql = pTopic->sql;
|
||||
req.logicalPlan = pTopic->logicalPlan;
|
||||
req.physicalPlan = pTopic->physicalPlan;
|
||||
req.qmsg = strdup(pCEp->qmsg);
|
||||
req.qmsgLen = strlen(req.qmsg);
|
||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
||||
void *buf = malloc(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
|
@ -501,17 +503,21 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
}
|
||||
taosArrayPush(pSub->availConsumer, &consumerId);
|
||||
|
||||
|
||||
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
||||
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
||||
|
||||
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
|
||||
ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1);
|
||||
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
|
||||
// send setmsg to vnode
|
||||
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
|
||||
// TODO
|
||||
return -1;
|
||||
SMqConsumerEp* pCEp = taosArrayGetLast(pSub->assigned);
|
||||
if (pCEp->vgId == vgId) {
|
||||
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) {
|
||||
// TODO
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// send setmsg to vnode
|
||||
}
|
||||
|
||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||
|
|
|
@ -811,7 +811,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) {
|
|||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
pTopic->buffer.output[i].status = 0;
|
||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle);
|
||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle);
|
||||
}
|
||||
taosArrayPush(pConsumer->topics, pTopic);
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -1132,6 +1132,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
|||
}
|
||||
|
||||
int32_t stringToSubplan(const char* str, SSubplan** subplan) {
|
||||
printf("aa: %s\n", str);
|
||||
cJSON* json = cJSON_Parse(str);
|
||||
if (NULL == json) {
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
|
@ -1482,13 +1482,14 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
|||
}
|
||||
|
||||
int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
|
||||
msg = calloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
qError("calloc %d failed", msgSize);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
|
||||
SSubQueryMsg* pMsg = calloc(1, msgSize);
|
||||
/*SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;*/
|
||||
memcpy(pMsg->msg, msg, msgLen);
|
||||
|
||||
pMsg->header.vgId = tInfo.addr.nodeId;
|
||||
|
||||
|
@ -1497,7 +1498,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
|||
pMsg->taskId = schGenUUID();
|
||||
pMsg->taskType = TASK_TYPE_PERSISTENT;
|
||||
pMsg->contentLen = msgLen;
|
||||
memcpy(pMsg->msg, msg, msgLen);
|
||||
/*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
|
||||
|
||||
tInfo.msg = pMsg;
|
||||
|
||||
|
|
|
@ -13,12 +13,12 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
|
|||
|
||||
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
||||
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov)
|
||||
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread)
|
||||
|
||||
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp)
|
||||
LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp)
|
||||
ADD_EXECUTABLE(hashTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov)
|
||||
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread)
|
||||
|
||||
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
|
||||
ADD_EXECUTABLE(trefTest ${BIN_SRC})
|
||||
|
|
Loading…
Reference in New Issue