diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 240415b66b..9572157fbb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -479,7 +479,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); @@ -487,6 +487,18 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +typedef struct SStreamMeta SStreamMeta; + +SStreamMeta* streamMetaOpen(); +void streamMetaClose(SStreamMeta* streamMeta); + +int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); + +int32_t streamMetaBegin(SStreamMeta* pMeta); +int32_t streamMetaCommit(SStreamMeta* pMeta); +int32_t streamMetaRollBack(SStreamMeta* pMeta); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 9d1142801d..c71e32dae6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -165,7 +165,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp break; case STREAM_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pStreamQ, pMsg); + if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) { + vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg); + } else { + taosWriteQitem(pVnode->pStreamQ, pMsg); + } break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 1f0019ef46..a9c63b0302 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -30,6 +30,7 @@ extern "C" { #endif +#if 0 typedef struct { SHashObj* pHash; // taskId -> SStreamTask } SStreamMeta; @@ -49,6 +50,7 @@ int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId); +#endif #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 352fb51a53..08a60552ba 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -16,7 +16,12 @@ #include "executor.h" #include "sndInt.h" #include "tuuid.h" +SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; } +void sndClose(SSnode *pSnode) {} +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; } +#if 0 SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); if (pSnode == NULL) { @@ -151,7 +156,7 @@ static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &req, &rsp); + streamProcessDispatchReq(pTask, &req, &rsp, true); return 0; } @@ -263,3 +268,4 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { } return 0; } +#endif diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 4d95a9d7a5..4f0f77fc2f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -89,7 +89,7 @@ void metaReaderClear(SMetaReader *pReader); int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaReadNext(SMetaReader *pReader); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *tagVal); -int metaGetTableNameByUid(void* meta, uint64_t uid, char* tbName); +int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); typedef struct SMetaFltParam { tb_uid_t suid; @@ -183,6 +183,8 @@ bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); +void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); + // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 44b9d1f69c..c093b2cd5d 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -115,15 +115,23 @@ typedef struct { } STqHandle; struct STQ { - char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pStreamTasks; // taksId -> SStreamTask - SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + SVnode* pVnode; + char* path; + SHashObj* pushMgr; // consumerId -> STqHandle* + SHashObj* handles; // subKey -> STqHandle + SHashObj* pStreamTasks; // taksId -> SStreamTask + SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + STqOffsetStore* pOffsetStore; - SVnode* pVnode; - TDB* pMetaStore; - TTB* pExecStore; + + TDB* pMetaStore; + TTB* pExecStore; + + TTB* pAlterInfoStore; + + TDB* pStreamStore; + TTB* pTaskDb; + TTB* pTaskState; }; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 38f16b6231..c9e09cb8d6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -162,7 +162,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3e321fa2a1..b05e226e1d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -280,8 +280,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su return 0; } -static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; } - int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; @@ -386,6 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { + ASSERT(fetchOffsetNew.type == TMQ_OFFSET__LOG); int64_t fetchVer = fetchOffsetNew.version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { @@ -461,22 +460,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { goto OVER; } } - - taosMemoryFree(pCkHead); -#if 0 - } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts); - if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { - ASSERT(0); - } - - // 4. send rsp - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; - } -#endif - } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { - ASSERT(0); } OVER: @@ -614,17 +597,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); - } - tDecoderClear(&decoder); +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { + int32_t code = 0; ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1); if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); @@ -634,11 +608,15 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); + + if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { + code = -1; + goto FAIL; + } + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; - if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL; - pTask->pMsgCb = &pTq->pVnode->msgCb; // exec @@ -683,15 +661,35 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, pTask->selfChildId); - taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); - - /*SMeta* pMeta = pTq->pVnode->pMeta;*/ - /*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/ - - return 0; FAIL: if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); + // TODO free executor + return code; +} + +int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + goto FAIL; + } + tDecoderClear(&decoder); + + if (tqExpandTask(pTq, pTask) < 0) { + goto FAIL; + } + + taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + + return 0; + +FAIL: if (pTask) taosMemoryFree(pTask); return -1; } @@ -752,7 +750,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } } -int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -767,7 +765,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(*ppTask, &req, &rsp); + streamProcessDispatchReq(*ppTask, &req, &rsp, exec); return 0; } else { return -1; @@ -825,16 +823,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { // launch exec to free memory // remove from hash return 0; - -#if 0 - int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - // set status dropping - ASSERT(code == 0); - if (code == 0) { - // sendrsp - } - return code; -#endif } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { @@ -863,3 +851,37 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { // return 0; } + +void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { + STQ* pTq = pVnode->pTq; + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamDispatchReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + goto FAIL; + } + + int32_t taskId = req.taskId; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(*ppTask, &req, &rsp, false); + return; + } +FAIL: + if (pMsg->info.handle == NULL) return; + SRpcMsg rsp = { + .code = code, + .info = pMsg->info, + }; + tmsgSendRsp(&rsp); +} diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c new file mode 100644 index 0000000000..21172134ba --- /dev/null +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" +#include "tdbInt.h" +#include "tq.h" + +// STqSnapReader ======================================== +struct STqSnapReader { + STQ* pTq; + int64_t sver; + int64_t ever; + TBC* pCur; +}; + +int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) { + int32_t code = 0; + STqSnapReader* pReader = NULL; + + // alloc + pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pTq = pTq; + pReader->sver = sver; + pReader->ever = ever; + + // impl + code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); + if (code) { + taosMemoryFree(pReader); + goto _err; + } + + code = tdbTbcMoveToFirst(pReader->pCur); + if (code) { + taosMemoryFree(pReader); + goto _err; + } + + tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + + *ppReader = pReader; + return code; + +_err: + tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppReader = NULL; + return code; +} + +int32_t tqSnapReaderClose(STqSnapReader** ppReader) { + int32_t code = 0; + + tdbTbcClose((*ppReader)->pCur); + taosMemoryFree(*ppReader); + *ppReader = NULL; + + return code; +} + +int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + const void* pKey = NULL; + const void* pVal = NULL; + int32_t kLen = 0; + int32_t vLen = 0; + SDecoder decoder; + STqHandle handle; + + *ppData = NULL; + for (;;) { + if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { + goto _exit; + } + + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, &handle); + tDecoderClear(&decoder); + + if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { + tdbTbcMoveToNext(pReader->pCur); + break; + } else { + tdbTbcMoveToNext(pReader->pCur); + } + } + + ASSERT(pVal && vLen); + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = SNAP_DATA_TQ_HANDLE; + pHdr->size = vLen; + memcpy(pHdr->data, pVal, vLen); + + tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), + handle.snapshotVer, handle.subKey, vLen); + +_exit: + return code; + +_err: + tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + return code; +} + +// STqSnapWriter ======================================== +struct STqSnapWriter { + STQ* pTq; + int64_t sver; + int64_t ever; + TXN txn; +}; + +int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { + int32_t code = 0; + STqSnapWriter* pWriter; + + // alloc + pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pTq = pTq; + pWriter->sver = sver; + pWriter->ever = ever; + + if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + ASSERT(0); + } + + *ppWriter = pWriter; + return code; + +_err: + tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; +} + +int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { + int32_t code = 0; + STqSnapWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; + + if (rollback) { + ASSERT(0); + } else { + code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + if (code) goto _err; + } + + taosMemoryFree(pWriter); + *ppWriter = NULL; + + // restore from metastore + if (tqMetaRestoreHandle(pTq) < 0) { + goto _err; + } + + return code; + +_err: + tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + return code; +} + +int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STQ* pTq = pWriter->pTq; + SDecoder decoder = {0}; + SDecoder* pDecoder = &decoder; + STqHandle handle; + + tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tDecodeSTqHandle(pDecoder, &handle); + if (code) goto _err; + code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + if (code < 0) goto _err; + tDecoderClear(pDecoder); + + return code; + +_err: + tDecoderClear(pDecoder); + tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; +} diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c new file mode 100644 index 0000000000..21172134ba --- /dev/null +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "meta.h" +#include "tdbInt.h" +#include "tq.h" + +// STqSnapReader ======================================== +struct STqSnapReader { + STQ* pTq; + int64_t sver; + int64_t ever; + TBC* pCur; +}; + +int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) { + int32_t code = 0; + STqSnapReader* pReader = NULL; + + // alloc + pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pTq = pTq; + pReader->sver = sver; + pReader->ever = ever; + + // impl + code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); + if (code) { + taosMemoryFree(pReader); + goto _err; + } + + code = tdbTbcMoveToFirst(pReader->pCur); + if (code) { + taosMemoryFree(pReader); + goto _err; + } + + tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + + *ppReader = pReader; + return code; + +_err: + tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppReader = NULL; + return code; +} + +int32_t tqSnapReaderClose(STqSnapReader** ppReader) { + int32_t code = 0; + + tdbTbcClose((*ppReader)->pCur); + taosMemoryFree(*ppReader); + *ppReader = NULL; + + return code; +} + +int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + const void* pKey = NULL; + const void* pVal = NULL; + int32_t kLen = 0; + int32_t vLen = 0; + SDecoder decoder; + STqHandle handle; + + *ppData = NULL; + for (;;) { + if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { + goto _exit; + } + + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSTqHandle(&decoder, &handle); + tDecoderClear(&decoder); + + if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { + tdbTbcMoveToNext(pReader->pCur); + break; + } else { + tdbTbcMoveToNext(pReader->pCur); + } + } + + ASSERT(pVal && vLen); + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = SNAP_DATA_TQ_HANDLE; + pHdr->size = vLen; + memcpy(pHdr->data, pVal, vLen); + + tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), + handle.snapshotVer, handle.subKey, vLen); + +_exit: + return code; + +_err: + tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + return code; +} + +// STqSnapWriter ======================================== +struct STqSnapWriter { + STQ* pTq; + int64_t sver; + int64_t ever; + TXN txn; +}; + +int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { + int32_t code = 0; + STqSnapWriter* pWriter; + + // alloc + pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pTq = pTq; + pWriter->sver = sver; + pWriter->ever = ever; + + if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + ASSERT(0); + } + + *ppWriter = pWriter; + return code; + +_err: + tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; +} + +int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { + int32_t code = 0; + STqSnapWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; + + if (rollback) { + ASSERT(0); + } else { + code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + if (code) goto _err; + } + + taosMemoryFree(pWriter); + *ppWriter = NULL; + + // restore from metastore + if (tqMetaRestoreHandle(pTq) < 0) { + goto _err; + } + + return code; + +_err: + tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + return code; +} + +int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STQ* pTq = pWriter->pTq; + SDecoder decoder = {0}; + SDecoder* pDecoder = &decoder; + STqHandle handle; + + tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tDecodeSTqHandle(pDecoder, &handle); + if (code) goto _err; + code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + if (code < 0) goto _err; + tDecoderClear(pDecoder); + + return code; + +_err: + tDecoderClear(pDecoder); + tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a83e1ab85b..4b3d01e873 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -297,8 +297,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); - if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || - pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || + pMsg->msgType == TDMT_VND_BATCH_META) && !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; @@ -330,7 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: - return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); + return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); case TDMT_STREAM_TASK_RECOVER: return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: @@ -485,7 +485,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR rcode = -1; goto _exit; } - + // validate hash sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); if (vnodeValidateTableHash(pVnode, tbName) < 0) { @@ -1021,10 +1021,10 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq taosArrayDestroy(pRes->uidList); SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows}; - int32_t ret = 0; + int32_t ret = 0; tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret); pRsp->pCont = rpcMallocCont(pRsp->contLen); - SEncoder ec = {0}; + SEncoder ec = {0}; tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); tEncodeSVDeleteRsp(&ec, &rsp); tEncoderClear(&ec); diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index 3bab354c22..33e864158a 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( stream - PRIVATE os util transport qcom executor + PRIVATE os util transport qcom executor tdb ) if(${BUILD_TEST}) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 31da865a69..9ea1a4786c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -175,40 +175,21 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId); - // 1. handle input streamTaskEnqueue(pTask, pReq, pRsp); - // 2. try exec - // 2.1. idle: exec - // 2.2. executing: return - // 2.3. closing: keep trying -#if 0 - if (pTask->execType != TASK_EXEC__NONE) { -#endif - streamExec(pTask); -#if 0 - } else { - ASSERT(pTask->sinkType != TASK_SINK__NONE); - while (1) { - void* data = streamQueueNextItem(pTask->inputQueue); - if (data == NULL) return 0; - if (streamTaskOutput(pTask, data) < 0) { - ASSERT(0); - } - } - } -#endif + if (exec) { + streamExec(pTask); - // 3. handle output - // 3.1 check and set status - // 3.2 dispatch / sink - if (pTask->dispatchType != TASK_DISPATCH__NONE) { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - streamDispatch(pTask); + if (pTask->dispatchType != TASK_DISPATCH__NONE) { + ASSERT(pTask->sinkType == TASK_SINK__NONE); + streamDispatch(pTask); + } + } else { + streamLaunchByWrite(pTask, pTask->nodeId); } return 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c new file mode 100644 index 0000000000..be9dc81c3c --- /dev/null +++ b/source/libs/stream/src/streamMeta.c @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "tdbInt.h" +#include "tstream.h" + +typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask); + +typedef struct SStreamMeta { + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pStateDb; + SHashObj* pTasks; + void* ahandle; + TXN txn; + FTaskExpand* expandFunc; +} SStreamMeta; + +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { + SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); + if (pMeta == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pMeta->path = strdup(path); + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { + goto _err; + } + + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { + goto _err; + } + + // open state storage backend + if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) { + goto _err; + } + + pMeta->ahandle = ahandle; + pMeta->expandFunc = expandFunc; + +_err: + + return NULL; +} + +void streamMetaClose(SStreamMeta* pMeta) { + // + return; +} + +int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { + void* buf = NULL; + if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { + return -1; + } + taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + + int32_t len; + int32_t code; + tEncodeSize(tEncodeSStreamTask, pTask, len, code); + if (code < 0) { + return -1; + } + buf = taosMemoryCalloc(1, sizeof(len)); + if (buf == NULL) { + return -1; + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, len); + tEncodeSStreamTask(&encoder, pTask); + + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) { + ASSERT(0); + return -1; + } + + return 0; +} + +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + SStreamTask* pTask = *ppTask; + taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); + atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + } + + if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { + /*return -1;*/ + } + return 0; +} + +int32_t streamMetaBegin(SStreamMeta* pMeta) { + if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + + if (tdbBegin(pMeta->db, &pMeta->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamMetaCommit(SStreamMeta* pMeta) { + if (tdbCommit(pMeta->db, &pMeta->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamMetaRollBack(SStreamMeta* pMeta) { + // TODO tdb rollback + return 0; +} +int32_t streamRestoreTask(SStreamMeta* pMeta) { + TBC* pCur = NULL; + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + ASSERT(0); + return -1; + } + + void* pKey = NULL; + int32_t kLen = 0; + void* pVal = NULL; + int32_t vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + tDecodeSStreamTask(&decoder, pTask); + tDecoderClear(&decoder); + } + + return 0; +}