Merge pull request #12726 from taosdata/feature/tq
refactor(stream): exec
This commit is contained in:
commit
dbaa74462e
|
@ -107,6 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
|
|||
if (ref == 0) {
|
||||
taosMemoryFree(pDataSubmit->data);
|
||||
taosMemoryFree(pDataSubmit->dataRef);
|
||||
taosFreeQitem(pDataSubmit);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,6 +280,12 @@ typedef struct {
|
|||
SArray* res; // SArray<SSDataBlock>
|
||||
} SStreamSinkReq;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
} SStreamTaskRunReq;
|
||||
|
||||
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
|
||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
||||
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
||||
|
|
|
@ -112,21 +112,20 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
if (pIter == NULL) break;
|
||||
pExec = (STqExec*)pIter;
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||
if (isAdd) {
|
||||
continue;
|
||||
} else {
|
||||
if (!isAdd) {
|
||||
int32_t sz = taosArrayGetSize(tbUidList);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1059,6 +1058,57 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
|
||||
SStreamDataSubmit* pSubmit = NULL;
|
||||
|
||||
// build data
|
||||
pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||
if (pSubmit == NULL) return -1;
|
||||
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
|
||||
if (pSubmit->dataRef == NULL) goto FAIL;
|
||||
*pSubmit->dataRef = 1;
|
||||
pSubmit->data = data;
|
||||
pSubmit->type = STREAM_INPUT__DATA_BLOCK;
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = (SStreamTask*)pIter;
|
||||
if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) {
|
||||
streamEnqueueDataSubmit(pTask, pSubmit);
|
||||
// TODO cal back pressure
|
||||
}
|
||||
// check run
|
||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
||||
SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
|
||||
if (pReq == NULL) continue;
|
||||
// TODO: do we need htonl?
|
||||
pReq->head.vgId = pTq->pVnode->config.vgId;
|
||||
pReq->streamId = pTask->streamId;
|
||||
pReq->taskId = pTask->taskId;
|
||||
SRpcMsg msg = {
|
||||
.msgType = 0,
|
||||
.pCont = pReq,
|
||||
.contLen = sizeof(SStreamTaskRunReq),
|
||||
};
|
||||
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
|
||||
}
|
||||
}
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
|
||||
return 0;
|
||||
FAIL:
|
||||
if (pSubmit) {
|
||||
if (pSubmit->dataRef) {
|
||||
taosMemoryFree(pSubmit->dataRef);
|
||||
}
|
||||
taosFreeQitem(pSubmit);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
|
||||
SStreamTaskExecReq req;
|
||||
tDecodeSStreamTaskExecReq(msg, &req);
|
||||
|
|
|
@ -34,21 +34,11 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
|||
|
||||
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
|
||||
pReadHandle->pMsg = pMsg;
|
||||
// pMsg->length = htonl(pMsg->length);
|
||||
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
|
||||
// iterate and convert
|
||||
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
|
||||
if (pReadHandle->pBlock == NULL) break;
|
||||
|
||||
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
|
||||
// pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
|
||||
// pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
|
||||
// pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
|
||||
// pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
|
||||
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
|
||||
}
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||
|
|
|
@ -247,6 +247,19 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
void* data = NULL;
|
||||
taosGetQitem(pTask->inputQAll, &data);
|
||||
if (data == NULL) break;
|
||||
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
|
||||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) goto FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
|
@ -298,6 +311,9 @@ int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
}
|
||||
|
||||
// dispatch
|
||||
// TODO dispatch guard
|
||||
int8_t outputStatus = atomic_load_8(&pTask->outputStatus);
|
||||
if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) {
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||
|
@ -356,6 +372,7 @@ int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -406,11 +423,32 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||
atomic_store_8(&pTask->inputStatus, pRsp->inputStatus);
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO: init recover timer
|
||||
}
|
||||
// continue dispatch
|
||||
streamTaskSink(pTask, pMsgCb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
streamTaskExec2(pTask, pMsgCb);
|
||||
streamTaskSink(pTask, pMsgCb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, char* msg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
|
||||
SArray* pRes = NULL;
|
||||
// source
|
||||
|
|
Loading…
Reference in New Issue