From c409760f75d8de425fe81a6e1f384358dbeae4f9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 21 Jun 2022 11:32:36 +0800 Subject: [PATCH 1/2] feat(stream): drop stream --- include/common/tmsgdef.h | 2 +- include/libs/stream/tstream.h | 21 ++++++----- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 37 +++++++++++++------ source/dnode/snode/src/snode.c | 4 +-- source/dnode/vnode/src/tq/tq.c | 39 ++++++++++++++++++--- source/dnode/vnode/src/tq/tqPush.c | 14 ++++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/src/stream.c | 9 +++-- source/libs/stream/src/streamExec.c | 22 ++++++++---- source/libs/stream/src/streamTask.c | 8 +++-- 13 files changed, 114 insertions(+), 50 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index eeac619105..907798d19d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,7 +187,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_DROP, "vnode-stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) @@ -204,6 +203,7 @@ enum { //shared by snode and vnode TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 937ac2b408..ee599e8498 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -30,9 +30,14 @@ extern "C" { typedef struct SStreamTask SStreamTask; enum { - TASK_STATUS__IDLE = 1, - TASK_STATUS__EXECUTING, - TASK_STATUS__CLOSING, + TASK_STATUS__NORMAL = 0, + TASK_STATUS__DROPPING, +}; + +enum { + TASK_EXEC_STATUS__IDLE = 1, + TASK_EXEC_STATUS__EXECUTING, + TASK_EXEC_STATUS__CLOSING, }; enum { @@ -50,16 +55,12 @@ enum { TASK_OUTPUT_STATUS__BLOCKED, }; -enum { - STREAM_CREATED_BY__USER = 1, - STREAM_CREATED_BY__SMA, -}; - enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__TRIGGER, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__DROP, }; typedef struct { @@ -237,7 +238,9 @@ struct SStreamTask { int64_t streamId; int32_t taskId; int8_t inputType; - int8_t status; + int8_t taskStatus; + + int8_t execStatus; int8_t execType; int8_t sinkType; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 0112feedd2..89349d2de9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -216,7 +216,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 81576e153e..e1ffc3bdb7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -95,7 +95,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c3ed6d781c..c2821669f0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -351,7 +351,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fb92efecf6..7e9069d5fc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -54,9 +54,10 @@ int32_t mndInitStream(SMnode *pMnode) { }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); - mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp); + + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); @@ -477,7 +478,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); action.pCont = pReq; action.contLen = sizeof(SVDropStreamTaskReq); - action.msgType = TDMT_VND_STREAM_TASK_DROP; + action.msgType = TDMT_STREAM_TASK_DROP; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -670,20 +671,24 @@ _OVER: static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - int32_t code = -1; SStreamObj *pStream = NULL; /*SDbObj *pDb = NULL;*/ /*SUserObj *pUser = NULL;*/ - SMDropStreamReq dropReq = *(SMDropStreamReq *)pReq->pCont; + SMDropStreamReq dropReq = {0}; + if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { + ASSERT(0); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } pStream = mndAcquireStream(pMnode, dropReq.name); if (pStream == NULL) { if (dropReq.igNotExists) { mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name); - code = 0; - goto DROP_STREAM_OVER; + sdbRelease(pMnode->pSdb, pStream); + return -1; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; @@ -701,14 +706,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); - return code; + sdbRelease(pMnode->pSdb, pStream); + return -1; } mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); - return code; + sdbRelease(pMnode->pSdb, pStream); + return -1; } // drop stream @@ -717,8 +724,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } -DROP_STREAM_OVER: - return code; + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + + return TSDB_CODE_ACTION_IN_PROGRESS; } int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 84a66c680b..c2c4ea3a88 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -92,7 +92,7 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { } tDecoderClear(&decoder); - pTask->status = TASK_STATUS__IDLE; + pTask->execStatus = TASK_EXEC_STATUS__IDLE; pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); @@ -205,7 +205,7 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_DEPLOY: return sndProcessTaskDeployReq(pSnode, pMsg); - case TDMT_VND_STREAM_TASK_DROP: + case TDMT_STREAM_TASK_DROP: return sndProcessTaskDropReq(pSnode, pMsg); default: ASSERT(0); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ece4b7e2a4..a042944776 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -384,7 +384,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { } tDecoderClear(&decoder); - pTask->status = TASK_STATUS__IDLE; + pTask->execStatus = TASK_EXEC_STATUS__IDLE; pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); @@ -459,6 +459,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + continue; + } if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; if (!failed) { @@ -487,6 +490,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + return 0; + } streamProcessRunReq(pTask); return 0; } @@ -501,9 +507,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + return 0; + } + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, }; streamProcessDispatchReq(pTask, &req, &rsp); return 0; @@ -513,6 +522,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + return 0; + } streamProcessRecoverReq(pTask, pReq, pMsg); return 0; } @@ -521,6 +533,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + return 0; + } streamProcessDispatchRsp(pTask, pRsp); return 0; } @@ -529,16 +544,32 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + return 0; + } streamProcessRecoverRsp(pTask, pRsp); return 0; } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; + + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + // todo + // clear queue + // push drop req into queue + // launch exec to free memory + // remove from hash + return 0; + +#if 0 int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + // set status dropping ASSERT(code == 0); if (code == 0) { // sendrsp } return code; +#endif } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 3af8901b2b..2ec627bd5c 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -45,9 +45,9 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { SMqDataBlkRsp rsp = {0}; // 1. guard and set status executing - int8_t execStatus = - atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); - if (execStatus == TASK_STATUS__IDLE) { + int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, + TASK_EXEC_STATUS__EXECUTING); + if (execStatus == TASK_EXEC_STATUS__IDLE) { SStreamDataSubmit* pSubmit = NULL; // 2. check processedVer // 2.1. if not missed, get msg from queue @@ -68,18 +68,18 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { goto SEND_RSP; } // set exec status closing - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING); + atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING); // second run if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { goto SEND_RSP; } // set exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE); + atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); } SEND_RSP: // 4. if get result // 4.1 set exec input status blocked and exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE); + atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); // 4.2 rpc send rsp.rspOffset = pHandle->pushHandle.processedVer; /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ @@ -150,7 +150,7 @@ int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) { continue; } int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus); - if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { + if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) { tqSendExecReq(pTq, pHandle); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f3595ebfb0..b5a2da091f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -172,7 +172,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } } break; - case TDMT_VND_STREAM_TASK_DROP: { + case TDMT_STREAM_TASK_DROP: { if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 38a1ad14b1..18a6e5bf77 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -50,6 +50,10 @@ void streamCleanUp() { void streamTriggerByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; + if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + return; + } + if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); if (trigger == NULL) return; @@ -82,8 +86,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { } int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { - int8_t execStatus = atomic_load_8(&pTask->status); - if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { + int8_t execStatus = atomic_load_8(&pTask->execStatus); + if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) return -1; @@ -188,6 +192,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { int32_t streamProcessRunReq(SStreamTask* pTask) { streamExec(pTask, pTask->pMsgCb); + if (pTask->dispatchType != TASK_DISPATCH__NONE) { streamDispatch(pTask, pTask->pMsgCb); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5a71fccab8..a6df73630a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -33,6 +33,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK); SArray* blocks = pBlock->blocks; qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + } else if (pItem->type == STREAM_INPUT__DROP) { + // TODO exec drop + return 0; } // exec @@ -58,6 +61,10 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { streamTaskExecImpl(pTask, data, pRes); + if (pTask->taskStatus == TASK_STATUS__DROPPING) { + return NULL; + } + if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); if (qRes == NULL) { @@ -94,25 +101,26 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) return -1; while (1) { - int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); - if (execStatus == TASK_STATUS__IDLE) { + int8_t execStatus = + atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING); + if (execStatus == TASK_EXEC_STATUS__IDLE) { // first run pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; // set status closing - atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); + atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING); // second run, make sure inputQ and qall are cleared pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; taosArrayDestroy(pRes); - atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); return 0; - } else if (execStatus == TASK_STATUS__CLOSING) { + } else if (execStatus == TASK_EXEC_STATUS__CLOSING) { continue; - } else if (execStatus == TASK_STATUS__EXECUTING) { + } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) { ASSERT(taosArrayGetSize(pRes) == 0); taosArrayDestroy(pRes); return 0; @@ -122,7 +130,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { } FAIL: if (pRes) taosArrayDestroy(pRes); - atomic_store_8(&pTask->status, TASK_STATUS__IDLE); + atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); return -1; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b5a63d937a..6dfaa4cb74 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { } pTask->taskId = tGenIdPI32(); pTask->streamId = streamId; - pTask->status = TASK_STATUS__IDLE; + pTask->execStatus = TASK_EXEC_STATUS__IDLE; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; @@ -35,7 +35,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; @@ -83,7 +84,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; From 3f6ce2da73157a49ce3be1f054d4471dc8df22ad Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 21 Jun 2022 13:50:33 +0800 Subject: [PATCH 2/2] feat(tmq): support topic with meta --- include/client/taos.h | 10 ++ include/common/tcommon.h | 1 + include/common/tmsg.h | 32 ++++++ source/client/inc/clientInt.h | 62 +++++++----- source/client/src/clientMain.c | 111 +++++++++++---------- source/client/src/tmq.c | 95 +++++++++++++++--- source/common/src/tmsg.c | 2 + source/dnode/mnode/impl/inc/mndDef.h | 4 +- source/dnode/mnode/impl/src/mndDef.c | 3 + source/dnode/mnode/impl/src/mndSubscribe.c | 2 + source/dnode/mnode/impl/src/mndTopic.c | 6 ++ source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 59 +++++++++-- source/dnode/vnode/src/tq/tqRead.c | 19 ++++ source/libs/stream/src/streamExec.c | 13 ++- 15 files changed, 318 insertions(+), 102 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 61538e392a..a0c37c0d78 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -252,6 +252,16 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ +enum tmq_res_t { + TMQ_RES_INVALID = -1, + TMQ_RES_DATA = 1, + TMQ_RES_TABLE_META = 2, +}; + +typedef enum tmq_res_t tmq_res_t; + +DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index a05287761e..60bace3f73 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -34,6 +34,7 @@ enum { enum { TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__POLL_RSP, + TMQ_MSG_TYPE__POLL_META_RSP, TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__END_RSP, }; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 08e5d2ab7d..340cb9893f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1560,6 +1560,7 @@ typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic int8_t igExists; int8_t subType; + int8_t withMeta; char* sql; char subDbName[TSDB_DB_FNAME_LEN]; union { @@ -2306,6 +2307,7 @@ typedef struct { int64_t newConsumerId; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t subType; + int8_t withMeta; char* qmsg; int64_t suid; } SMqRebVgReq; @@ -2318,6 +2320,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeString(buf, pReq->subKey); tlen += taosEncodeFixedI8(buf, pReq->subType); + tlen += taosEncodeFixedI8(buf, pReq->withMeta); if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { tlen += taosEncodeString(buf, pReq->qmsg); } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { @@ -2333,6 +2336,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeStringTo(buf, pReq->subKey); buf = taosDecodeFixedI8(buf, &pReq->subType); + buf = taosDecodeFixedI8(buf, &pReq->withMeta); if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { buf = taosDecodeString(buf, &pReq->qmsg); } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { @@ -2677,6 +2681,34 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } +typedef struct { + SMqRspHead head; + int64_t reqOffset; + int64_t rspOffset; + int16_t resMsgType; + int32_t metaRspLen; + void* metaRsp; +} SMqMetaRsp; + +static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); + tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); + tlen += taosEncodeFixedI16(buf, pRsp->resMsgType); + tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen); + tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); + buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); + buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); + buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); + return (void*)buf; +} + typedef struct { SMqRspHead head; int64_t reqOffset; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c39b492620..cfe507e505 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -20,9 +20,9 @@ extern "C" { #endif +#include "catalog.h" #include "parser.h" #include "planner.h" -#include "catalog.h" #include "query.h" #include "taos.h" #include "tcommon.h" @@ -51,10 +51,12 @@ extern "C" { enum { RES_TYPE__QUERY = 1, RES_TYPE__TMQ, + RES_TYPE__TMQ_META, }; -#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) +#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) typedef struct SAppInstInfo SAppInstInfo; @@ -66,9 +68,9 @@ typedef struct { int64_t reportBytes; // not implemented int64_t startTime; // ctl - SRWLatch lock; // lock is used in serialization + SRWLatch lock; // lock is used in serialization SAppInstInfo* pAppInstInfo; - SHashObj* activeInfo; // hash + SHashObj* activeInfo; // hash } SAppHbMgr; typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); @@ -76,13 +78,13 @@ typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); typedef struct { - int8_t inited; - int64_t appId; + int8_t inited; + int64_t appId; // ctl int8_t threadStop; TdThread thread; - TdThreadMutex lock; // used when app init and cleanup - SHashObj *appSummary; + TdThreadMutex lock; // used when app init and cleanup + SHashObj* appSummary; SArray* appHbMgrs; // SArray one for each cluster FHbReqHandle reqHandle[CONN_TYPE__MAX]; FHbRspHandle rspHandle[CONN_TYPE__MAX]; @@ -129,7 +131,7 @@ typedef struct STscObj { int8_t connType; int32_t acctId; uint32_t connId; - TAOS *id; // ref ID returned by taosAddRef + TAOS* id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; @@ -188,6 +190,14 @@ typedef struct { SReqResultInfo resInfo; } SMqRspObj; +typedef struct { + int8_t resType; + char topic[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int32_t vgId; + SMqMetaRsp metaRsp; +} SMqMetaRspObj; + typedef struct SRequestObj { int8_t resType; // query or tmq uint64_t requestId; @@ -206,9 +216,9 @@ typedef struct SRequestObj { SRequestSendRecvBody body; bool stableQuery; - bool killed; - uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog - uint32_t retry; + bool killed; + uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog + uint32_t retry; } SRequestObj; typedef struct SSyncQueryParam { @@ -216,15 +226,15 @@ typedef struct SSyncQueryParam { SRequestObj* pRequest; } SSyncQueryParam; -void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); -void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); +void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); +void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); -void doSetOneRowPtr(SReqResultInfo* pResultInfo); -void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); -int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, - bool freeAfterUse); -void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -void doFreeReqResultInfo(SReqResultInfo* pResInfo); +void doSetOneRowPtr(SReqResultInfo* pResultInfo); +void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, + bool freeAfterUse); +void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +void doFreeReqResultInfo(SReqResultInfo* pResInfo); SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen); static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { @@ -289,7 +299,7 @@ bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, - uint16_t port, int connType); + uint16_t port, int connType); SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen); @@ -299,7 +309,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); -void taos_close_internal(void *taos); +void taos_close_internal(void* taos); // --- heartbeat // global, called by mgmt @@ -320,12 +330,12 @@ void hbMgrInitMqHbRspHandle(); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); -void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta); +void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); -int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function -int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx +int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function +int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx bool qnodeRequired(SRequestObj* pRequest); #ifdef __cplusplus diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c5a237f5ed..4a0d116e74 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -17,6 +17,7 @@ #include "clientInt.h" #include "clientLog.h" #include "clientStmt.h" +#include "functionMgt.h" #include "os.h" #include "query.h" #include "scheduler.h" @@ -25,7 +26,6 @@ #include "tref.h" #include "trpc.h" #include "version.h" -#include "functionMgt.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -97,11 +97,11 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha pass = TSDB_DEFAULT_PASS; } - STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); + STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); if (pObj) { return pObj->id; } - + return NULL; } @@ -111,41 +111,40 @@ void taos_close_internal(void *taos) { } STscObj *pTscObj = (STscObj *)taos; - tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t*)pTscObj->id, pTscObj->numOfReqs); + tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t *)pTscObj->id, pTscObj->numOfReqs); - taosRemoveRef(clientConnRefPool, *(int64_t*)pTscObj->id); + taosRemoveRef(clientConnRefPool, *(int64_t *)pTscObj->id); } void taos_close(TAOS *taos) { if (taos == NULL) { return; } - - STscObj* pObj = acquireTscObj(*(int64_t*)taos); + + STscObj *pObj = acquireTscObj(*(int64_t *)taos); if (NULL == pObj) { return; } - + taos_close_internal(pObj); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); taosMemoryFree(taos); } - -int taos_errno(TAOS_RES *tres) { - if (tres == NULL) { +int taos_errno(TAOS_RES *res) { + if (res == NULL || TD_RES_TMQ_META(res)) { return terrno; } - if (TD_RES_TMQ(tres)) { + if (TD_RES_TMQ(res)) { return 0; } - return ((SRequestObj *)tres)->code; + return ((SRequestObj *)res)->code; } const char *taos_errstr(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return (const char *)tstrerror(terrno); } @@ -179,11 +178,15 @@ void taos_free_result(TAOS_RES *res) { if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); pRsp->resInfo.pRspMsg = NULL; doFreeReqResultInfo(&pRsp->resInfo); + } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; + taosMemoryFree(pRspObj->metaRsp.metaRsp); + taosMemoryFree(pRspObj); } } int taos_field_count(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return 0; } @@ -194,7 +197,7 @@ int taos_field_count(TAOS_RES *res) { int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); } TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { - if (taos_num_fields(res) == 0) { + if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res)) { return NULL; } @@ -215,8 +218,8 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + + STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); if (pTscObj == NULL || sql == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -229,21 +232,21 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { taos_query_a(taos, sql, syncQueryFn, param); tsem_wait(¶m->sem); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); return param->pRequest; #else size_t sqlLen = strlen(sql); if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; } - TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen); + TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); return pRes; #endif @@ -380,7 +383,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) } int *taos_fetch_lengths(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return NULL; } @@ -389,7 +392,7 @@ int *taos_fetch_lengths(TAOS_RES *res) { } TAOS_ROW *taos_result_block(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { terrno = TSDB_CODE_INVALID_PARA; return NULL; } @@ -438,7 +441,7 @@ const char *taos_data_type(int type) { const char *taos_get_client_info() { return version; } int taos_affected_rows(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) { return 0; } @@ -448,7 +451,7 @@ int taos_affected_rows(TAOS_RES *res) { } int taos_result_precision(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return TSDB_TIME_PRECISION_MILLI; } @@ -463,15 +466,15 @@ int taos_result_precision(TAOS_RES *res) { } int taos_select_db(TAOS *taos, const char *db) { - STscObj* pObj = acquireTscObj(*(int64_t*)taos); + STscObj *pObj = acquireTscObj(*(int64_t *)taos); if (pObj == NULL) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); terrno = TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED; } if (db == NULL || strlen(db) == 0) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); terrno = TSDB_CODE_TSC_INVALID_INPUT; return terrno; } @@ -483,19 +486,19 @@ int taos_select_db(TAOS *taos, const char *db) { int32_t code = taos_errno(pRequest); taos_free_result(pRequest); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); return code; } void taos_stop_query(TAOS_RES *res) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) { return; } SRequestObj *pRequest = (SRequestObj *)res; pRequest->killed = true; - - int32_t numOfFields = taos_num_fields(pRequest); + + int32_t numOfFields = taos_num_fields(pRequest); // It is not a query, no need to stop. if (numOfFields == 0) { tscDebug("request %" PRIx64 " no need to be killed since not query", pRequest->requestId); @@ -510,6 +513,9 @@ void taos_stop_query(TAOS_RES *res) { } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { + if (res == NULL || TD_RES_TMQ_META(res)) { + return true; + } SReqResultInfo *pResultInfo = tscGetCurResInfo(res); if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) { return true; @@ -532,7 +538,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return 0; } @@ -575,7 +581,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { } int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return 0; } @@ -615,7 +621,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { } int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { - if (res == NULL) { + if (res == NULL || TD_RES_TMQ_META(res)) { return 0; } @@ -636,7 +642,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int taos_validate_sql(TAOS *taos, const char *sql) { return true; } void taos_reset_current_db(TAOS *taos) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return; @@ -644,17 +650,17 @@ void taos_reset_current_db(TAOS *taos) { resetConnectDB(pTscObj); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); } const char *taos_get_server_info(TAOS *taos) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); return pTscObj->ver; } @@ -682,7 +688,7 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { tscDebug("enter meta callback, code %s", tstrerror(code)); - + SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; SQuery *pQuery = pWrapper->pQuery; SRequestObj *pRequest = pWrapper->pRequest; @@ -723,11 +729,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); if (pTscObj == NULL || sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; if (pTscObj) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(*(int64_t *)taos); } else { terrno = TSDB_CODE_TSC_DISCONNECTED; } @@ -745,7 +751,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param } SRequestObj *pRequest = NULL; - int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); + int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (code != TSDB_CODE_SUCCESS) { terrno = code; fp(param, NULL, terrno); @@ -849,8 +855,8 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, - pRequest->self, code, tstrerror(code), pRequest->requestId); + tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, + tstrerror(code), pRequest->requestId); pResultInfo->pData = pResult; pResultInfo->numOfRows = 0; @@ -884,6 +890,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ASSERT(res != NULL && fp != NULL); + ASSERT(TD_RES_QUERY(res)); SRequestObj *pRequest = res; pRequest->body.fetchFp = fp; @@ -909,6 +916,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ASSERT(res != NULL && fp != NULL); + ASSERT(TD_RES_QUERY(res)); SRequestObj *pRequest = res; pRequest->body.resInfo.convertUcs4 = false; @@ -922,6 +930,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { const void *taos_get_raw_block(TAOS_RES *res) { ASSERT(res != NULL); + ASSERT(TD_RES_QUERY(res)); SRequestObj *pRequest = res; return pRequest->body.resInfo.pData; @@ -948,16 +957,16 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } TAOS_STMT *taos_stmt_init(TAOS *taos) { - STscObj* pObj = acquireTscObj(*(int64_t*)taos); + STscObj *pObj = acquireTscObj(*(int64_t *)taos); if (NULL == pObj) { tscError("invalid parameter for %s", __FUNCTION__); terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - TAOS_STMT* pStmt = stmtInit(pObj); - - releaseTscObj(*(int64_t*)taos); + TAOS_STMT *pStmt = stmtInit(pObj); + + releaseTscObj(*(int64_t *)taos); return pStmt; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 638b4f1ea5..381be92ed0 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -149,7 +149,10 @@ typedef struct { int32_t epoch; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; - SMqDataBlkRsp msg; + union { + SMqDataBlkRsp dataRsp; + SMqMetaRsp metaRsp; + }; } SMqPollRspWrapper; typedef struct { @@ -1131,6 +1134,11 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch); } + // handle meta rsp + int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; + if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { + } + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { taosMemoryFree(pMsg->pData); @@ -1138,17 +1146,23 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { goto CREATE_MSG_FAIL; } - pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP; + pRspWrapper->tmqRspType = rspType; pRspWrapper->vgHandle = pVg; pRspWrapper->topicHandle = pTopic; - memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); + memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); + + if (rspType == TMQ_MSG_TYPE__POLL_RSP) { + tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); + } else { + ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); + tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); + } - tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg); taosMemoryFree(pMsg->pData); tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, - pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset); + pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1516,6 +1530,17 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* return pReq; } +SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { + SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); + pRspObj->resType = RES_TYPE__TMQ; + tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); + pRspObj->vgId = pWrapper->vgHandle->vgId; + + memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp)); + return pRspObj; +} + SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); pRspObj->resType = RES_TYPE__TMQ; @@ -1523,11 +1548,11 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); + memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataBlkRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->msg.withSchema) { + if (!pWrapper->dataRsp.withSchema) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } @@ -1643,12 +1668,12 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - if (pollRspWrapper->msg.head.epoch == consumerEpoch) { + if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = pollRspWrapper->msg.rspOffset; + pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - if (pollRspWrapper->msg.blockNum == 0) { + if (pollRspWrapper->dataRsp.blockNum == 0) { taosFreeQitem(pollRspWrapper); rspWrapper = NULL; continue; @@ -1658,8 +1683,25 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pollRspWrapper->msg.head.epoch, - consumerEpoch); + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + taosFreeQitem(pollRspWrapper); + } + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { + SMqClientVg* pVg = pollRspWrapper->vgHandle; + /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ + pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // build rsp + SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + taosFreeQitem(pollRspWrapper); + return pRsp; + } else { + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + pollRspWrapper->dataRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } } else { @@ -1747,10 +1789,23 @@ const char* tmq_err2str(int32_t err) { } } +tmq_res_t tmq_get_res_type(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + return TMQ_RES_DATA; + } else if (TD_RES_TMQ_META(res)) { + return TMQ_RES_TABLE_META; + } else { + return TMQ_RES_INVALID; + } +} + const char* tmq_get_topic_name(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return strchr(pRspObj->topic, '.') + 1; + } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + return strchr(pMetaRspObj->topic, '.') + 1; } else { return NULL; } @@ -1760,6 +1815,9 @@ const char* tmq_get_db_name(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return strchr(pRspObj->db, '.') + 1; + } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + return strchr(pMetaRspObj->db, '.') + 1; } else { return NULL; } @@ -1769,6 +1827,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return pRspObj->vgId; + } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + return pMetaRspObj->vgId; } else { return -1; } @@ -1786,6 +1847,16 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } +int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) { + if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + *raw_meta = pMetaRspObj->metaRsp.metaRsp; + *raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; + return 0; + } + return -1; +} + void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { tmqCommitInner2(tmq, msg, 0, 1, cb, param); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cc987ad6a1..7e9ce34f9f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2956,6 +2956,7 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->subType) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withMeta) < 0) return -1; if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1; if (TOPIC_SUB_TYPE__DB == pReq->subType) { } else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) { @@ -2985,6 +2986,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->withMeta) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1; if (TOPIC_SUB_TYPE__DB == pReq->subType) { } else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 987b01b96a..3e4cb6a64d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -420,7 +420,8 @@ typedef struct { int64_t uid; int64_t dbUid; int32_t version; - int8_t subType; // column, db or stable + int8_t subType; // column, db or stable + int8_t withMeta; // TODO SRWLatch lock; int32_t sqlLen; int32_t astLen; @@ -487,6 +488,7 @@ typedef struct { int64_t dbUid; int32_t vgNum; int8_t subType; + int8_t withMeta; int64_t stbUid; SHashObj* consumerHash; // consumerId -> SMqConsumerEp SArray* unassignedVgs; // SArray diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 20ba71992e..dc7f08ebc2 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -381,6 +381,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->dbUid = pSub->dbUid; pSubNew->stbUid = pSub->stbUid; pSubNew->subType = pSub->subType; + pSubNew->withMeta = pSub->withMeta; pSubNew->vgNum = pSub->vgNum; pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -414,6 +415,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { tlen += taosEncodeFixedI64(buf, pSub->dbUid); tlen += taosEncodeFixedI32(buf, pSub->vgNum); tlen += taosEncodeFixedI8(buf, pSub->subType); + tlen += taosEncodeFixedI8(buf, pSub->withMeta); tlen += taosEncodeFixedI64(buf, pSub->stbUid); void *pIter = NULL; @@ -440,6 +442,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { buf = taosDecodeFixedI64(buf, &pSub->dbUid); buf = taosDecodeFixedI32(buf, &pSub->vgNum); buf = taosDecodeFixedI8(buf, &pSub->subType); + buf = taosDecodeFixedI8(buf, &pSub->withMeta); buf = taosDecodeFixedI64(buf, &pSub->stbUid); int32_t sz; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d2b7a61e83..8dde3e92d8 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -96,6 +96,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, pSub->dbUid = pTopic->dbUid; pSub->stbUid = pTopic->stbUid; pSub->subType = pTopic->subType; + pSub->withMeta = pTopic->withMeta; ASSERT(pSub->unassignedVgs->size == 0); ASSERT(taosHashGetSize(pSub->consumerHash) == 0); @@ -120,6 +121,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.vgId = pRebVg->pVgEp->vgId; req.qmsg = pRebVg->pVgEp->qmsg; req.subType = pSub->subType; + req.withMeta = pSub->withMeta; req.suid = pSub->stbUid; strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 9632c04f4c..a650ed29f1 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -141,6 +141,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER); + SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); @@ -208,6 +209,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER); + SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); @@ -357,6 +359,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.sql = strdup(pCreate->sql); topicObj.sqlLen = strlen(pCreate->sql) + 1; topicObj.subType = pCreate->subType; + topicObj.withMeta = pCreate->withMeta; + if (topicObj.withMeta) { + ASSERT(topicObj.subType != TOPIC_SUB_TYPE__COLUMN); + } if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { topicObj.ast = strdup(pCreate->ast); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2ee0673ce5..8f6077e996 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -114,6 +114,7 @@ typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int64_t consumerId; int32_t epoch; + int8_t fetchMeta; // reader SWalReadHandle* pWalReader; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a042944776..3ce02dc50a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -85,6 +85,34 @@ void tqClose(STQ* pTq) { } } +int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqMetaRsp(NULL, pRsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; + ((SMqRspHead*)buf)->consumerId = pReq->consumerId; + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqMetaRsp(&abuf, pRsp); + + SRpcMsg resp = { + .info = pMsg->info, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + tmsgSendRsp(&resp); + + tqDebug("vg %d from consumer %ld (epoch %d) send rsp, res msg type %d, reqOffset: %ld, rspOffset: %ld", + TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset); + + return 0; +} + int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) { int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp); void* buf = rpcMallocCont(tlen); @@ -250,8 +278,23 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { /*ASSERT(0);*/ } } else { - // TODO - ASSERT(0); + ASSERT(pHandle->fetchMeta); + ASSERT(pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB || + pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || + pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || + pHead->msgType == TDMT_VND_DROP_TTL_TABLE); + // return + SMqMetaRsp metaRsp = {0}; + metaRsp.reqOffset = pReq->currentOffset; + metaRsp.rspOffset = fetchOffset; + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + code = -1; + } + code = 0; + goto OVER; } // TODO batch optimization: @@ -276,7 +319,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) { code = -1; } - +OVER: // TODO wrap in destroy func taosArrayDestroy(rsp.blockDataLen); taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree); @@ -490,7 +533,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { return 0; } streamProcessRunReq(pTask); @@ -507,7 +550,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { return 0; } SRpcMsg rsp = { @@ -522,7 +565,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { return 0; } streamProcessRecoverReq(pTask, pReq, pMsg); @@ -533,7 +576,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { return 0; } streamProcessDispatchRsp(pTask, pRsp); @@ -544,7 +587,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { return 0; } streamProcessRecoverRsp(pTask, pRsp); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0c38d6442b..6f44eb4569 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -42,6 +42,25 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead* code = 0; goto END; } else { + if (pHandle->fetchMeta) { + SWalReadHead* pHead = &((*ppHeadWithCkSum)->head); + if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB || + pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE || + pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE || + pHead->msgType == TDMT_VND_DROP_TTL_TABLE) { + code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); + + if (code < 0) { + ASSERT(0); + *fetchOffset = offset; + code = -1; + goto END; + } + *fetchOffset = offset; + code = 0; + goto END; + } + } code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum); if (code < 0) { ASSERT(0); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a6df73630a..bab86223bf 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -82,12 +82,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { return NULL; } - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitRefDec((SStreamDataSubmit*)data); + if (((SStreamQueueItem*)data)->type == STREAM_INPUT__TRIGGER) { + blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); } else { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); - taosFreeQitem(data); + if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + streamDataSubmitRefDec((SStreamDataSubmit*)data); + taosFreeQitem(data); + } else { + taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(data); + } } streamQueueProcessSuccess(pTask->inputQueue); return taosArrayInit(0, sizeof(SSDataBlock));