Merge pull request #10826 from taosdata/feature/tq
add stream exec head
This commit is contained in:
commit
6c8943f17a
|
@ -197,6 +197,11 @@ typedef struct {
|
||||||
};
|
};
|
||||||
} SMsgHead;
|
} SMsgHead;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t workerType;
|
||||||
|
int32_t streamTaskId;
|
||||||
|
} SStreamExecMsgHead;
|
||||||
|
|
||||||
// Submit message for one table
|
// Submit message for one table
|
||||||
typedef struct SSubmitBlk {
|
typedef struct SSubmitBlk {
|
||||||
int64_t uid; // table unique id
|
int64_t uid; // table unique id
|
||||||
|
@ -1891,9 +1896,9 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t version; // for compatibility(default 0)
|
int8_t version; // for compatibility(default 0)
|
||||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||||||
char indexName[TSDB_INDEX_NAME_LEN];
|
char indexName[TSDB_INDEX_NAME_LEN];
|
||||||
char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes.
|
char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes.
|
||||||
int32_t exprLen;
|
int32_t exprLen;
|
||||||
|
@ -1901,7 +1906,7 @@ typedef struct {
|
||||||
int64_t indexUid;
|
int64_t indexUid;
|
||||||
tb_uid_t tableUid; // super/child/common table uid
|
tb_uid_t tableUid; // super/child/common table uid
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t offset; // use unit by precision of DB
|
int64_t offset; // use unit by precision of DB
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
char* expr; // sma expression
|
char* expr; // sma expression
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
|
@ -2310,7 +2315,7 @@ typedef struct {
|
||||||
} SStreamTaskDeployRsp;
|
} SStreamTaskDeployRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SStreamExecMsgHead head;
|
||||||
// TODO: other info needed by task
|
// TODO: other info needed by task
|
||||||
} SStreamTaskExecReq;
|
} SStreamTaskExecReq;
|
||||||
|
|
||||||
|
|
|
@ -448,6 +448,11 @@ typedef struct {
|
||||||
#define SND_UNIQUE_THREAD_NUM 2
|
#define SND_UNIQUE_THREAD_NUM 2
|
||||||
#define SND_SHARED_THREAD_NUM 2
|
#define SND_SHARED_THREAD_NUM 2
|
||||||
|
|
||||||
|
enum {
|
||||||
|
SND_WORKER_TYPE__SHARED = 1,
|
||||||
|
SND_WORKER_TYPE__UNIQUE,
|
||||||
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -24,12 +24,17 @@ extern "C" {
|
||||||
int32_t dndInitSnode(SDnode *pDnode);
|
int32_t dndInitSnode(SDnode *pDnode);
|
||||||
void dndCleanupSnode(SDnode *pDnode);
|
void dndCleanupSnode(SDnode *pDnode);
|
||||||
|
|
||||||
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
// void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
||||||
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
||||||
|
|
||||||
|
void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_DND_SNODE_H_*/
|
#endif /*_TD_DND_SNODE_H_*/
|
||||||
|
|
|
@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t dndGetSWTypeFromMsg(SRpcMsg *pMsg) {
|
||||||
|
SStreamExecMsgHead *pHead = pMsg->pCont;
|
||||||
|
pHead->workerType = htonl(pHead->workerType);
|
||||||
|
return pHead->workerType;
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
|
static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
pHead->streamTaskId = htonl(pHead->streamTaskId);
|
pHead->streamTaskId = htonl(pHead->streamTaskId);
|
||||||
|
@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
|
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
int32_t workerType = dndGetSWTypeFromMsg(pMsg);
|
||||||
|
if (workerType == SND_WORKER_TYPE__SHARED) {
|
||||||
|
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
|
||||||
|
} else {
|
||||||
|
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
|
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,10 +23,11 @@
|
||||||
#include "dndTransport.h"
|
#include "dndTransport.h"
|
||||||
#include "dndMgmt.h"
|
#include "dndMgmt.h"
|
||||||
#include "dndMnode.h"
|
#include "dndMnode.h"
|
||||||
|
#include "dndSnode.h"
|
||||||
#include "dndVnodes.h"
|
#include "dndVnodes.h"
|
||||||
|
|
||||||
#define INTERNAL_USER "_dnd"
|
#define INTERNAL_USER "_dnd"
|
||||||
#define INTERNAL_CKEY "_key"
|
#define INTERNAL_CKEY "_key"
|
||||||
#define INTERNAL_SECRET "_pwd"
|
#define INTERNAL_SECRET "_pwd"
|
||||||
|
|
||||||
static void dndInitMsgFp(STransMgmt *pMgmt) {
|
static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
|
@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
|
||||||
|
|
||||||
|
// Requests handled by SNODE
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
||||||
SDnode * pDnode = parent;
|
SDnode *pDnode = parent;
|
||||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||||
|
|
||||||
tmsg_t msgType = pRsp->msgType;
|
tmsg_t msgType = pRsp->msgType;
|
||||||
|
@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
||||||
SDnode * pDnode = param;
|
SDnode *pDnode = param;
|
||||||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||||
|
|
||||||
tmsg_t msgType = pReq->msgType;
|
tmsg_t msgType = pReq->msgType;
|
||||||
|
@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
|
||||||
SAuthReq authReq = {0};
|
SAuthReq authReq = {0};
|
||||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
||||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
||||||
void * pReq = rpcMallocCont(contLen);
|
void *pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
tSerializeSAuthReq(pReq, contLen, &authReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
||||||
|
|
Loading…
Reference in New Issue