add stream msg routing

This commit is contained in:
Liu Jicong 2022-03-15 14:56:33 +08:00
parent 6044c75118
commit 2bcc139443
4 changed files with 54 additions and 18 deletions

View File

@ -187,7 +187,10 @@ typedef struct SEp {
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; union {
int32_t vgId;
int32_t streamTaskId;
};
} SMsgHead; } SMsgHead;
// Submit message for one table // Submit message for one table

View File

@ -23,6 +23,9 @@
extern "C" { extern "C" {
#endif #endif
#define SND_UNIQUE_THREAD_NUM 2
#define SND_SHARED_THREAD_NUM 2
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SSnode SSnode; typedef struct SSnode SSnode;

View File

@ -166,7 +166,7 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) {
static int32_t dndStartSnodeWorker(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; SSnodeMgmt *pMgmt = &pDnode->smgmt;
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
for (int32_t i = 0; i < 2; i++) { for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
if (pUniqueWorker == NULL) { if (pUniqueWorker == NULL) {
return -1; return -1;
@ -177,8 +177,8 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) {
} }
taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker);
} }
if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
dndProcessSnodeSharedQueue)) { SND_SHARED_THREAD_NUM, dndProcessSnodeSharedQueue)) {
dError("failed to start snode shared worker since %s", terrstr()); dError("failed to start snode shared worker since %s", terrstr());
return -1; return -1;
} }
@ -369,13 +369,39 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->streamTaskId = htonl(pHead->streamTaskId);
return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM;
}
static void dndWriteSnodeMsgToWorkerByMsg(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode); SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) { if (pSnode != NULL) {
int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); int32_t index = dndGetSWIdFromMsg(pMsg);
SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index); SDnodeWorker *pWorker = taosArrayGetP(pDnode->smgmt.uniqueWorkers, index);
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
dndReleaseSnode(pDnode, pSnode);
if (code != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
}
static void dndWriteSnodeMsgToMgmtWorker(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, 0);
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
} }
dndReleaseSnode(pDnode, pSnode); dndReleaseSnode(pDnode, pSnode);
@ -407,9 +433,12 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc
} }
} }
void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
}
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
// judge from msg to write to unique queue dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
dndWriteSnodeMsgToRandomWorker(pDnode, pMsg);
} }
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {

View File

@ -30,21 +30,26 @@ extern "C" {
#endif #endif
enum { enum {
STREAM_STATUS__READY = 1, STREAM_STATUS__RUNNING = 1,
STREAM_STATUS__STOPPED, STREAM_STATUS__STOPPED,
STREAM_STATUS__CREATING, STREAM_STATUS__CREATING,
STREAM_STATUS__STOPING, STREAM_STATUS__STOPING,
STREAM_STATUS__RESUMING, STREAM_STATUS__RESTORING,
STREAM_STATUS__DELETING, STREAM_STATUS__DELETING,
}; };
enum { enum {
STREAM_RUNNER__RUNNING = 1, STREAM_TASK_STATUS__RUNNING = 1,
STREAM_RUNNER__STOP, STREAM_TASK_STATUS__STOP,
}; };
typedef struct {
SHashObj* pHash; // taskId -> streamTask
} SStreamMeta;
typedef struct SSnode { typedef struct SSnode {
SSnodeOpt cfg; SStreamMeta* pMeta;
SSnodeOpt cfg;
} SSnode; } SSnode;
typedef struct { typedef struct {
@ -62,10 +67,6 @@ typedef struct {
// storage handle // storage handle
} SStreamRunner; } SStreamRunner;
typedef struct {
SHashObj* pHash;
} SStreamMeta;
int32_t sndCreateStream(); int32_t sndCreateStream();
int32_t sndDropStream(); int32_t sndDropStream();