feat(snode)
This commit is contained in:
parent
9b1ba10fbc
commit
b26cc9c318
|
@ -60,8 +60,8 @@ extern int32_t tsNumOfVnodeSyncThreads;
|
|||
extern int32_t tsNumOfVnodeRsmaThreads;
|
||||
extern int32_t tsNumOfQnodeQueryThreads;
|
||||
extern int32_t tsNumOfQnodeFetchThreads;
|
||||
extern int32_t tsNumOfSnodeSharedThreads;
|
||||
extern int32_t tsNumOfSnodeUniqueThreads;
|
||||
extern int32_t tsNumOfSnodeStreamThreads;
|
||||
extern int32_t tsNumOfSnodeWriteThreads;
|
||||
extern int64_t tsRpcQueueMemoryAllowed;
|
||||
|
||||
// monitor
|
||||
|
|
|
@ -68,8 +68,8 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
|
|||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
*/
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp);
|
||||
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -55,8 +55,8 @@ int32_t tsNumOfVnodeSyncThreads = 2;
|
|||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||
int32_t tsNumOfQnodeQueryThreads = 4;
|
||||
int32_t tsNumOfQnodeFetchThreads = 1;
|
||||
int32_t tsNumOfSnodeSharedThreads = 2;
|
||||
int32_t tsNumOfSnodeUniqueThreads = 2;
|
||||
int32_t tsNumOfSnodeStreamThreads = 4;
|
||||
int32_t tsNumOfSnodeWriteThreads = 1;
|
||||
|
||||
// monitor
|
||||
bool tsEnableMonitor = true;
|
||||
|
@ -133,7 +133,7 @@ int32_t tsDiskCfgNum = 0;
|
|||
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
||||
|
||||
// stream scheduler
|
||||
bool tsSchedStreamToSnode = true;
|
||||
bool tsDeployOnSnode = true;
|
||||
|
||||
/*
|
||||
* minimum scale for whole system, millisecond by default
|
||||
|
@ -390,13 +390,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
|
||||
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 2, 1024, 0) != 0) return -1;
|
||||
tsNumOfSnodeStreamThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfSnodeUniqueThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 2, 1024, 0) != 0) return -1;
|
||||
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, 0) != 0) return -1;
|
||||
|
||||
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
@ -542,17 +542,17 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
|||
|
||||
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
|
||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||
tsNumOfSnodeSharedThreads = numOfCores / 4;
|
||||
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
|
||||
pItem->i32 = tsNumOfSnodeSharedThreads;
|
||||
tsNumOfSnodeStreamThreads = numOfCores / 4;
|
||||
tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4);
|
||||
pItem->i32 = tsNumOfSnodeStreamThreads;
|
||||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
pItem = cfgGetItem(tsCfg, "numOfSnodeUniqueThreads");
|
||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||
tsNumOfSnodeUniqueThreads = numOfCores / 4;
|
||||
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
|
||||
pItem->i32 = tsNumOfSnodeUniqueThreads;
|
||||
tsNumOfSnodeWriteThreads = numOfCores / 4;
|
||||
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
|
||||
pItem->i32 = tsNumOfSnodeWriteThreads;
|
||||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
|
@ -696,8 +696,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
|
||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
|
||||
|
||||
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||
|
@ -946,9 +946,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||
*/
|
||||
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
|
||||
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
|
||||
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||
} else if (strcasecmp("numOfLogLines", name) == 0) {
|
||||
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
|
||||
}
|
||||
|
|
|
@ -30,9 +30,9 @@ typedef struct SSnodeMgmt {
|
|||
SMsgCb msgCb;
|
||||
const char *path;
|
||||
const char *name;
|
||||
int8_t uniqueWorkerInUse;
|
||||
SArray *uniqueWorkers; // SArray<SMultiWorker*>
|
||||
SSingleWorker sharedWorker;
|
||||
int8_t writeWorkerInUse;
|
||||
SArray *writeWroker; // SArray<SMultiWorker*>
|
||||
SSingleWorker streamWorker;
|
||||
} SSnodeMgmt;
|
||||
|
||||
// smHandle.c
|
||||
|
@ -43,10 +43,11 @@ int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
|
|||
// smWorker.c
|
||||
int32_t smStartWorker(SSnodeMgmt *pMgmt);
|
||||
void smStopWorker(SSnodeMgmt *pMgmt);
|
||||
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -69,13 +69,13 @@ SArray *smGetMsgHandles() {
|
|||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -45,6 +45,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
|||
pMgmt->name = pInput->name;
|
||||
pMgmt->msgCb = pInput->msgCb;
|
||||
pMgmt->msgCb.mgmt = pMgmt;
|
||||
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)smPutMsgToQueue;
|
||||
|
||||
SSnodeOpt option = {0};
|
||||
smInitOption(pMgmt, &option);
|
||||
|
|
|
@ -26,18 +26,24 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
|
|||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
|
||||
static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||
SRpcMsg *pMsg = NULL;
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dTrace("msg:%p, get from snode-unique queue", pMsg);
|
||||
if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
ASSERT(0);
|
||||
dTrace("msg:%p, get from snode-write queue", pMsg);
|
||||
int32_t code = sndProcessWriteMsg(pMgmt->pSnode, pMsg, NULL);
|
||||
if (code < 0) {
|
||||
dGError("snd, msg:%p failed to process write since %s", pMsg, terrstr(code));
|
||||
if (pMsg->info.handle != NULL) {
|
||||
tmsgSendRsp(pMsg);
|
||||
}
|
||||
} else {
|
||||
smSendRsp(pMsg, 0);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
@ -45,13 +51,15 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dTrace("msg:%p, get from snode-shared queue", pMsg);
|
||||
if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
dTrace("msg:%p, get from snode-stream queue", pMsg);
|
||||
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
|
||||
if (code < 0) {
|
||||
dGError("snd, msg:%p failed to process stream since %s", pMsg, terrstr(code));
|
||||
smSendRsp(pMsg, terrno);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
|
@ -60,44 +68,44 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
||||
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *));
|
||||
if (pMgmt->uniqueWorkers == NULL) {
|
||||
pMgmt->writeWroker = taosArrayInit(0, sizeof(SMultiWorker *));
|
||||
if (pMgmt->writeWroker == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsNumOfSnodeUniqueThreads; i++) {
|
||||
SMultiWorker *pUniqueWorker = taosMemoryMalloc(sizeof(SMultiWorker));
|
||||
if (pUniqueWorker == NULL) {
|
||||
for (int32_t i = 0; i < tsNumOfSnodeWriteThreads; i++) {
|
||||
SMultiWorker *pWriteWorker = taosMemoryMalloc(sizeof(SMultiWorker));
|
||||
if (pWriteWorker == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMultiWorkerCfg cfg = {
|
||||
.max = 1,
|
||||
.name = "snode-unique",
|
||||
.fp = smProcessUniqueQueue,
|
||||
.name = "snode-write",
|
||||
.fp = smProcessWriteQueue,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
|
||||
if (tMultiWorkerInit(pWriteWorker, &cfg) != 0) {
|
||||
dError("failed to start snode-unique worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) {
|
||||
if (taosArrayPush(pMgmt->writeWroker, &pWriteWorker) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
SSingleWorkerCfg cfg = {
|
||||
.min = tsNumOfSnodeSharedThreads,
|
||||
.max = tsNumOfSnodeSharedThreads,
|
||||
.name = "snode-shared",
|
||||
.fp = (FItem)smProcessSharedQueue,
|
||||
.min = tsNumOfSnodeStreamThreads,
|
||||
.max = tsNumOfSnodeStreamThreads,
|
||||
.name = "snode-stream",
|
||||
.fp = (FItem)smProcessStreamQueue,
|
||||
.param = pMgmt,
|
||||
};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
|
||||
if (tSingleWorkerInit(&pMgmt->streamWorker, &cfg)) {
|
||||
dError("failed to start snode shared-worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -107,30 +115,50 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
void smStopWorker(SSnodeMgmt *pMgmt) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->writeWroker); i++) {
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, i);
|
||||
tMultiWorkerCleanup(pWorker);
|
||||
}
|
||||
taosArrayDestroy(pMgmt->uniqueWorkers);
|
||||
tSingleWorkerCleanup(&pMgmt->sharedWorker);
|
||||
taosArrayDestroy(pMgmt->writeWroker);
|
||||
tSingleWorkerCleanup(&pMgmt->streamWorker);
|
||||
dDebug("snode workers are closed");
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
return pHead->vgId % tsNumOfSnodeUniqueThreads;
|
||||
}
|
||||
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
|
||||
if (pMsg == NULL) {
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
pRpc->pCont = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
|
||||
/*SMsgHead *pHead = pMsg->pCont;*/
|
||||
/*pHead->workerType = htonl(pHead->workerType);*/
|
||||
/*return pHead->workerType;*/
|
||||
SSnode *pSnode = pMgmt->pSnode;
|
||||
if (pSnode == NULL) {
|
||||
dError("snode: msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
|
||||
TMSG_INFO(pMsg->msgType), qtype);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMsgHead *pHead = pRpc->pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = SNODE_HANDLE;
|
||||
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
|
||||
|
||||
switch (qtype) {
|
||||
case STREAM_QUEUE:
|
||||
smPutNodeMsgToStreamQueue(pMgmt, pMsg);
|
||||
break;
|
||||
case WRITE_QUEUE:
|
||||
smPutNodeMsgToWriteQueue(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0);
|
||||
if (pWorker == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
|
@ -141,9 +169,8 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t index = smGetSWIdFromMsg(pMsg);
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
|
||||
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0);
|
||||
if (pWorker == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
|
@ -154,19 +181,14 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SSingleWorker *pWorker = &pMgmt->sharedWorker;
|
||||
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SSingleWorker *pWorker = &pMgmt->streamWorker;
|
||||
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t workerType = smGetSWTypeFromMsg(pMsg);
|
||||
if (workerType == SND_WORKER_TYPE__SHARED) {
|
||||
return smPutNodeMsgToSharedQueue(pMgmt, pMsg);
|
||||
if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
|
||||
sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg);
|
||||
} else {
|
||||
return smPutNodeMsgToUniqueQueue(pMgmt, pMsg);
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
#include "tname.h"
|
||||
#include "tuuid.h"
|
||||
|
||||
extern bool tsSchedStreamToSnode;
|
||||
extern bool tsDeployOnSnode;
|
||||
|
||||
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
|
||||
int32_t childId = taosArrayGetSize(pArray);
|
||||
|
@ -190,7 +190,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan,
|
|||
pTask->nodeId = SNODE_HANDLE;
|
||||
pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
|
||||
|
||||
plan->execNode.nodeId = 0;
|
||||
plan->execNode.nodeId = SNODE_HANDLE;
|
||||
plan->execNode.epSet = pTask->epSet;
|
||||
|
||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||
|
@ -373,7 +373,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tsSchedStreamToSnode) {
|
||||
if (tsDeployOnSnode) {
|
||||
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
|
||||
if (pSnode == NULL) {
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
|
|
|
@ -31,6 +31,8 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SSnode {
|
||||
char* path;
|
||||
SStreamMeta* pMeta;
|
||||
SMsgCb msgCb;
|
||||
} SSnode;
|
||||
|
||||
|
|
|
@ -15,197 +15,197 @@
|
|||
|
||||
#include "executor.h"
|
||||
#include "sndInt.h"
|
||||
#include "tstream.h"
|
||||
#include "tuuid.h"
|
||||
/*SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; }*/
|
||||
/*void sndClose(SSnode *pSnode) {}*/
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
|
||||
|
||||
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
char *msgStr = pMsg->pCont;
|
||||
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t code = 0;
|
||||
|
||||
SStreamDispatchReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
goto FAIL;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
FAIL:
|
||||
if (pMsg->info.handle == NULL) return;
|
||||
SRpcMsg rsp = {
|
||||
.code = code,
|
||||
.info = pMsg->info,
|
||||
};
|
||||
tmsgSendRsp(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
|
||||
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->inputQueue = streamQueueOpen();
|
||||
pTask->outputQueue = streamQueueOpen();
|
||||
|
||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||
|
||||
pTask->pMsgCb = &pSnode->msgCb;
|
||||
|
||||
pTask->startVer = ver;
|
||||
|
||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SReadHandle mgHandle = {
|
||||
.vnode = NULL,
|
||||
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
|
||||
.pStateBackend = pTask->pState,
|
||||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
|
||||
if (pSnode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
pSnode->path = strdup(path);
|
||||
if (pSnode->path == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
}
|
||||
pSnode->msgCb = pOption->msgCb;
|
||||
#if 0
|
||||
pSnode->pMeta = sndMetaNew();
|
||||
|
||||
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE);
|
||||
if (pSnode->pMeta == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
return pSnode;
|
||||
|
||||
FAIL:
|
||||
taosMemoryFree(pSnode->path);
|
||||
taosMemoryFree(pSnode);
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
return pSnode;
|
||||
}
|
||||
|
||||
void sndClose(SSnode *pSnode) {
|
||||
/*sndMetaDelete(pSnode->pMeta);*/
|
||||
streamMetaClose(pSnode->pMeta);
|
||||
taosMemoryFree(pSnode->path);
|
||||
taosMemoryFree(pSnode);
|
||||
}
|
||||
|
||||
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
|
||||
|
||||
#if 0
|
||||
SStreamMeta *sndMetaNew() {
|
||||
SStreamMeta *pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||
if (pMeta == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pMeta->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
if (pMeta->pHash == NULL) {
|
||||
taosMemoryFree(pMeta);
|
||||
return NULL;
|
||||
}
|
||||
return pMeta;
|
||||
}
|
||||
|
||||
void sndMetaDelete(SStreamMeta *pMeta) {
|
||||
taosHashCleanup(pMeta->pHash);
|
||||
taosMemoryFree(pMeta);
|
||||
}
|
||||
|
||||
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
|
||||
}
|
||||
|
||||
SStreamTask *sndMetaGetTask(SStreamMeta *pMeta, int32_t taskId) {
|
||||
return taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
}
|
||||
|
||||
int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
||||
SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFree(pTask->exec.qmsg);
|
||||
// TODO:free executor
|
||||
taosMemoryFree(pTask);
|
||||
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
|
||||
// 1.deserialize msg and build task
|
||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
|
||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
||||
ASSERT(0);
|
||||
code = tDecodeSStreamTask(&decoder, pTask);
|
||||
if (code < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
|
||||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
|
||||
|
||||
pTask->inputQueue = streamQueueOpen();
|
||||
pTask->outputQueue = streamQueueOpen();
|
||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
// 2.save task
|
||||
code = streamMetaAddTask(pSnode->pMeta, -1, pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
|
||||
|
||||
pTask->pMsgCb = &pNode->msgCb;
|
||||
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
qInfo("deploy stream: stream id %" PRId64 " task id %d child id %d on snode", pTask->streamId, pTask->taskId,
|
||||
pTask->selfChildId);
|
||||
|
||||
taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *));
|
||||
// 3.go through recover steps to fill history
|
||||
if (pTask->fillHistory) {
|
||||
streamSetParamForRecover(pTask);
|
||||
streamAggRecoverPrepare(pTask);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||
return streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
|
||||
}
|
||||
|
||||
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
|
||||
char *msgStr = pMsg->pCont;
|
||||
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
SStreamDispatchReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
|
||||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, true);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, exec);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
SStreamTaskRecoverReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
|
||||
int32_t taskId = pRsp->rspTaskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
char *msg = pMsg->pCont;
|
||||
int32_t msgLen = pMsg->contLen;
|
||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||
int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t));
|
||||
ASSERT(code == 0);
|
||||
if (code == 0) {
|
||||
// sendrsp
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
char *msgStr = pMsg->pCont;
|
||||
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -213,53 +213,64 @@ static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) {
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
int32_t taskId = req.dstTaskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
return 0;
|
||||
}
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||
tDeleteStreamRetrieveReq(&req);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRetrieveRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// stream deploy
|
||||
// stream stop/resume
|
||||
// operator exec
|
||||
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_STREAM_TASK_DEPLOY:
|
||||
return sndProcessTaskDeployReq(pSnode, pMsg);
|
||||
return sndProcessTaskDeployReq(pSnode, pReq, len);
|
||||
case TDMT_STREAM_TASK_DROP:
|
||||
return sndProcessTaskDropReq(pSnode, pMsg);
|
||||
return sndProcessTaskDropReq(pSnode, pReq, len);
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_STREAM_TASK_RUN:
|
||||
return sndProcessTaskRunReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_DISPATCH:
|
||||
return sndProcessTaskDispatchReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_RECOVER:
|
||||
return sndProcessTaskRecoverReq(pSnode, pMsg);
|
||||
return sndProcessTaskDispatchReq(pSnode, pMsg, true);
|
||||
case TDMT_STREAM_RETRIEVE:
|
||||
return sndProcessTaskRecoverReq(pSnode, pMsg);
|
||||
return sndProcessTaskRetrieveReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||
return sndProcessTaskDispatchRsp(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_RECOVER_RSP:
|
||||
return sndProcessTaskRecoverRsp(pSnode, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_RSP:
|
||||
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
|
||||
default:
|
||||
|
@ -267,4 +278,3 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1272,7 +1272,6 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
//
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
|
@ -1285,7 +1284,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||
ASSERT(0);
|
||||
char* msgStr = pMsg->pCont;
|
||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -1349,7 +1347,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
|
||||
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
||||
}
|
||||
|
||||
|
|
|
@ -369,7 +369,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
//case TDMT_SCH_CANCEL_TASK:
|
||||
// case TDMT_SCH_CANCEL_TASK:
|
||||
// return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_SCH_DROP_TASK:
|
||||
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
|
@ -385,9 +385,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_RUN:
|
||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||
#if 0
|
||||
case TDMT_STREAM_TASK_DISPATCH:
|
||||
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
|
||||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
||||
#endif
|
||||
/*case TDMT_STREAM_TASK_RECOVER:*/
|
||||
/*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/
|
||||
case TDMT_STREAM_RETRIEVE:
|
||||
|
|
|
@ -241,7 +241,7 @@
|
|||
./test.sh -f tsim/stream/triggerSession0.sim
|
||||
./test.sh -f tsim/stream/partitionby.sim
|
||||
./test.sh -f tsim/stream/partitionby1.sim
|
||||
# unsupport ./test.sh -f tsim/stream/schedSnode.sim
|
||||
./test.sh -f tsim/stream/schedSnode.sim
|
||||
./test.sh -f tsim/stream/windowClose.sim
|
||||
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
./test.sh -f tsim/stream/sliding.sim
|
||||
|
|
|
@ -6,10 +6,14 @@ system sh/exec.sh -n dnode1 -s start
|
|||
sleep 50
|
||||
sql connect
|
||||
|
||||
sleep 50
|
||||
|
||||
|
||||
sql create database test vgroups 2;
|
||||
sql create database target vgroups 1;
|
||||
|
||||
sql create snode on dnode 1
|
||||
|
||||
sql create database test vgroups 1;
|
||||
sql create database target vgroups 1;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table ts1 using st tags(1,1,1);
|
||||
|
@ -72,23 +76,23 @@ if $data01 != 8 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
if $data02 != 6 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != 4 then
|
||||
print ======$data03
|
||||
if $data03 != 52 then
|
||||
print ======data03=$data03
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data04 != 52 then
|
||||
print ======$data04
|
||||
print ======data04=$data04
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data05 != 13 then
|
||||
print ======$data05
|
||||
print ======data05=$data05
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
@ -104,17 +108,17 @@ if $data12 != 6 then
|
|||
endi
|
||||
|
||||
if $data13 != 92 then
|
||||
print ======$data13
|
||||
print ======data13=$data13
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 22 then
|
||||
print ======$data14
|
||||
print ======data14=$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != 3 then
|
||||
print ======$data15
|
||||
print ======data15=$data15
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -130,17 +134,17 @@ if $data22 != 4 then
|
|||
endi
|
||||
|
||||
if $data23 != 32 then
|
||||
print ======$data23
|
||||
print ======data23=$data23
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data24 != 12 then
|
||||
print ======$data24
|
||||
print ======data24=$data24
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data25 != 3 then
|
||||
print ======$data25
|
||||
print ======data25=$data25
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -156,17 +160,17 @@ if $data32 != 30 then
|
|||
endi
|
||||
|
||||
if $data33 != 180 then
|
||||
print ======$data33
|
||||
print ======data33=$data33
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data34 != 42 then
|
||||
print ======$data34
|
||||
print ======data34=$data34
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data35 != 3 then
|
||||
print ======$data35
|
||||
print ======data35=$data35
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue