refactor(stream)
This commit is contained in:
parent
1a47dabddb
commit
7391a756f6
|
@ -2589,18 +2589,6 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
|
|||
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t sourceVg;
|
||||
int64_t sourceVer;
|
||||
SArray* data; // SArray<SSDataBlock>
|
||||
} SStreamDispatchReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t inputStatus;
|
||||
} SStreamDispatchRsp;
|
||||
|
||||
#define TD_AUTO_CREATE_TABLE 0x1
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
|
|
|
@ -200,6 +200,10 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||
|
|
|
@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
|
|||
if (ref == 0) {
|
||||
taosMemoryFree(pDataSubmit->data);
|
||||
taosMemoryFree(pDataSubmit->dataRef);
|
||||
taosFreeQitem(pDataSubmit);
|
||||
// taosFreeQitem(pDataSubmit);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -286,6 +286,36 @@ typedef struct {
|
|||
int32_t taskId;
|
||||
} SStreamTaskRunReq;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t sourceTaskId;
|
||||
int32_t sourceVg;
|
||||
#if 0
|
||||
int64_t sourceVer;
|
||||
#endif
|
||||
SArray* data; // SArray<SSDataBlock>
|
||||
} SStreamDispatchReq;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int8_t inputStatus;
|
||||
} SStreamDispatchRsp;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t sourceTaskId;
|
||||
int32_t sourceVg;
|
||||
} SStreamTaskRecoverReq;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int8_t inputStatus;
|
||||
} SStreamTaskRecoverRsp;
|
||||
|
||||
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
|
||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
||||
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
||||
|
@ -296,6 +326,12 @@ int32_t streamTaskRun(SStreamTask* pTask);
|
|||
|
||||
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
|
||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -314,6 +314,10 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -121,10 +121,18 @@ int tqCommit(STQ*);
|
|||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
#if 0
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||
#endif
|
||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data);
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
// sma
|
||||
int32_t smaOpen(SVnode* pVnode);
|
||||
|
|
|
@ -105,12 +105,11 @@ static void tdSRowDemo() {
|
|||
}
|
||||
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||
void* pIter = NULL;
|
||||
STqExec* pExec = NULL;
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->execs, pIter);
|
||||
if (pIter == NULL) break;
|
||||
pExec = (STqExec*)pIter;
|
||||
STqExec* pExec = (STqExec*)pIter;
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||
if (!isAdd) {
|
||||
int32_t sz = taosArrayGetSize(tbUidList);
|
||||
|
@ -275,6 +274,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
}
|
||||
memcpy(data, msg, msgLen);
|
||||
|
||||
tqProcessStreamTriggerNew(pTq, data);
|
||||
|
||||
#if 0
|
||||
SRpcMsg req = {
|
||||
.msgType = TDMT_VND_STREAM_TRIGGER,
|
||||
.pCont = data,
|
||||
|
@ -282,6 +284,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
};
|
||||
|
||||
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -980,12 +983,24 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||
pTask->status = TASK_STATUS__IDLE;
|
||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||
|
||||
pTask->inputQ = taosOpenQueue();
|
||||
pTask->outputQ = taosOpenQueue();
|
||||
pTask->inputQAll = taosAllocateQall();
|
||||
pTask->outputQAll = taosAllocateQall();
|
||||
|
||||
if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
|
||||
goto FAIL;
|
||||
|
||||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
// expand runners
|
||||
pTask->exec.numOfRunners = parallel;
|
||||
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
|
||||
if (pTask->exec.runners == NULL) {
|
||||
return -1;
|
||||
goto FAIL;
|
||||
}
|
||||
for (int32_t i = 0; i < parallel; i++) {
|
||||
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
|
@ -1007,6 +1022,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
|||
}
|
||||
|
||||
return 0;
|
||||
FAIL:
|
||||
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
|
||||
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
|
||||
if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
|
||||
if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
|
@ -1058,6 +1080,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
|
||||
SStreamDataSubmit* pSubmit = NULL;
|
||||
|
||||
|
@ -1108,6 +1131,7 @@ FAIL:
|
|||
}
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
|
||||
SStreamTaskExecReq req;
|
||||
|
@ -1125,25 +1149,28 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* pReq) {
|
||||
void* pIter = NULL;
|
||||
bool failed = false;
|
||||
|
||||
SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||
if (pSubmit == NULL) {
|
||||
failed = true;
|
||||
goto SET_TASK_FAIL;
|
||||
}
|
||||
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
|
||||
if (pSubmit->dataRef == NULL) {
|
||||
failed = true;
|
||||
goto SET_TASK_FAIL;
|
||||
}
|
||||
|
||||
pSubmit->type = STREAM_DATA_TYPE_SUBMIT_BLOCK;
|
||||
pSubmit->sourceVer = ver;
|
||||
pSubmit->sourceVg = pTq->pVnode->config.vgId;
|
||||
pSubmit->type = STREAM_INPUT__DATA_SUBMIT;
|
||||
/*pSubmit->sourceVer = ver;*/
|
||||
/*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/
|
||||
pSubmit->data = pReq;
|
||||
*pSubmit->dataRef = 1;
|
||||
|
||||
SET_TASK_FAIL:
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
|
@ -1162,7 +1189,18 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
|
||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
||||
// TODO dispatch task launch msg to fetch queue
|
||||
SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
|
||||
if (pRunReq == NULL) continue;
|
||||
// TODO: do we need htonl?
|
||||
pRunReq->head.vgId = pTq->pVnode->config.vgId;
|
||||
pRunReq->streamId = pTask->streamId;
|
||||
pRunReq->taskId = pTask->taskId;
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_TASK_RUN,
|
||||
.pCont = pRunReq,
|
||||
.contLen = sizeof(SStreamTaskRunReq),
|
||||
};
|
||||
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -1174,11 +1212,53 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
streamDataSubmitRefDec(pSubmit);
|
||||
return 0;
|
||||
} else {
|
||||
if (pSubmit) {
|
||||
if (pSubmit->dataRef) {
|
||||
taosMemoryFree(pSubmit->dataRef);
|
||||
}
|
||||
taosFreeQitem(pSubmit);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
//
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamDispatchReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamDispatchRsp* pRsp = pMsg->pCont;
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRecoverRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -106,11 +106,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
|||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
}
|
||||
} break;
|
||||
#if 0
|
||||
case TDMT_VND_TASK_WRITE_EXEC: {
|
||||
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
|
||||
0) < 0) {
|
||||
}
|
||||
} break;
|
||||
#endif
|
||||
case TDMT_VND_ALTER_VNODE:
|
||||
break;
|
||||
default:
|
||||
|
@ -181,11 +183,25 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return vnodeGetTableMeta(pVnode, pMsg);
|
||||
case TDMT_VND_CONSUME:
|
||||
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
|
||||
|
||||
case TDMT_VND_TASK_RUN:
|
||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TASK_DISPATCH:
|
||||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TASK_RECOVER:
|
||||
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TASK_DISPATCH_RSP:
|
||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TASK_RECOVER_RSP:
|
||||
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
||||
|
||||
#if 0
|
||||
case TDMT_VND_TASK_PIPE_EXEC:
|
||||
case TDMT_VND_TASK_MERGE_EXEC:
|
||||
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
|
||||
case TDMT_VND_STREAM_TRIGGER:
|
||||
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
||||
#endif
|
||||
case TDMT_VND_QUERY_HEARTBEAT:
|
||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
default:
|
||||
|
|
|
@ -68,7 +68,7 @@ static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMs
|
|||
|
||||
// get groupId, compute hash value
|
||||
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
|
||||
//
|
||||
|
||||
// get node
|
||||
// TODO: optimize search process
|
||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
|
@ -152,13 +152,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
|||
|
||||
// exec
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
taosArrayPush(pRes, output);
|
||||
}
|
||||
|
||||
// destroy
|
||||
|
@ -189,7 +189,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
|
@ -209,7 +209,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
|
@ -231,7 +231,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
|
@ -253,7 +253,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
|
@ -392,12 +392,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
|||
// 1.2 enqueue
|
||||
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
|
||||
pBlock->sourceVg = pReq->sourceVg;
|
||||
pBlock->sourceVer = pReq->sourceVer;
|
||||
/*pBlock->sourceVer = pReq->sourceVer;*/
|
||||
taosWriteQitem(pTask->inputQ, pBlock);
|
||||
|
||||
// 1.3 rsp by input status
|
||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
||||
pCont->inputStatus = status;
|
||||
pCont->streamId = pReq->streamId;
|
||||
pCont->taskId = pReq->sourceTaskId;
|
||||
pRsp->pCont = pCont;
|
||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
|
@ -439,12 +441,12 @@ int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
|
||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, char* msg) {
|
||||
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue