put createStreamExecTaskInfo into right place
This commit is contained in:
parent
7280794c31
commit
7361019e94
|
@ -26,6 +26,13 @@ typedef void* qTaskInfo_t;
|
||||||
typedef void* DataSinkHandle;
|
typedef void* DataSinkHandle;
|
||||||
struct SSubplan;
|
struct SSubplan;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the exec task for streaming mode
|
||||||
|
* @param pMsg
|
||||||
|
* @param pStreamBlockReadHandle
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the exec task object according to task json
|
* Create the exec task object according to task json
|
||||||
|
@ -203,4 +210,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_EXECUTOR_H_*/
|
#endif /*_TD_EXECUTOR_H_*/
|
||||||
|
|
|
@ -328,7 +328,7 @@ typedef struct SMqTopicConsumer {
|
||||||
|
|
||||||
typedef struct SMqConsumerEp {
|
typedef struct SMqConsumerEp {
|
||||||
int32_t vgId; // -1 for unassigned
|
int32_t vgId; // -1 for unassigned
|
||||||
SEpSet epset;
|
SEpSet epSet;
|
||||||
int64_t consumerId; // -1 for unassigned
|
int64_t consumerId; // -1 for unassigned
|
||||||
int64_t lastConsumerHbTs;
|
int64_t lastConsumerHbTs;
|
||||||
int64_t lastVgHbTs;
|
int64_t lastVgHbTs;
|
||||||
|
@ -339,7 +339,7 @@ typedef struct SMqConsumerEp {
|
||||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
||||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
|
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||||
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||||
return tlen;
|
return tlen;
|
||||||
|
@ -347,7 +347,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
||||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
|
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||||
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||||
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
|
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
|
||||||
|
|
|
@ -117,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
// persist msg
|
// persist msg
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = pCEp->epset;
|
action.epSet = pCEp->epSet;
|
||||||
action.pCont = reqStr;
|
action.pCont = reqStr;
|
||||||
action.contLen = tlen;
|
action.contLen = tlen;
|
||||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||||
|
@ -142,36 +142,25 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
|
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
|
||||||
SMqConsumerEp CEp;
|
|
||||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
|
||||||
//convert phyplan to dag
|
//convert phyplan to dag
|
||||||
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
|
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
|
||||||
SArray *pArray;
|
SArray *pArray;
|
||||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
int32_t sz = taosArrayGetSize(pArray);
|
int32_t sz = taosArrayGetSize(pArray);
|
||||||
//convert dag to msg
|
//convert dag to msg
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SMqConsumerEp CEp;
|
||||||
|
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||||
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
||||||
int32_t vgId = pTaskInfo->addr.nodeId;
|
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
||||||
SEpSet epSet;
|
CEp.vgId = pTaskInfo->addr.nodeId;
|
||||||
tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr);
|
taosArrayPush(unassignedVg, &CEp);
|
||||||
}
|
}
|
||||||
/*pTopic->physicalPlan;*/
|
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
|
|
||||||
while (pIter != NULL) {
|
|
||||||
if (pVgroup->dbUid == pTopic->dbUid) {
|
|
||||||
CEp.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
CEp.vgId = pVgroup->vgId;
|
|
||||||
taosArrayPush(unassignedVg, &CEp);
|
|
||||||
}
|
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
qDestroyQueryDag(pDag);
|
qDestroyQueryDag(pDag);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||||
|
|
|
@ -26,6 +26,7 @@ target_link_libraries(
|
||||||
PUBLIC tfs
|
PUBLIC tfs
|
||||||
PUBLIC wal
|
PUBLIC wal
|
||||||
PUBLIC scheduler
|
PUBLIC scheduler
|
||||||
|
PUBLIC executor
|
||||||
PUBLIC qworker
|
PUBLIC qworker
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "scheduler.h"
|
#include "scheduler.h"
|
||||||
|
#include "executor.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
@ -165,7 +166,7 @@ typedef struct STqTaskItem {
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
void* dst;
|
void* dst;
|
||||||
SSubQueryMsg* pMsg;
|
qTaskInfo_t task;
|
||||||
} STqTaskItem;
|
} STqTaskItem;
|
||||||
|
|
||||||
// new version
|
// new version
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "../../../../../include/libs/executor/executor.h"
|
|
||||||
#include "tqInt.h"
|
#include "tqInt.h"
|
||||||
#include "tqMetaStore.h"
|
#include "tqMetaStore.h"
|
||||||
|
|
||||||
|
@ -635,7 +634,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
}
|
}
|
||||||
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
|
||||||
|
|
||||||
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
|
/*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/
|
||||||
|
|
||||||
// TODO: launch query and get output data
|
// TODO: launch query and get output data
|
||||||
void* outputData;
|
void* outputData;
|
||||||
|
@ -689,8 +688,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
|
||||||
pTopic->buffer.lastOffset = -1;
|
pTopic->buffer.lastOffset = -1;
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
|
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
|
||||||
pTopic->buffer.output[i].pMsg = pMsg;
|
|
||||||
pTopic->buffer.output[i].status = 0;
|
pTopic->buffer.output[i].status = 0;
|
||||||
|
pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL);
|
||||||
}
|
}
|
||||||
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
|
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
|
||||||
// write mq meta
|
// write mq meta
|
||||||
|
@ -763,32 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
taosArrayPush(pArray, &colInfo);
|
taosArrayPush(pArray, &colInfo);
|
||||||
return pArray;
|
return pArray;
|
||||||
}
|
}
|
||||||
|
|
||||||
static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
|
|
||||||
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// print those info into log
|
|
||||||
pMsg->sId = be64toh(pMsg->sId);
|
|
||||||
pMsg->queryId = be64toh(pMsg->queryId);
|
|
||||||
pMsg->taskId = be64toh(pMsg->taskId);
|
|
||||||
pMsg->contentLen = ntohl(pMsg->contentLen);
|
|
||||||
|
|
||||||
struct SSubplan *plan = NULL;
|
|
||||||
int32_t code = qStringToSubplan(pMsg->msg, &plan);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
|
||||||
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
// TODO: destroy SSubplan & pTaskInfo
|
|
||||||
terrno = code;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pTaskInfo;
|
|
||||||
}
|
|
|
@ -11,4 +11,36 @@
|
||||||
*
|
*
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "planner.h"
|
||||||
|
#include "executor.h"
|
||||||
|
|
||||||
|
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
|
||||||
|
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// print those info into log
|
||||||
|
pMsg->sId = be64toh(pMsg->sId);
|
||||||
|
pMsg->queryId = be64toh(pMsg->queryId);
|
||||||
|
pMsg->taskId = be64toh(pMsg->taskId);
|
||||||
|
pMsg->contentLen = ntohl(pMsg->contentLen);
|
||||||
|
|
||||||
|
struct SSubplan *plan = NULL;
|
||||||
|
int32_t code = qStringToSubplan(pMsg->msg, &plan);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
|
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
// TODO: destroy SSubplan & pTaskInfo
|
||||||
|
terrno = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pTaskInfo;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue