From 548e439aa2b5496c156a51025be5ae64124a844a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 19 Jun 2023 20:48:49 +0800 Subject: [PATCH] add checkpoint --- include/libs/stream/streamSnapshot.h | 33 +++++++++ source/dnode/vnode/src/tq/tqStreamStateSnap.c | 1 + source/libs/executor/src/executor.c | 41 +++++----- source/libs/stream/inc/streamSnapshot.h | 19 +++++ source/libs/stream/src/streamSnapshot.c | 74 +++++++++++++++++++ 5 files changed, 148 insertions(+), 20 deletions(-) create mode 100644 include/libs/stream/streamSnapshot.h create mode 100644 source/libs/stream/inc/streamSnapshot.h create mode 100644 source/libs/stream/src/streamSnapshot.c diff --git a/include/libs/stream/streamSnapshot.h b/include/libs/stream/streamSnapshot.h new file mode 100644 index 0000000000..881ed43d78 --- /dev/null +++ b/include/libs/stream/streamSnapshot.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _STREAM_BACKEDN_SNAPSHOT_H_ +#define _STREAM_BACKEDN_SNAPSHOT_H_ +#include "tcommon.h" + +typedef struct SStreamSnapReader SStreamSnapReader; +typedef struct StreamSnapWriter StreamSnapWriter; + +typedef struct SStreamSnapHandle SStreamSnapHandle; + +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader); +int32_t streamSnapReaderClose(void** ppReader); +int32_t streamSnapRead(void* pReader, uint8_t** ppData); + +// SMetaSnapWriter ======================================== +int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter); +int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData); +int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback); + +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index ab7093a701..e150f59aec 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -14,6 +14,7 @@ */ #include "meta.h" +#include "streamSnapshot.h" #include "tdbInt.h" #include "tq.h" diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c8b66836d5..76958d086f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,9 +14,9 @@ */ #include "executor.h" -#include -#include #include "executorInt.h" +#include "libs/transport/trpc.h" +#include "libs/wal/wal.h" #include "operator.h" #include "planner.h" #include "querytask.h" @@ -115,7 +115,8 @@ void resetTaskInfo(qTaskInfo_t tinfo) { clearStreamBlock(pTaskInfo->pRoot); } -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, + const char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); @@ -149,7 +150,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedData tmp = { .pDataBlock = pDataBlock }; + SPackedData tmp = {.pDataBlock = pDataBlock}; taosArrayPush(pInfo->pBlockLists, &tmp); } @@ -162,7 +163,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) { +void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pStreamScanInfo = pOperator->info; @@ -186,10 +187,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); } -//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { -// SExecTaskInfo* pTaskInfo = tinfo; -// pTaskInfo->code = code; -//} +// void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { +// SExecTaskInfo* pTaskInfo = tinfo; +// pTaskInfo->code = code; +// } int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { @@ -214,7 +215,6 @@ void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) { *ckId = pTaskInfo->streamInfo.checkPointId; } - int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; @@ -333,7 +333,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v } static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr, - SStorageAPI* pAPI) { + SStorageAPI* pAPI) { SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); int32_t numOfUids = taosArrayGetSize(tableIdList); if (numOfUids == 0) { @@ -344,7 +344,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S uint64_t suid = 0; uint64_t uid = 0; - int32_t type = 0; + int32_t type = 0; tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type); // let's discard the tables those are not created according to the queried super table. @@ -1073,7 +1073,7 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SOperatorInfo* pOperator = pTaskInfo->pRoot; const char* id = GET_TASKID(pTaskInfo); @@ -1100,7 +1100,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pScanBaseInfo->dataReader = NULL; SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn; - SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader); + SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader); walReaderVerifyOffset(pWalReader, pOffset); if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id); @@ -1158,8 +1158,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pScanInfo->scanTimes = 0; if (pScanBaseInfo->dataReader == NULL) { - int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, - pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL); + int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen( + pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock, + (void**)&pScanBaseInfo->dataReader, id, false, NULL); if (code != TSDB_CODE_SUCCESS) { qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); terrno = code; @@ -1217,8 +1218,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); int32_t size = tableListGetSize(pTableListInfo); - pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL, - false, NULL); + pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, + (void**)&pInfo->dataReader, NULL, false, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); @@ -1276,7 +1277,7 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = tinfo; - SArray* plist = getTableListInfo(pTaskInfo); + SArray* plist = getTableListInfo(pTaskInfo); // only extract table in the first elements STableListInfo* pTableListInfo = taosArrayGetP(plist, 0); @@ -1284,7 +1285,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { SArray* pUidList = taosArrayInit(10, sizeof(uint64_t)); int32_t numOfTables = tableListGetSize(pTableListInfo); - for(int32_t i = 0; i < numOfTables; ++i) { + for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); taosArrayPush(pUidList, &pKeyInfo->uid); } diff --git a/source/libs/stream/inc/streamSnapshot.h b/source/libs/stream/inc/streamSnapshot.h new file mode 100644 index 0000000000..3b3859a7e9 --- /dev/null +++ b/source/libs/stream/inc/streamSnapshot.h @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _STREAM_BACKEDN_SNAPSHOT_H_ +#define _STREAM_BACKEDN_SNAPSHOT_H_ +#include "tcommon.h" + +#endif \ No newline at end of file diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c new file mode 100644 index 0000000000..28d3e099ae --- /dev/null +++ b/source/libs/stream/src/streamSnapshot.c @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "streamSnapshot.h" +#include "rocksdb/c.h" +#include "tcommon.h" + +struct SStreamSnapHandle { + void* handle; + SArray* fileList; +}; + +struct SStreamSnapReader { + void* pMeta; + int64_t sver; + int64_t ever; +}; + +// SMetaSnapWriter ======================================== +struct StreamSnapWriter { + void* pMeta; + int64_t sver; + int64_t ever; +}; + +void streamSnapHandleInit(SStreamSnapHandle* handle) { + // impl later + handle->fileList = taosArrayInit(32, sizeof(void*)); + return; +} +void streamSnapHandleDestroy(SStreamSnapHandle* handle) { + for (int i = 0; handle && i < taosArrayGetSize(handle->fileList); i++) { + taosMemoryFree(taosArrayGetP(handle->fileList, i)); + } + return; +} + +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader) { + // impl later + rocksdb_t* db = NULL; + + return 0; +} +int32_t streamSnapReaderClose(void** ppReader) { + // impl later + return 0; +} +int32_t streamSnapRead(void* pReader, uint8_t** ppData) { + // impl later + + return 0; +} +// SMetaSnapWriter ======================================== +int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter) { + // impl later + return 0; +} +int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData) { + // impl later + return 0; +} +int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback) { return 0; }