diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 88fa0e728f..3d9228f05c 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -77,7 +77,9 @@ typedef struct SDataBlockInfo { int16_t numOfCols; int16_t hasVarCol; int32_t capacity; - EStreamType type; + // TODO: optimize and remove following + int32_t childId; // used for stream, do not serialize + EStreamType type; // used for stream, do not serialize } SDataBlockInfo; typedef struct SSDataBlock { @@ -105,14 +107,14 @@ typedef struct SColumnInfoData { } SColumnInfoData; typedef struct SQueryTableDataCond { - //STimeWindow twindow; + // STimeWindow twindow; int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; - SColumnInfo *colList; + SColumnInfo* colList; bool loadExternalRows; // load external rows or not int32_t type; // data block load type: int32_t numOfTWindows; - STimeWindow *twindows; + STimeWindow* twindows; } SQueryTableDataCond; void* blockDataDestroy(SSDataBlock* pBlock); @@ -202,17 +204,17 @@ typedef struct SExprInfo { } SExprInfo; typedef struct { - const char* key; - int32_t keyLen; - uint8_t type; - union{ + const char* key; + int32_t keyLen; + uint8_t type; + union { const char* value; int64_t i; uint64_t u; double d; float f; }; - int32_t length; + int32_t length; } SSmlKv; #define QUERY_ASC_FORWARD_STEP 1 @@ -220,7 +222,7 @@ typedef struct { #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) -#define SORT_QSORT_T 0x1 +#define SORT_QSORT_T 0x1 #define SORT_SPILLED_MERGE_SORT_T 0x2 typedef struct SSortExecInfo { int32_t sortMethod; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f7ad7b4ed8..bb0b6dc0a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -24,8 +24,8 @@ extern "C" { #endif -#ifndef _TSTREAM_H_ -#define _TSTREAM_H_ +#ifndef _STREAM_H_ +#define _STREAM_H_ typedef struct SStreamTask SStreamTask; @@ -39,6 +39,7 @@ enum { TASK_INPUT_STATUS__NORMAL = 1, TASK_INPUT_STATUS__BLOCKED, TASK_INPUT_STATUS__RECOVER, + TASK_INPUT_STATUS__PROCESSING, TASK_INPUT_STATUS__STOP, TASK_INPUT_STATUS__FAILED, }; @@ -60,6 +61,10 @@ enum { STREAM_INPUT__CHECKPOINT, }; +typedef struct { + int8_t type; +} SStreamQueueItem; + typedef struct { int8_t type; int64_t ver; @@ -80,55 +85,51 @@ typedef struct { int8_t type; } SStreamCheckpoint; +enum { + STREAM_QUEUE__SUCESS = 1, + STREAM_QUEUE__FAILED, + STREAM_QUEUE__PROCESSING, +}; + typedef struct { STaosQueue* queue; STaosQall* qall; void* qItem; - int8_t failed; -} SStreamQ; + int8_t status; +} SStreamQueue; -static FORCE_INLINE void* streamQCurItem(SStreamQ* queue) { - // - return queue->qItem; +SStreamQueue* streamQueueOpen(); +void streamQueueClose(SStreamQueue* queue); + +static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { + ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + queue->qItem = NULL; + atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); } -static FORCE_INLINE void* streamQNextItem(SStreamQ* queue) { - int8_t failed = atomic_load_8(&queue->failed); - if (failed) { +static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { + ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); +} + +static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } + +static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { + int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); + if (dequeueFlag == STREAM_QUEUE__FAILED) { ASSERT(queue->qItem != NULL); - return streamQCurItem(queue); + return streamQueueCurItem(queue); } else { taosGetQitem(queue->qall, &queue->qItem); if (queue->qItem == NULL) { taosReadAllQitems(queue->queue, queue->qall); taosGetQitem(queue->qall, &queue->qItem); } - return streamQCurItem(queue); + return streamQueueCurItem(queue); } } -static FORCE_INLINE void streamQSetFail(SStreamQ* queue) { atomic_store_8(&queue->failed, 1); } - -static FORCE_INLINE void streamQSetSuccess(SStreamQ* queue) { atomic_store_8(&queue->failed, 0); } - -static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { - SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); - if (pDataSubmit == NULL) return NULL; - pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); - if (pDataSubmit->dataRef == NULL) goto FAIL; - pDataSubmit->data = pReq; - *pDataSubmit->dataRef = 1; - pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; - return pDataSubmit; -FAIL: - taosFreeQitem(pDataSubmit); - return NULL; -} - -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { - // - atomic_add_fetch_32(pDataSubmit->dataRef, 1); -} +SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq); static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); @@ -141,9 +142,31 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); +#if 0 int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput); void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); +static FORCE_INLINE int32_t streamEnqueue1(SStreamQueue* queue, SStreamQueueItem* pItem) { + int8_t inputStatus = atomic_load_8(&queue->enqueueStatus); + if (inputStatus == TASK_INPUT_STATUS__NORMAL) { + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); + if (pSubmitClone == NULL) { + atomic_store_8(&queue->enqueueStatus, TASK_INPUT_STATUS__FAILED); + return -1; + } + taosWriteQitem(queue->queue, pSubmitClone); + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { + taosWriteQitem(queue->queue, pItem); + } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + taosWriteQitem(queue->queue, pItem); + } + return 0; + } + return 0; +} +#endif + typedef struct { int8_t parallelizable; char* qmsg; @@ -236,13 +259,17 @@ struct SStreamTask { int8_t dispatchType; int16_t dispatchMsgType; + // node info + int32_t childId; int32_t nodeId; SEpSet epSet; // exec STaskExec exec; - // local sink + // TODO: merge sink and dispatch + + // local sink union { STaskSinkTb tbSink; STaskSinkSma smaSink; @@ -258,25 +285,61 @@ struct SStreamTask { int8_t inputStatus; int8_t outputStatus; - +#if 0 STaosQueue* inputQ; STaosQall* inputQAll; STaosQueue* outputQ; STaosQall* outputQAll; +#endif + + SStreamQueue* inputQueue; + SStreamQueue* outputQueue; // application storage void* ahandle; }; -SStreamTask* tNewSStreamTask(int64_t streamId); +SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); -typedef struct { - // SMsgHead head; - SStreamTask* task; -} SStreamTaskDeployReq; +static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { + while (1) { + int8_t inputStatus = + atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING); + if (inputStatus == TASK_INPUT_STATUS__NORMAL) { + break; + } + ASSERT(0); + } + + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); + if (pSubmitClone == NULL) { + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); + return -1; + } + taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } + + // TODO: back pressure + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); + return 0; +} + +static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); +} + +static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { + taosWriteQitem(pTask->outputQueue->queue, pBlock); + return 0; +} typedef struct { int32_t reserved; @@ -289,6 +352,11 @@ typedef struct { SArray* data; // SArray } SStreamTaskExecReq; +typedef struct { + // SMsgHead head; + SStreamTask* task; +} SStreamTaskDeployReq; + int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq); void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq); void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq); @@ -297,6 +365,12 @@ typedef struct { int32_t reserved; } SStreamTaskExecRsp; +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; +} SStreamTaskRunReq; + typedef struct { // SMsgHead head; int64_t streamId; @@ -304,21 +378,18 @@ typedef struct { SArray* res; // SArray } SStreamSinkReq; -typedef struct { - SMsgHead head; - int64_t streamId; - int32_t taskId; -} SStreamTaskRunReq; - typedef struct { int64_t streamId; int32_t taskId; int32_t sourceTaskId; int32_t sourceVg; + int32_t sourceChildId; #if 0 int64_t sourceVer; #endif - SArray* data; // SArray + int32_t blockNum; + SArray* dataLen; // SArray + SArray* data; // SArray } SStreamDispatchReq; typedef struct { @@ -340,6 +411,8 @@ typedef struct { int8_t inputStatus; } SStreamTaskRecoverRsp; +int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); + int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); int32_t streamDequeueOutput(SStreamTask* pTask, void** output); @@ -356,4 +429,4 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) } #endif -#endif /* ifndef _TSTREAM_H_ */ +#endif /* ifndef _STREAM_H_ */ diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b390a7fe4a..8497a57edd 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -190,7 +190,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -230,7 +230,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(pStream->fixedSinkVgId != 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0); - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -322,7 +322,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); // source part pTask->sourceType = TASK_SOURCE__SCAN; pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; @@ -387,7 +387,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // else, assign to vnode ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); // source part, currently only support multi source pTask->sourceType = TASK_SOURCE__PIPE; @@ -477,7 +477,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewSStreamTask(pStream->uid, 0); // source part pTask->sourceType = TASK_SOURCE__MERGE; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 7cd82b0ac3..e7a744748b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -78,10 +78,10 @@ typedef struct { tmr_h timerId; int8_t tmrStopped; // exec - int8_t inputStatus; - int8_t execStatus; - SStreamQ inputQ; - SRWLatch lock; + int8_t inputStatus; + int8_t execStatus; + SStreamQueue inputQ; + SRWLatch lock; } STqPushHandle; // tqExec @@ -107,7 +107,7 @@ typedef struct { STqExecCol execCol; STqExecTb execTb; STqExecDb execDb; - } exec; + }; } STqExecHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 310b59b2e8..38482ccad5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -266,7 +266,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); } if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.exec.execCol.qmsg = req.qmsg; + pHandle->execHandle.execCol.qmsg = req.qmsg; req.qmsg = NULL; for (int32_t i = 0; i < 5; i++) { SReadHandle handle = { @@ -274,15 +274,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .meta = pTq->pVnode->pMeta, .pMsgCb = &pTq->pVnode->msgCb, }; - pHandle->execHandle.exec.execCol.task[i] = - qCreateStreamExecTaskInfo(pHandle->execHandle.exec.execCol.qmsg, &handle); - ASSERT(pHandle->execHandle.exec.execCol.task[i]); + pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); + ASSERT(pHandle->execHandle.execCol.task[i]); } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - pHandle->execHandle.exec.execDb.pFilterOutTbUid = + pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - pHandle->execHandle.exec.execTb.suid = req.suid; + pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList); tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid); @@ -296,17 +295,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { taosArrayDestroy(tbUidList); } taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); + if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { + // TODO + } } else { /*ASSERT(pExec->consumerId == req.oldConsumerId);*/ // TODO handle qmsg and exec modification atomic_store_32(&pHandle->epoch, -1); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); + if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { + // TODO + } } - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - } return 0; } @@ -323,16 +325,13 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { tDecoderClear(&decoder); pTask->status = TASK_STATUS__IDLE; + + pTask->inputQueue = streamQueueOpen(); + pTask->outputQueue = streamQueueOpen(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; - pTask->inputQ = taosOpenQueue(); - pTask->outputQ = taosOpenQueue(); - pTask->inputQAll = taosAllocateQall(); - pTask->outputQAll = taosAllocateQall(); - - if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL) - goto FAIL; + if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL; // exec if (pTask->execType != TASK_EXEC__NONE) { @@ -369,10 +368,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { return 0; FAIL: - if (pTask->inputQ) taosCloseQueue(pTask->inputQ); - if (pTask->outputQ) taosCloseQueue(pTask->outputQ); - if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); - if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); + if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); + if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask) taosMemoryFree(pTask); return -1; } @@ -393,38 +390,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { SStreamTask* pTask = (SStreamTask*)pIter; if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; - int8_t inputStatus = atomic_load_8(&pTask->inputStatus); - if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - if (failed) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); + if (!failed) { + if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { continue; } - SStreamDataSubmit* pSubmitClone = streamSubmitRefClone(pSubmit); - if (pSubmitClone == NULL) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); + if (streamTriggerByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) { continue; } - taosWriteQitem(pTask->inputQ, pSubmitClone); - - int8_t execStatus = atomic_load_8(&pTask->status); - if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { - SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) continue; - // TODO: do we need htonl? - pRunReq->head.vgId = pTq->pVnode->config.vgId; - pRunReq->streamId = pTask->streamId; - pRunReq->taskId = pTask->taskId; - SRpcMsg msg = { - .msgType = TDMT_VND_TASK_RUN, - .pCont = pRunReq, - .contLen = sizeof(SStreamTaskRunReq), - }; - tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg); - } - } else { - // blocked or stopped, do nothing + streamTaskInputFail(pTask); } } diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index b8fec34b57..31acc60006 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -59,7 +59,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { - qTaskInfo_t task = pExec->exec.execCol.task[workerId]; + qTaskInfo_t task = pExec->execCol.task[workerId]; ASSERT(task); qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); while (1) { @@ -101,7 +101,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR pRsp->withSchema = 1; STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pReq, 0); - while (tqNextDataBlockFilterOut(pReader, pExec->exec.execDb.pFilterOutTbUid)) { + while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows, &block.info.numOfCols) < 0) { diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 9447c4007b..398a09ecbc 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -22,7 +22,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1; if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - if (tEncodeCStr(pEncoder, pHandle->execHandle.exec.execCol.qmsg) < 0) return -1; + if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -35,7 +35,7 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1; if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.exec.execCol.qmsg) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; } tEndDecode(pDecoder); return 0; @@ -88,12 +88,11 @@ int32_t tqMetaOpen(STQ* pTq) { .meta = pTq->pVnode->pMeta, .pMsgCb = &pTq->pVnode->msgCb, }; - handle.execHandle.exec.execCol.task[i] = - qCreateStreamExecTaskInfo(handle.execHandle.exec.execCol.qmsg, &reader); - ASSERT(handle.execHandle.exec.execCol.task[i]); + handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); + ASSERT(handle.execHandle.execCol.task[i]); } } else { - handle.execHandle.exec.execDb.pFilterOutTbUid = + handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index d94c3e387a..9be94eb5b6 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -29,13 +29,13 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm } // update processed atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); - streamQSetSuccess(&pHandle->pushHandle.inputQ); + streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); streamDataSubmitRefDec(pSubmit); if (pRsp->blockNum > 0) { *ppSubmit = pSubmit; return 0; } else { - pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); } } *ppSubmit = pSubmit; @@ -52,14 +52,14 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { // 2. check processedVer // 2.1. if not missed, get msg from queue // 2.2. if missed, scan wal - pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); while (pHandle->pushHandle.processedVer <= pSubmit->ver) { // read from wal } while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { - streamQSetSuccess(&pHandle->pushHandle.inputQ); + streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); streamDataSubmitRefDec(pSubmit); - pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); if (pSubmit == NULL) break; } // 3. exec, after each success, update processed ver diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8909a00c72..43f49b4cc2 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -307,7 +307,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { for (int32_t i = 0; i < 5; i++) { - int32_t code = qUpdateQualifiedTableId(pExec->execHandle.exec.execCol.task[i], tbUidList, isAdd); + int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd); ASSERT(code == 0); } } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { @@ -315,7 +315,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t sz = taosArrayGetSize(tbUidList); for (int32_t i = 0; i < sz; i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - taosHashPut(pExec->execHandle.exec.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); + taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } } } else { diff --git a/source/libs/stream/inc/tstreamInc.h b/source/libs/stream/inc/streamInc.h similarity index 74% rename from source/libs/stream/inc/tstreamInc.h rename to source/libs/stream/inc/streamInc.h index c96901e567..604539f16a 100644 --- a/source/libs/stream/inc/tstreamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -13,15 +13,21 @@ * along with this program. If not, see . */ +#ifndef _STREAM_INC_H_ +#define _STREAM_INC_H_ + +#include "executor.h" +#include "tstream.h" + #ifdef __cplusplus extern "C" { #endif -#ifndef _TSTREAM_H_ -#define _TSTREAM_H_ +int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); +int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); #ifdef __cplusplus } #endif -#endif /* ifndef _TSTREAM_H_ */ +#endif /* ifndef _STREAM_INC_H_ */ diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c new file mode 100644 index 0000000000..d89f2ed57d --- /dev/null +++ b/source/libs/stream/src/stream.c @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "streamInc.h" + +int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { + int8_t execStatus = atomic_load_8(&pTask->status); + if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) return -1; + + // TODO: do we need htonl? + pRunReq->head.vgId = vgId; + pRunReq->streamId = pTask->streamId; + pRunReq->taskId = pTask->taskId; + SRpcMsg msg = { + .msgType = TDMT_VND_TASK_RUN, + .pCont = pRunReq, + .contLen = sizeof(SStreamTaskRunReq), + }; + tmsgPutToQueue(pMsgCb, FETCH_QUEUE, &msg); + } + return 0; +} + +#if 1 +int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { + SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + int8_t status; + + // enqueue + if (pBlock != NULL) { + pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; + pBlock->sourceVg = pReq->sourceVg; + pBlock->blocks = pReq->data; + /*pBlock->sourceVer = pReq->sourceVer;*/ + if (streamTaskInput(pTask, (SStreamQueueItem*)pBlock) == 0) { + status = TASK_INPUT_STATUS__NORMAL; + } else { + status = TASK_INPUT_STATUS__FAILED; + } + } else { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + } + + // rsp by input status + SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); + pCont->inputStatus = status; + pCont->streamId = pReq->streamId; + pCont->taskId = pReq->sourceTaskId; + pRsp->pCont = pCont; + pRsp->contLen = sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); + return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; +} +#endif + +int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { + // 1. handle input + streamTaskEnqueue(pTask, pReq, pRsp); + + // 2. try exec + // 2.1. idle: exec + // 2.2. executing: return + // 2.3. closing: keep trying + streamExec(pTask, pMsgCb); + + // 3. handle output + // 3.1 check and set status + // 3.2 dispatch / sink + streamSink1(pTask, pMsgCb); + + return 0; +} + +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { + if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + // TODO: init recover timer + } + // continue dispatch + streamSink1(pTask, pMsgCb); + return 0; +} + +int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { + streamExec(pTask, pMsgCb); + streamSink1(pTask, pMsgCb); + return 0; +} + +int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { + // + return 0; +} + +int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { + // + return 0; +} diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c new file mode 100644 index 0000000000..95c0290058 --- /dev/null +++ b/source/libs/stream/src/streamData.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstream.h" + +#if 0 +int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pOutput->type); + tlen += taosEncodeFixedI32(buf, pOutput->sourceVg); + tlen += taosEncodeFixedI64(buf, pOutput->sourceVer); + ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK); + tlen += tEncodeDataBlocks(buf, pOutput->blocks); + return tlen; +} + +void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { + buf = taosDecodeFixedI8(buf, &pInput->type); + buf = taosDecodeFixedI32(buf, &pInput->sourceVg); + buf = taosDecodeFixedI64(buf, &pInput->sourceVer); + ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); + buf = tDecodeDataBlocks(buf, &pInput->blocks); + return (void*)buf; +} +#endif + +SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + if (pDataSubmit == NULL) return NULL; + pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); + if (pDataSubmit->dataRef == NULL) goto FAIL; + pDataSubmit->data = pReq; + *pDataSubmit->dataRef = 1; + pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; + return pDataSubmit; +FAIL: + taosFreeQitem(pDataSubmit); + return NULL; +} + +static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { + // + atomic_add_fetch_32(pDataSubmit->dataRef, 1); +} + +SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + if (pSubmitClone == NULL) { + return NULL; + } + streamDataSubmitRefInc(pSubmit); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); + return pSubmitClone; +} + +#if 0 +int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pReq->streamId); + tlen += taosEncodeFixedI32(buf, pReq->taskId); + tlen += tEncodeDataBlocks(buf, pReq->data); + return tlen; +} + +void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) { + buf = taosDecodeFixedI64(buf, &pReq->streamId); + buf = taosDecodeFixedI32(buf, &pReq->taskId); + buf = tDecodeDataBlocks(buf, &pReq->data); + return (void*)buf; +} + +void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); } +#endif diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c new file mode 100644 index 0000000000..4b88cf503e --- /dev/null +++ b/source/libs/stream/src/streamExec.c @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "tstream.h" + +static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { + void* exec = pTask->exec.executor; + + // set input + if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); + + qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); + } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { + SStreamDataBlock* pBlock = (SStreamDataBlock*)data; + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + + SArray* blocks = pBlock->blocks; + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + } + + // exec + while (1) { + SSDataBlock* output = NULL; + uint64_t ts = 0; + if (qExecTask(exec, &output, &ts) < 0) { + ASSERT(false); + } + if (output == NULL) break; + // TODO: do we need free memory? + SSDataBlock* outputCopy = createOneDataBlock(output, true); + taosArrayPush(pRes, outputCopy); + } + return 0; +} + +static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { + while (1) { + void* data = streamQueueNextItem(pTask->inputQueue); + if (data == NULL) break; + + streamTaskExecImpl(pTask, data, pRes); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + if (qRes == NULL) { + streamQueueProcessFail(pTask->inputQueue); + taosArrayDestroy(pRes); + return NULL; + } + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + if (streamTaskOutput(pTask, qRes) < 0) { + streamQueueProcessFail(pTask->inputQueue); + taosArrayDestroy(pRes); + taosFreeQitem(qRes); + return NULL; + } + + if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + streamDataSubmitRefDec((SStreamDataSubmit*)data); + taosFreeQitem(data); + } else { + taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(data); + } + streamQueueProcessSuccess(pTask->inputQueue); + return taosArrayInit(0, sizeof(SSDataBlock)); + } + } + return pRes; +} + +// TODO: handle version +int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) return -1; + while (1) { + int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); + if (execStatus == TASK_STATUS__IDLE) { + // first run + pRes = streamExecForQall(pTask, pRes); + if (pRes == NULL) goto FAIL; + + // set status closing + atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); + + // second run, make sure inputQ and qall are cleared + pRes = streamExecForQall(pTask, pRes); + if (pRes == NULL) goto FAIL; + + break; + } else if (execStatus == TASK_STATUS__CLOSING) { + continue; + } else if (execStatus == TASK_STATUS__EXECUTING) { + break; + } else { + ASSERT(0); + } + } + if (pRes) taosArrayDestroy(pRes); + atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + return 0; +FAIL: + if (pRes) taosArrayDestroy(pRes); + atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + return -1; +} + diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c new file mode 100644 index 0000000000..81f18fea8d --- /dev/null +++ b/source/libs/stream/src/streamMsg.c @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstream.h" + +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); + ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + void* data = taosArrayGetP(pReq->data, i); + if (tEncodeI32(pEncoder, len) < 0) return -1; + if (tEncodeBinary(pEncoder, data, len) < 0) return -1; + } + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; + ASSERT(pReq->blockNum > 0); + pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); + pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len1; + uint64_t len2; + void* data; + if (tDecodeI32(pDecoder, &len1) < 0) return -1; + if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; + ASSERT(len1 == len2); + taosArrayPush(pReq->dataLen, &len1); + taosArrayPush(pReq->data, &data); + } + tEndDecode(pDecoder); + return 0; +} + +int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { + SStreamDispatchReq req = { + .streamId = pTask->streamId, + .data = data, + }; + return 0; +} + +static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { + SStreamTaskExecReq req = { + .streamId = pTask->streamId, + .data = data, + }; + + int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req); + void* buf = rpcMallocCont(tlen); + + if (buf == NULL) { + return -1; + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + ((SMsgHead*)buf)->vgId = 0; + req.taskId = pTask->inplaceDispatcher.taskId; + + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); + *ppEpSet = &pTask->fixedEpDispatcher.epSet; + req.taskId = pTask->fixedEpDispatcher.taskId; + + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + // TODO use general name rule of schemaless + char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0}; + // all groupId must be the same in an array + SSDataBlock* pBlock = taosArrayGet(data, 0); + sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); + + // TODO: get hash function by hashMethod + + // get groupId, compute hash value + uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + + // get node + // TODO: optimize search process + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(vgInfo); + int32_t nodeId = 0; + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + nodeId = pVgInfo->vgId; + req.taskId = pVgInfo->taskId; + *ppEpSet = &pVgInfo->epSet; + break; + } + } + ASSERT(nodeId != 0); + ((SMsgHead*)buf)->vgId = htonl(nodeId); + } + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncodeSStreamTaskExecReq(&abuf, &req); + + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + pMsg->msgType = pTask->dispatchMsgType; + pMsg->info.noResp = 1; + + return 0; +} + +static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) { + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(data, pIter); + if (pIter == NULL) return 0; + SArray* pData = *(SArray**)pIter; + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet; + if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + tmsgSendReq(pEpSet, &dispatchMsg); + } + return 0; +} diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c new file mode 100644 index 0000000000..f82ef1b42f --- /dev/null +++ b/source/libs/stream/src/streamQueue.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstream.h" + +SStreamQueue* streamQueueOpen() { + SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); + if (pQueue == NULL) return NULL; + pQueue->queue = taosOpenQueue(); + pQueue->qall = taosAllocateQall(); + if (pQueue->queue == NULL || pQueue->qall == NULL) { + goto FAIL; + } + pQueue->status = STREAM_QUEUE__SUCESS; + return pQueue; +FAIL: + if (pQueue->queue) taosCloseQueue(pQueue->queue); + if (pQueue->qall) taosFreeQall(pQueue->qall); + taosMemoryFree(pQueue); + return NULL; +} + +void streamQueueClose(SStreamQueue* queue) { + while (1) { + void* qItem = streamQueueNextItem(queue); + if (qItem) + taosFreeQitem(qItem); + else + return; + } +} diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c new file mode 100644 index 0000000000..6acdd1064c --- /dev/null +++ b/source/libs/stream/src/streamSink.c @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "tstream.h" + +int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { + SStreamQueue* queue; + if (pTask->execType == TASK_EXEC__NONE) { + queue = pTask->inputQueue; + } else { + queue = pTask->outputQueue; + } + /*if (streamDequeueBegin(queue) == true) {*/ + /*return -1;*/ + /*}*/ + + if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + while (1) { + SStreamDataBlock* pBlock = streamQueueNextItem(queue); + if (pBlock == NULL) break; + ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); + + // local sink + if (pTask->sinkType == TASK_SINK__TABLE) { + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + } else if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + ASSERT(queue == pTask->outputQueue); + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + ASSERT(queue == pTask->outputQueue); + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + ASSERT(queue == pTask->outputQueue); + } + + streamQueueProcessSuccess(queue); + } + } + + return 0; +} + +#if 0 +int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { + bool firstRun = 1; + while (1) { + SStreamDataBlock* pBlock = NULL; + if (!firstRun) { + taosReadAllQitems(pTask->outputQ, pTask->outputQAll); + } + taosGetQitem(pTask->outputQAll, (void**)&pBlock); + if (pBlock == NULL) { + if (firstRun) { + firstRun = 0; + continue; + } else { + break; + } + } + + SArray* pRes = pBlock->blocks; + + // sink + if (pTask->sinkType == TASK_SINK__TABLE) { + // blockDebugShowData(pRes); + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); + } else if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); + // + } else if (pTask->sinkType == TASK_SINK__FETCH) { + // + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); + } + + // dispatch + // TODO dispatch guard + int8_t outputStatus = atomic_load_8(&pTask->outputStatus); + if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) { + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + SRpcMsg dispatchMsg = {0}; + if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { + ASSERT(0); + return -1; + } + + int32_t qType; + if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { + qType = FETCH_QUEUE; + /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/ + /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/ + /*qType = MERGE_QUEUE;*/ + /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/ + /*qType = WRITE_QUEUE;*/ + } else { + ASSERT(0); + } + tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); + + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); + + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (pShuffleRes == NULL) { + return -1; + } + + int32_t sz = taosArrayGetSize(pRes); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pRes, i); + SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t)); + if (pArray == NULL) { + pArray = taosArrayInit(0, sizeof(SSDataBlock)); + if (pArray == NULL) { + return -1; + } + taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)); + } + taosArrayPush(pArray, pDataBlock); + } + + if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) { + return -1; + } + + } else { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + } + } + } + return 0; +} +#endif diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c new file mode 100644 index 0000000000..890ae1f3b5 --- /dev/null +++ b/source/libs/stream/src/streamTask.c @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "tstream.h" + +SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId) { + SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return NULL; + } + pTask->taskId = tGenIdPI32(); + pTask->streamId = streamId; + pTask->childId = childId; + pTask->status = TASK_STATUS__IDLE; + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + + return pTask; +} + +int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + /*if (tStartEncode(pEncoder) < 0) return -1;*/ + if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; + if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; + + if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; + + if (pTask->execType != TASK_EXEC__NONE) { + if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->sinkType == TASK_SINK__TABLE) { + if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; + if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SMA) { + if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__FETCH) { + if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; + /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ + } + + /*tEndEncode(pEncoder);*/ + return pEncoder->pos; +} + +int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + /*if (tStartDecode(pDecoder) < 0) return -1;*/ + if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; + if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; + + if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; + + if (pTask->execType != TASK_EXEC__NONE) { + if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->sinkType == TASK_SINK__TABLE) { + if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; + pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->tbSink.pSchemaWrapper == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SMA) { + if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__FETCH) { + if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ + if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; + } + + /*tEndDecode(pDecoder);*/ + return 0; +} + +void tFreeSStreamTask(SStreamTask* pTask) { + streamQueueClose(pTask->inputQueue); + streamQueueClose(pTask->outputQueue); + if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); + qDestroyTask(pTask->exec.executor); + taosMemoryFree(pTask); +} diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/streamUpdate.c similarity index 100% rename from source/libs/stream/src/tstreamUpdate.c rename to source/libs/stream/src/streamUpdate.c diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c deleted file mode 100644 index 7d406a7144..0000000000 --- a/source/libs/stream/src/tstream.c +++ /dev/null @@ -1,594 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tstream.h" -#include "executor.h" - -int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) { - int32_t tlen = 0; - tlen += taosEncodeFixedI8(buf, pOutput->type); - tlen += taosEncodeFixedI32(buf, pOutput->sourceVg); - tlen += taosEncodeFixedI64(buf, pOutput->sourceVer); - ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK); - tlen += tEncodeDataBlocks(buf, pOutput->blocks); - return tlen; -} - -void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { - buf = taosDecodeFixedI8(buf, &pInput->type); - buf = taosDecodeFixedI32(buf, &pInput->sourceVg); - buf = taosDecodeFixedI64(buf, &pInput->sourceVer); - ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); - buf = tDecodeDataBlocks(buf, &pInput->blocks); - return (void*)buf; -} - -SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { - SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); - if (pSubmitClone == NULL) { - return NULL; - } - streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); - return pSubmitClone; -} - -static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { - SStreamDispatchReq req = { - .streamId = pTask->streamId, - .data = data, - }; - return 0; -} - -static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { - SStreamTaskExecReq req = { - .streamId = pTask->streamId, - .data = data, - }; - - int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req); - void* buf = rpcMallocCont(tlen); - - if (buf == NULL) { - return -1; - } - - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - ((SMsgHead*)buf)->vgId = 0; - req.taskId = pTask->inplaceDispatcher.taskId; - - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); - *ppEpSet = &pTask->fixedEpDispatcher.epSet; - req.taskId = pTask->fixedEpDispatcher.taskId; - - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - // TODO use general name rule of schemaless - char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0}; - // all groupId must be the same in an array - SSDataBlock* pBlock = taosArrayGet(data, 0); - sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); - - // TODO: get hash function by hashMethod - - // get groupId, compute hash value - uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); - - // get node - // TODO: optimize search process - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(vgInfo); - int32_t nodeId = 0; - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - nodeId = pVgInfo->vgId; - req.taskId = pVgInfo->taskId; - *ppEpSet = &pVgInfo->epSet; - break; - } - } - ASSERT(nodeId != 0); - ((SMsgHead*)buf)->vgId = htonl(nodeId); - } - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSStreamTaskExecReq(&abuf, &req); - - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - pMsg->msgType = pTask->dispatchMsgType; - pMsg->info.noResp = 1; - - return 0; -} - -static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) { - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(data, pIter); - if (pIter == NULL) return 0; - SArray* pData = *(SArray**)pIter; - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet; - if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - tmsgSendReq(pEpSet, &dispatchMsg); - } - return 0; -} - -int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input) { - ASSERT(pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK); - int8_t inputStatus = atomic_load_8(&pTask->inputStatus); - if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - streamDataSubmitRefInc(input); - taosWriteQitem(pTask->inputQ, input); - } - return inputStatus; -} - -int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) { - ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK); - taosWriteQitem(pTask->inputQ, input); - int8_t inputStatus = atomic_load_8(&pTask->inputStatus); - return inputStatus; -} - -static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { - void* exec = pTask->exec.executor; - - // set input - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); - - qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); - } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { - SStreamDataBlock* pBlock = (SStreamDataBlock*)data; - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - - SArray* blocks = pBlock->blocks; - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); - } - - // exec - while (1) { - SSDataBlock* output = NULL; - uint64_t ts = 0; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(false); - } - if (output == NULL) break; - // TODO: do we need free memory? - SSDataBlock* outputCopy = createOneDataBlock(output, true); - taosArrayPush(pRes, outputCopy); - } - - // destroy - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitRefDec((SStreamDataSubmit*)data); - taosFreeQitem(data); - } else { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); - taosFreeQitem(data); - } - return 0; -} - -static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { - while (1) { - void* data = NULL; - taosGetQitem(pTask->inputQAll, &data); - if (data == NULL) break; - - streamTaskExecImpl(pTask, data, pRes); - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - taosWriteQitem(pTask->outputQ, qRes); - return taosArrayInit(0, sizeof(SSDataBlock)); - } - } - return pRes; -} - -// TODO: handle version -int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) return -1; - while (1) { - int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); - if (execStatus == TASK_STATUS__IDLE) { - // first run, from qall, handle failure from last exec - pRes = streamExecForQall(pTask, pRes); - if (pRes == NULL) goto FAIL; - - // second run, from inputQ - taosReadAllQitems(pTask->inputQ, pTask->inputQAll); - pRes = streamExecForQall(pTask, pRes); - if (pRes == NULL) goto FAIL; - - // set status closing - atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); - - // third run, make sure inputQ and qall are cleared - taosReadAllQitems(pTask->inputQ, pTask->inputQAll); - pRes = streamExecForQall(pTask, pRes); - if (pRes == NULL) goto FAIL; - - atomic_store_8(&pTask->status, TASK_STATUS__IDLE); - break; - } else if (execStatus == TASK_STATUS__CLOSING) { - continue; - } else if (execStatus == TASK_STATUS__EXECUTING) { - break; - } else { - ASSERT(0); - } - } - return 0; -FAIL: - atomic_store_8(&pTask->status, TASK_STATUS__IDLE); - return -1; -} - -int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { - bool firstRun = 1; - while (1) { - SStreamDataBlock* pBlock = NULL; - if (!firstRun) { - taosReadAllQitems(pTask->outputQ, pTask->outputQAll); - } - taosGetQitem(pTask->outputQAll, (void**)&pBlock); - if (pBlock == NULL) { - if (firstRun) { - firstRun = 0; - continue; - } else { - break; - } - } - - SArray* pRes = pBlock->blocks; - - // sink - if (pTask->sinkType == TASK_SINK__TABLE) { - // blockDebugShowData(pRes); - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); - } else if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); - // - } else if (pTask->sinkType == TASK_SINK__FETCH) { - // - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - // dispatch - // TODO dispatch guard - int8_t outputStatus = atomic_load_8(&pTask->outputStatus); - if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) { - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - SRpcMsg dispatchMsg = {0}; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { - ASSERT(0); - return -1; - } - - int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { - qType = FETCH_QUEUE; - /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/ - /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/ - /*qType = MERGE_QUEUE;*/ - /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/ - /*qType = WRITE_QUEUE;*/ - } else { - ASSERT(0); - } - tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (pShuffleRes == NULL) { - return -1; - } - - int32_t sz = taosArrayGetSize(pRes); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pRes, i); - SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t)); - if (pArray == NULL) { - pArray = taosArrayInit(0, sizeof(SSDataBlock)); - if (pArray == NULL) { - return -1; - } - taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)); - } - taosArrayPush(pArray, pDataBlock); - } - - if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) { - return -1; - } - - } else { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); - } - } - } - return 0; -} - -int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - int8_t status; - - // 1.1 update status - // TODO cal backpressure - if (pBlock == NULL) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - status = TASK_INPUT_STATUS__FAILED; - } else { - status = atomic_load_8(&pTask->inputStatus); - } - - // 1.2 enqueue - pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; - pBlock->sourceVg = pReq->sourceVg; - /*pBlock->sourceVer = pReq->sourceVer;*/ - taosWriteQitem(pTask->inputQ, pBlock); - - // 1.3 rsp by input status - SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); - pCont->inputStatus = status; - pCont->streamId = pReq->streamId; - pCont->taskId = pReq->sourceTaskId; - pRsp->pCont = pCont; - pRsp->contLen = sizeof(SStreamDispatchRsp); - tmsgSendRsp(pRsp); - - return 0; -} - -int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - // 1. handle input - streamTaskEnqueue(pTask, pReq, pRsp); - - // 2. try exec - // 2.1. idle: exec - // 2.2. executing: return - // 2.3. closing: keep trying - streamExec(pTask, pMsgCb); - - // 3. handle output - // 3.1 check and set status - // 3.2 dispatch / sink - streamSink(pTask, pMsgCb); - - return 0; -} - -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { - atomic_store_8(&pTask->inputStatus, pRsp->inputStatus); - if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - // TODO: init recover timer - } - // continue dispatch - streamSink(pTask, pMsgCb); - return 0; -} - -int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { - streamExec(pTask, pMsgCb); - streamSink(pTask, pMsgCb); - return 0; -} - -int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { - // - return 0; -} - -int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { - // - return 0; -} - -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; - tEndEncode(pEncoder); - return 0; -} - -int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->streamId); - tlen += taosEncodeFixedI32(buf, pReq->taskId); - tlen += tEncodeDataBlocks(buf, pReq->data); - return tlen; -} - -void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->streamId); - buf = taosDecodeFixedI32(buf, &pReq->taskId); - buf = tDecodeDataBlocks(buf, &pReq->data); - return (void*)buf; -} - -void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); } - -SStreamTask* tNewSStreamTask(int64_t streamId) { - SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return NULL; - } - pTask->taskId = tGenIdPI32(); - pTask->streamId = streamId; - pTask->status = TASK_STATUS__IDLE; - - return pTask; -} - -int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - /*if (tStartEncode(pEncoder) < 0) return -1;*/ - if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; - if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; - - if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; - - if (pTask->execType != TASK_EXEC__NONE) { - if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->sinkType == TASK_SINK__TABLE) { - if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; - if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__SMA) { - if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__FETCH) { - if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; - /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ - } - - /*tEndEncode(pEncoder);*/ - return pEncoder->pos; -} - -int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - /*if (tStartDecode(pDecoder) < 0) return -1;*/ - if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; - if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; - - if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; - - if (pTask->execType != TASK_EXEC__NONE) { - if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->sinkType == TASK_SINK__TABLE) { - if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; - pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->tbSink.pSchemaWrapper == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__SMA) { - if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__FETCH) { - if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ - if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; - } - - /*tEndDecode(pDecoder);*/ - return 0; -} - -void tFreeSStreamTask(SStreamTask* pTask) { - taosCloseQueue(pTask->inputQ); - taosCloseQueue(pTask->outputQ); - // TODO - if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); - qDestroyTask(pTask->exec.executor); - taosMemoryFree(pTask); -} - -#if 0 -int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) { - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - /*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/ - return pEncoder->size; -} -int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) { - return pEncoder->size; -} -void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { - taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock); -} -#endif diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 37935087fa..94311bc435 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -34,11 +34,11 @@ typedef struct STaosQnode { } STaosQnode; typedef struct STaosQueue { - STaosQnode * head; - STaosQnode * tail; - STaosQueue * next; // for queue set - STaosQset * qset; // for queue set - void * ahandle; // for queue set + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set FItem itemFp; FItems itemsFp; TdThreadMutex mutex; @@ -47,8 +47,8 @@ typedef struct STaosQueue { } STaosQueue; typedef struct STaosQset { - STaosQueue * head; - STaosQueue * current; + STaosQueue *head; + STaosQueue *current; TdThreadMutex mutex; tsem_t sem; int32_t numOfQueues; @@ -86,7 +86,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) { void taosCloseQueue(STaosQueue *queue) { if (queue == NULL) return; STaosQnode *pTemp; - STaosQset * qset; + STaosQset *qset; taosThreadMutexLock(&queue->mutex); STaosQnode *pNode = queue->head; @@ -282,6 +282,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { *ppItem = pNode->item; num = 1; uTrace("item:%p is fetched", *ppItem); + } else { + *ppItem = NULL; } return num;