diff --git a/examples/c/tmq.c b/examples/c/tmq.c index a8584bae82..40d72d3af1 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -176,8 +176,8 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); - /*tmq_list_append(topic_list, "topic_ctb_column");*/ - tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic"); + tmq_list_append(topic_list, "topic_ctb_column"); + /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ return topic_list; } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0525cbf367..f7ad7b4ed8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -80,6 +80,37 @@ typedef struct { int8_t type; } SStreamCheckpoint; +typedef struct { + STaosQueue* queue; + STaosQall* qall; + void* qItem; + int8_t failed; +} SStreamQ; + +static FORCE_INLINE void* streamQCurItem(SStreamQ* queue) { + // + return queue->qItem; +} + +static FORCE_INLINE void* streamQNextItem(SStreamQ* queue) { + int8_t failed = atomic_load_8(&queue->failed); + if (failed) { + ASSERT(queue->qItem != NULL); + return streamQCurItem(queue); + } else { + taosGetQitem(queue->qall, &queue->qItem); + if (queue->qItem == NULL) { + taosReadAllQitems(queue->queue, queue->qall); + taosGetQitem(queue->qall, &queue->qItem); + } + return streamQCurItem(queue); + } +} + +static FORCE_INLINE void streamQSetFail(SStreamQ* queue) { atomic_store_8(&queue->failed, 1); } + +static FORCE_INLINE void streamQSetSuccess(SStreamQ* queue) { atomic_store_8(&queue->failed, 0); } + static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); if (pDataSubmit == NULL) return NULL; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 0715ee8066..7cd82b0ac3 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -65,12 +65,6 @@ struct STqReadHandle { // tqPush -typedef struct { - STaosQueue* queue; - STaosQall* qall; - void* qItem; -} STqInputQ; - typedef struct { // msg info int64_t consumerId; @@ -84,10 +78,10 @@ typedef struct { tmr_h timerId; int8_t tmrStopped; // exec - int8_t inputStatus; - int8_t execStatus; - STqInputQ inputQ; - SRWLatch lock; + int8_t inputStatus; + int8_t execStatus; + SStreamQ inputQ; + SRWLatch lock; } STqPushHandle; // tqExec @@ -155,6 +149,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* // tqExec int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId); +int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 67651c2f78..e79de255f3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -81,12 +81,41 @@ void tqClose(STQ* pTq) { // TODO } +int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) { + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; + ((SMqRspHead*)buf)->consumerId = pReq->consumerId; + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqDataBlkRsp(&abuf, pRsp); + + SRpcMsg resp = { + .info = pMsg->info, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + tmsgSendRsp(&resp); + + tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", + TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, pRsp->reqOffset, pRsp->rspOffset); + + return 0; +} + int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; int64_t timeout = pReq->timeout; int32_t reqEpoch = pReq->epoch; int64_t fetchOffset; + int32_t code = 0; // get offset to fetch message if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { @@ -155,7 +184,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId); + if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) { + /*ASSERT(0);*/ + } } else { // TODO ASSERT(0); @@ -180,31 +211,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { rsp.rspOffset = fetchOffset; - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - pMsg->code = -1; - return -1; + if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) { + code = -1; } - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqDataBlkRsp(&abuf, &rsp); - - SRpcMsg resp = { - .info = pMsg->info, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&resp); - - tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", - TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); - // TODO wrap in destroy func taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); @@ -217,7 +227,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); } - return 0; + return code; } int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 2d9207a0de..26e9dfe2e2 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -21,21 +21,74 @@ void tqTmrRspFunc(void* param, void* tmrId) { } int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { + SMqDataBlkRsp rsp = {0}; // 1. guard and set status executing - // 2. check processedVer - // 2.1. if not missed, get msg from queue - // 2.2. if missed, scan wal - // - // 3. exec, after each success, update processed ver - // first run - // set exec status closing - // second run - // set exec status idle - // + int8_t execStatus = + atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); + if (execStatus == TASK_STATUS__IDLE) { + SStreamDataSubmit* pSubmit = NULL; + // 2. check processedVer + // 2.1. if not missed, get msg from queue + // 2.2. if missed, scan wal + pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + while (pHandle->pushHandle.processedVer <= pSubmit->ver) { + // read from wal + } + while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { + streamQSetSuccess(&pHandle->pushHandle.inputQ); + streamDataSubmitRefDec(pSubmit); + pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + if (pSubmit == NULL) break; + } + // 3. exec, after each success, update processed ver + // first run + while (pSubmit != NULL) { + ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1); + if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) { + /*ASSERT(0);*/ + } + // update processed + atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); + streamQSetSuccess(&pHandle->pushHandle.inputQ); + streamDataSubmitRefDec(pSubmit); + if (rsp.blockNum > 0) { + goto SEND_RSP; + } else { + pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + } + } + // set exec status closing + atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING); + // second run + while (pSubmit != NULL) { + ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1); + if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, &rsp, 0) < 0) { + /*ASSERT(0);*/ + } + // update processed + atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); + streamQSetSuccess(&pHandle->pushHandle.inputQ); + streamDataSubmitRefDec(pSubmit); + if (rsp.blockNum > 0) { + goto SEND_RSP; + } else { + pSubmit = streamQNextItem(&pHandle->pushHandle.inputQ); + } + } + // set exec status idle + atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE); + } +SEND_RSP: // 4. if get result // 4.1 set exec input status blocked and exec status idle + atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE); // 4.2 rpc send + rsp.rspOffset = pHandle->pushHandle.processedVer; + /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ + /*return -1;*/ + /*}*/ // 4.3 clear rpc info + memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); return 0; }