add checkpoint

This commit is contained in:
yihaoDeng 2023-06-19 20:48:49 +08:00
parent fd85a8495a
commit 548e439aa2
5 changed files with 148 additions and 20 deletions

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_BACKEDN_SNAPSHOT_H_
#define _STREAM_BACKEDN_SNAPSHOT_H_
#include "tcommon.h"
typedef struct SStreamSnapReader SStreamSnapReader;
typedef struct StreamSnapWriter StreamSnapWriter;
typedef struct SStreamSnapHandle SStreamSnapHandle;
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader);
int32_t streamSnapReaderClose(void** ppReader);
int32_t streamSnapRead(void* pReader, uint8_t** ppData);
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter);
int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback);
#endif

View File

@ -14,6 +14,7 @@
*/
#include "meta.h"
#include "streamSnapshot.h"
#include "tdbInt.h"
#include "tq.h"

View File

@ -14,9 +14,9 @@
*/
#include "executor.h"
#include <libs/transport/trpc.h>
#include <libs/wal/wal.h>
#include "executorInt.h"
#include "libs/transport/trpc.h"
#include "libs/wal/wal.h"
#include "operator.h"
#include "planner.h"
#include "querytask.h"
@ -115,7 +115,8 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
clearStreamBlock(pTaskInfo->pRoot);
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type,
const char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
@ -149,7 +150,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedData tmp = { .pDataBlock = pDataBlock };
SPackedData tmp = {.pDataBlock = pDataBlock};
taosArrayPush(pInfo->pBlockLists, &tmp);
}
@ -162,7 +163,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) {
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pStreamScanInfo = pOperator->info;
@ -186,10 +187,10 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
}
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
//}
// void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
// }
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
@ -214,7 +215,6 @@ void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
*ckId = pTaskInfo->streamInfo.checkPointId;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
@ -333,7 +333,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
}
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
SStorageAPI* pAPI) {
SStorageAPI* pAPI) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
int32_t numOfUids = taosArrayGetSize(tableIdList);
if (numOfUids == 0) {
@ -344,7 +344,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
uint64_t suid = 0;
uint64_t uid = 0;
int32_t type = 0;
int32_t type = 0;
tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
// let's discard the tables those are not created according to the queried super table.
@ -1073,7 +1073,7 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
const char* id = GET_TASKID(pTaskInfo);
@ -1100,7 +1100,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanBaseInfo->dataReader = NULL;
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
walReaderVerifyOffset(pWalReader, pOffset);
if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
@ -1158,8 +1158,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanInfo->scanTimes = 0;
if (pScanBaseInfo->dataReader == NULL) {
int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL);
int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(
pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, pScanInfo->pResBlock,
(void**)&pScanBaseInfo->dataReader, id, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code;
@ -1217,8 +1218,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTableListInfo);
pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL,
false, NULL);
pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL,
(void**)&pInfo->dataReader, NULL, false, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
@ -1276,7 +1277,7 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = tinfo;
SArray* plist = getTableListInfo(pTaskInfo);
SArray* plist = getTableListInfo(pTaskInfo);
// only extract table in the first elements
STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
@ -1284,7 +1285,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
int32_t numOfTables = tableListGetSize(pTableListInfo);
for(int32_t i = 0; i < numOfTables; ++i) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
taosArrayPush(pUidList, &pKeyInfo->uid);
}

View File

@ -0,0 +1,19 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_BACKEDN_SNAPSHOT_H_
#define _STREAM_BACKEDN_SNAPSHOT_H_
#include "tcommon.h"
#endif

View File

@ -0,0 +1,74 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamSnapshot.h"
#include "rocksdb/c.h"
#include "tcommon.h"
struct SStreamSnapHandle {
void* handle;
SArray* fileList;
};
struct SStreamSnapReader {
void* pMeta;
int64_t sver;
int64_t ever;
};
// SMetaSnapWriter ========================================
struct StreamSnapWriter {
void* pMeta;
int64_t sver;
int64_t ever;
};
void streamSnapHandleInit(SStreamSnapHandle* handle) {
// impl later
handle->fileList = taosArrayInit(32, sizeof(void*));
return;
}
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
for (int i = 0; handle && i < taosArrayGetSize(handle->fileList); i++) {
taosMemoryFree(taosArrayGetP(handle->fileList, i));
}
return;
}
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, void** ppReader) {
// impl later
rocksdb_t* db = NULL;
return 0;
}
int32_t streamSnapReaderClose(void** ppReader) {
// impl later
return 0;
}
int32_t streamSnapRead(void* pReader, uint8_t** ppData) {
// impl later
return 0;
}
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, void** ppWriter) {
// impl later
return 0;
}
int32_t streamSnapWrite(void* pWriter, uint8_t* pData, uint32_t nData) {
// impl later
return 0;
}
int32_t streamSnapWriterClose(void** ppWriter, int8_t rollback) { return 0; }