refactor(stream)

This commit is contained in:
Liu Jicong 2022-05-16 22:43:04 +08:00
parent f77f910264
commit b3fea32a7e
3 changed files with 81 additions and 168 deletions

View File

@ -2494,11 +2494,15 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
} }
typedef struct { typedef struct {
void* data; int64_t streamId;
int32_t taskId;
int32_t sourceVg;
int64_t sourceVer;
SArray* data; // SArray<SSDataBlock>
} SStreamDispatchReq; } SStreamDispatchReq;
typedef struct { typedef struct {
int8_t status; int8_t inputStatus;
} SStreamDispatchRsp; } SStreamDispatchRsp;
#define TD_AUTO_CREATE_TABLE 0x1 #define TD_AUTO_CREATE_TABLE 0x1

View File

@ -1087,35 +1087,6 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
} }
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTaskExecReq req = {0}; //
tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask);
ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
// enqueue
int32_t inputStatus = streamEnqueueDataBlk(pTask, (SStreamDataBlock*)req.data);
if (inputStatus == TASK_INPUT_STATUS__BLOCKED) {
// TODO rsp blocked
return 0;
}
// try exec
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) {
if (streamTaskRun(pTask) < 0) {
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
goto FAIL;
}
} else if (execStatus == TASK_STATUS__EXECUTING) {
return 0;
}
// TODO rsp success
return 0; return 0;
FAIL:
return -1;
} }

View File

@ -35,7 +35,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
return (void*)buf; return (void*)buf;
} }
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = { SStreamTaskExecReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
.data = data, .data = data,
@ -107,7 +107,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
SArray* pData = *(SArray**)pIter; SArray* pData = *(SArray**)pIter;
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet; SEpSet* pEpSet;
if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) { if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
@ -133,7 +133,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
return inputStatus; return inputStatus;
} }
int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.runners[0].executor; void* exec = pTask->exec.runners[0].executor;
// set input // set input
@ -265,87 +265,42 @@ FAIL:
return -1; return -1;
} }
int32_t streamTaskDispatchDown(SStreamTask* pTask, SMsgCb* pMsgCb) { int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
// bool firstRun = 1;
return 0;
}
int32_t streamTaskSink(SStreamTask* pTask) {
//
return 0;
}
int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* pBlock, SRpcMsg* pRsp) {
// 1. handle input
// 1.1 enqueue
taosWriteQitem(pTask->inputQ, pBlock);
// 1.2 calc back pressure
// 1.3 rsp by input status
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
pCont->status = inputStatus;
pRsp->pCont = pCont;
pRsp->contLen = sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
while (1) { while (1) {
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); SStreamDataBlock* pBlock = NULL;
if (execStatus == TASK_STATUS__IDLE) { if (!firstRun) {
void* exec = pTask->exec.runners[0].executor; taosReadAllQitems(pTask->outputQ, pTask->outputQAll);
SArray* pRes = taosArrayInit(0, sizeof(void*)); }
const SArray* blocks = pBlock->blocks; taosGetQitem(pTask->outputQAll, (void**)&pBlock);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); if (pBlock == NULL) {
while (1) { if (firstRun) {
SSDataBlock* output; firstRun = 0;
uint64_t ts = 0; continue;
if (qExecTask(exec, &output, &ts) < 0) { } else {
ASSERT(false); break;
} }
if (output == NULL) break;
taosArrayPush(pRes, &output);
}
// TODO: wrap destroy block
taosArrayDestroyP(pBlock->blocks, (FDelete)blockDataDestroy);
if (taosArrayGetSize(pRes) != 0) {
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
*resQ = pRes;
taosWriteQitem(pTask->outputQ, resQ);
}
} else if (execStatus == TASK_STATUS__CLOSING) {
continue;
} else if (execStatus == TASK_STATUS__EXECUTING)
break;
else {
ASSERT(0);
} }
}
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTask->outputQ, qall);
SArray** ppRes = NULL;
while (1) {
taosGetQitem(qall, (void**)&ppRes);
if (ppRes == NULL) break;
SArray* pRes = *ppRes; SArray* pRes = pBlock->blocks;
// sink
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->sourceVer, pRes); // blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
} else { } else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
// dispatch // dispatch
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
@ -366,7 +321,7 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL; SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
@ -401,12 +356,53 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
} }
} }
//
return 0; return 0;
} }
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, char* msg, int32_t msgLen) { int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
// SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status;
// 1.1 update status
// TODO cal backpressure
if (pBlock == NULL) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
status = TASK_INPUT_STATUS__FAILED;
} else {
status = atomic_load_8(&pTask->inputStatus);
}
// 1.2 enqueue
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pBlock->sourceVg = pReq->sourceVg;
pBlock->sourceVer = pReq->sourceVer;
taosWriteQitem(pTask->inputQ, pBlock);
// 1.3 rsp by input status
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
pCont->inputStatus = status;
pRsp->pCont = pCont;
pRsp->contLen = sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return 0;
}
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
// 1. handle input
streamTaskEnqueue(pTask, pReq, pRsp);
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
streamTaskExec2(pTask, pMsgCb);
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
streamTaskSink(pTask, pMsgCb);
return 0; return 0;
} }
@ -415,64 +411,6 @@ int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
return 0; return 0;
} }
int32_t streamTaskRun(SStreamTask* pTask) {
SArray* pRes = NULL;
if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) {
// TODO remove multi runner
void* exec = pTask->exec.runners[0].executor;
int8_t status = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (status == TASK_STATUS__IDLE) {
pRes = taosArrayInit(0, sizeof(void*));
if (pRes == NULL) {
return -1;
}
void* input = NULL;
taosWriteQitem(pTask->inputQ, &input);
if (input == NULL) return 0;
// TODO: fix type
if (pTask->sourceType == TASK_SOURCE__SCAN) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
while (1) {
SSDataBlock* output;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) break;
taosArrayPush(pRes, &output);
}
streamDataSubmitRefDec(pSubmit);
} else {
SStreamDataBlock* pStreamBlock = (SStreamDataBlock*)input;
const SArray* blocks = pStreamBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
while (1) {
SSDataBlock* output;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) break;
taosArrayPush(pRes, &output);
}
// TODO: wrap destroy block
taosArrayDestroyP(pStreamBlock->blocks, (FDelete)blockDataDestroy);
}
if (taosArrayGetSize(pRes) != 0) {
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
*resQ = pRes;
taosWriteQitem(pTask->outputQ, resQ);
}
}
}
return 0;
}
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) { int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
SArray* pRes = NULL; SArray* pRes = NULL;
// source // source
@ -545,7 +483,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
@ -566,7 +504,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL; SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }