Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy

This commit is contained in:
Xiaoyu Wang 2022-07-30 15:30:33 +08:00
commit 67c0c7e473
151 changed files with 3118 additions and 3062 deletions

12
.gitmodules vendored
View File

@ -1,12 +0,0 @@
[submodule "src/connector/go"]
path = src/connector/go
url = git@github.com:taosdata/driver-go.git
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = git@github.com:taosdata/hivemq-tdengine-extension.git
[submodule "deps/TSZ"]
path = deps/TSZ
url = https://github.com/taosdata/TSZ.git
[submodule "examples/rust"]
path = examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git

View File

@ -118,6 +118,7 @@ def pre_test(){
git rm --cached tools/taos-tools 2>/dev/null || : git rm --cached tools/taos-tools 2>/dev/null || :
git rm --cached tools/taosadapter 2>/dev/null || : git rm --cached tools/taosadapter 2>/dev/null || :
git rm --cached tools/taosws-rs 2>/dev/null || : git rm --cached tools/taosws-rs 2>/dev/null || :
git rm --cached examples/rust 2>/dev/null || :
''' '''
sh ''' sh '''
cd ${WKC} cd ${WKC}
@ -269,6 +270,7 @@ def pre_test_win(){
git rm --cached tools/taos-tools 2>nul git rm --cached tools/taos-tools 2>nul
git rm --cached tools/taosadapter 2>nul git rm --cached tools/taosadapter 2>nul
git rm --cached tools/taosws-rs 2>nul git rm --cached tools/taosws-rs 2>nul
git rm --cached examples/rust 2>nul
exit 0 exit 0
''' '''
bat ''' bat '''

View File

@ -90,6 +90,12 @@ ELSE ()
ENDIF () ENDIF ()
ENDIF () ENDIF ()
option(
RUST_BINDINGS
"If build with rust-bindings"
ON
)
option( option(
JEMALLOC_ENABLED JEMALLOC_ENABLED
"If build with jemalloc" "If build with jemalloc"

View File

@ -0,0 +1,12 @@
# rust-bindings
ExternalProject_Add(rust-bindings
GIT_REPOSITORY https://github.com/songtianyi/tdengine-rust-bindings.git
GIT_TAG 7ed7a97
SOURCE_DIR "${TD_SOURCE_DIR}/examples/rust"
BINARY_DIR "${TD_SOURCE_DIR}/examples/rust"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,5 +1,5 @@
# zlib # taosadapter
ExternalProject_Add(taosadapter ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG df8678f GIT_TAG df8678f

View File

@ -1,8 +1,8 @@
# zlib # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG c529299 GIT_TAG 9dc2fec
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -1,5 +1,5 @@
# zlib # taosws-rs
ExternalProject_Add(taosws-rs ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 9de599d GIT_TAG 9de599d

View File

@ -105,6 +105,11 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE})
# rust-bindings
if(${RUST_BINDINGS})
cat("${TD_SUPPORT_DIR}/rust-bindings_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${RUST_BINDINGS})
# lucene # lucene
if(${BUILD_WITH_LUCENE}) if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -136,6 +141,24 @@ execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
execute_process(COMMAND "${CMAKE_COMMAND}" --build . execute_process(COMMAND "${CMAKE_COMMAND}" --build .
WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download") WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
# clear submodule
execute_process(COMMAND git submodule deinit -f tools/taos-tools
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached tools/taos-tools
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git submodule deinit -f tools/taosadapter
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached tools/taosadapter
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git submodule deinit -f tools/taosws-rs
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached tools/taosws-rs
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git submodule deinit -f examples/rust
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
execute_process(COMMAND git rm --cached examples/rust
WORKING_DIRECTORY "${TD_SOURCE_DIR}")
# ================================================================================================ # ================================================================================================
# Build # Build
# ================================================================================================ # ================================================================================================

@ -1 +0,0 @@
Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12

View File

@ -155,7 +155,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);

View File

@ -16,65 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tdatablock.h" #include "tdatablock.h"
#include "tcompare.h" #include "tcompare.h"
#include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "tname.h" #include "tname.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
strcpy(pEp->fqdn, ep);
char* temp = strchr(pEp->fqdn, ':');
if (temp) {
*temp = 0;
pEp->port = atoi(temp + 1);
}
if (pEp->port == 0) {
pEp->port = tsServerPort;
}
return 0;
}
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port) {
if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) {
return;
}
int32_t index = pEpSet->numOfEps;
tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn));
pEpSet->eps[index].port = port;
pEpSet->numOfEps += 1;
}
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
taosCorBeginWrite(&pEpSet->version);
pEpSet->epSet = *pNewEpSet;
taosCorEndWrite(&pEpSet->version);
}
SEpSet getEpSet_s(SCorEpSet* pEpSet) {
SEpSet ep = {0};
taosCorBeginRead(&pEpSet->version);
ep = pEpSet->epSet;
taosCorEndRead(&pEpSet->version);
return ep;
}
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
ASSERT(pColumnInfoData != NULL); ASSERT(pColumnInfoData != NULL);
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
@ -1713,8 +1657,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type, printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag,
pDataBlock->info.childId, pDataBlock->info.groupId); pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag); printf("%s |", flag);
for (int32_t k = 0; k < numOfCols; k++) { for (int32_t k = 0; k < numOfCols; k++) {

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tdatablock.h"
#include "tglobal.h"
#include "tlog.h"
#include "tname.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
strcpy(pEp->fqdn, ep);
char* temp = strchr(pEp->fqdn, ':');
if (temp) {
*temp = 0;
pEp->port = atoi(temp + 1);
}
if (pEp->port == 0) {
pEp->port = tsServerPort;
}
return 0;
}
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port) {
if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) {
return;
}
int32_t index = pEpSet->numOfEps;
tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn));
pEpSet->eps[index].port = port;
pEpSet->numOfEps += 1;
}
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
taosCorBeginWrite(&pEpSet->version);
pEpSet->epSet = *pNewEpSet;
taosCorEndWrite(&pEpSet->version);
}
SEpSet getEpSet_s(SCorEpSet* pEpSet) {
SEpSet ep = {0};
taosCorBeginRead(&pEpSet->version);
ep = pEpSet->epSet;
taosCorEndRead(&pEpSet->version);
return ep;
}

View File

@ -20,34 +20,6 @@
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) #define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; }
#if 0
// TODO refactor
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0 || src == NULL) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = taosMemoryCalloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) taosMemoryCalloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == 0 && src->upperRelOptr == 0));
return pFilter;
}
#endif
#if 0 #if 0
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) { if (slidingTime == 0) {

View File

@ -31,6 +31,7 @@ target_sources(
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
"src/sma/smaCommit.c" "src/sma/smaCommit.c"
"src/sma/smaRollup.c" "src/sma/smaRollup.c"
"src/sma/smaSnapshot.c"
"src/sma/smaTimeRange.c" "src/sma/smaTimeRange.c"
# tsdb # tsdb

View File

@ -209,6 +209,9 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen,
// smaFileUtil ================ // smaFileUtil ================
typedef struct SQTaskFReader SQTaskFReader;
typedef struct SQTaskFWriter SQTaskFWriter;
#define TD_FILE_HEAD_SIZE 512 #define TD_FILE_HEAD_SIZE 512
typedef struct STFInfo STFInfo; typedef struct STFInfo STFInfo;

View File

@ -97,7 +97,6 @@ int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow);
// TABLEID // TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY // TSDBKEY
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2)) #define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2)) #define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
// SBlockCol // SBlockCol
@ -558,6 +557,26 @@ struct STsdbReadSnap {
STsdbFS fs; STsdbFS fs;
}; };
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
if (pKey1->ts < pKey2->ts) {
return -1;
} else if (pKey1->ts > pKey2->ts) {
return 1;
}
if (pKey1->version < pKey2->version) {
return -1;
} else if (pKey1->version > pKey2->version) {
return 1;
}
return 0;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -62,6 +62,8 @@ typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct SMetaSnapWriter SMetaSnapWriter;
typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapReader STsdbSnapReader;
typedef struct STsdbSnapWriter STsdbSnapWriter; typedef struct STsdbSnapWriter STsdbSnapWriter;
typedef struct SRsmaSnapReader SRsmaSnapReader;
typedef struct SRsmaSnapWriter SRsmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SSnapDataHdr SSnapDataHdr;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
@ -196,13 +198,21 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback); int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader); int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader);
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ======================================== // STsdbSnapWriter ========================================
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// SRsmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader);
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData);
// SRsmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback);
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
@ -314,6 +324,15 @@ struct SSma {
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
enum {
SNAP_DATA_META = 0,
SNAP_DATA_TSDB = 1,
SNAP_DATA_DEL = 2,
SNAP_DATA_RSMA1 = 3,
SNAP_DATA_RSMA2 = 4,
SNAP_DATA_QTASK = 5,
};
struct SSnapDataHdr { struct SSnapDataHdr {
int8_t type; int8_t type;
int64_t index; int64_t index;

View File

@ -183,11 +183,11 @@ int metaClose(SMeta *pMeta) {
int32_t metaRLock(SMeta *pMeta) { int32_t metaRLock(SMeta *pMeta) {
int32_t ret = 0; int32_t ret = 0;
metaDebug("meta rlock %p B", &pMeta->lock); metaTrace("meta rlock %p B", &pMeta->lock);
ret = taosThreadRwlockRdlock(&pMeta->lock); ret = taosThreadRwlockRdlock(&pMeta->lock);
metaDebug("meta rlock %p E", &pMeta->lock); metaTrace("meta rlock %p E", &pMeta->lock);
return ret; return ret;
} }
@ -195,11 +195,11 @@ int32_t metaRLock(SMeta *pMeta) {
int32_t metaWLock(SMeta *pMeta) { int32_t metaWLock(SMeta *pMeta) {
int32_t ret = 0; int32_t ret = 0;
metaDebug("meta wlock %p B", &pMeta->lock); metaTrace("meta wlock %p B", &pMeta->lock);
ret = taosThreadRwlockWrlock(&pMeta->lock); ret = taosThreadRwlockWrlock(&pMeta->lock);
metaDebug("meta wlock %p E", &pMeta->lock); metaTrace("meta wlock %p E", &pMeta->lock);
return ret; return ret;
} }
@ -207,11 +207,11 @@ int32_t metaWLock(SMeta *pMeta) {
int32_t metaULock(SMeta *pMeta) { int32_t metaULock(SMeta *pMeta) {
int32_t ret = 0; int32_t ret = 0;
metaDebug("meta ulock %p B", &pMeta->lock); metaTrace("meta ulock %p B", &pMeta->lock);
ret = taosThreadRwlockUnlock(&pMeta->lock); ret = taosThreadRwlockUnlock(&pMeta->lock);
metaDebug("meta ulock %p E", &pMeta->lock); metaTrace("meta ulock %p E", &pMeta->lock);
return ret; return ret;
} }

View File

@ -109,7 +109,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 0; // TODO: use macro pHdr->type = SNAP_DATA_META;
pHdr->size = nData; pHdr->size = nData;
memcpy(pHdr->data, pData, nData); memcpy(pHdr->data, pData, nData);

View File

@ -49,7 +49,8 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) { if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]); code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2,
&pReader->pDataReader[i]);
if (code < 0) { if (code < 0) {
goto _err; goto _err;
} }
@ -221,10 +222,9 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
} }
} }
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
return code; return code;
_err: _err:
@ -245,15 +245,17 @@ int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData); code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
} else if (pHdr->type == SNAP_DATA_QTASK) { } else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else {
ASSERT(0);
} }
if (code < 0) goto _err; if (code < 0) goto _err;
_exit: _exit:
smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); smaInfo("vgId:%d rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
return code; return code;
_err: _err:
smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, smaError("vgId:%d rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
tstrerror(code)); tstrerror(code));
return code; return code;
} }

View File

@ -307,7 +307,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0}; fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
fSma = *pRSet->pSmaF; fSma = *pRSet->pSmaF;
} else { } else {
wSet.diskId = (SDiskID){.level = 0, .id = 0}; SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
wSet.diskId = did;
wSet.fid = pCommitter->commitFid; wSet.fid = pCommitter->commitFid;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0}; fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0}; fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0};

View File

@ -21,6 +21,7 @@ struct STsdbSnapReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
STsdbFS fs; STsdbFS fs;
int8_t type;
// for data file // for data file
int8_t dataDone; int8_t dataDone;
int32_t fid; int32_t fid;
@ -62,7 +63,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iBlockIdx = 0; pReader->iBlockIdx = 0;
pReader->pBlockIdx = NULL; pReader->pBlockIdx = NULL;
tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid); tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
pReader->fid);
} }
while (true) { while (true) {
@ -130,7 +132,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 1; pHdr->type = pReader->type;
pHdr->size = size; pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]); TABLEID* pId = (TABLEID*)(&pHdr[1]);
@ -139,9 +141,9 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData); tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);
tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64 tsdbInfo("vgId:%d vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
" iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d", " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid, TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow, pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
size); size);
@ -154,7 +156,8 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb read data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
@ -212,7 +215,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = 2; pHdr->type = SNAP_DATA_DEL;
pHdr->size = size; pHdr->size = size;
TABLEID* pId = (TABLEID*)(&pHdr[1]); TABLEID* pId = (TABLEID*)(&pHdr[1]);
@ -228,8 +231,8 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
n += tPutDelData((*ppData) + n, pDelData); n += tPutDelData((*ppData) + n, pDelData);
} }
tsdbInfo("vgId:%d vnode snapshot tsdb read del data, suid:%" PRId64 " uid:%d" PRId64 " size:%d", tsdbInfo("vgId:%d vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%d" PRId64 " size:%d",
TD_VID(pTsdb->pVnode), pDelIdx->suid, pDelIdx->uid, size); TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);
break; break;
} }
@ -238,11 +241,12 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb read del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->pVnode,
tstrerror(code));
return code; return code;
} }
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader) { int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
STsdbSnapReader* pReader = NULL; STsdbSnapReader* pReader = NULL;
@ -255,6 +259,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
pReader->pTsdb = pTsdb; pReader->pTsdb = pTsdb;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
pReader->type = type;
code = taosThreadRwlockRdlock(&pTsdb->rwLock); code = taosThreadRwlockRdlock(&pTsdb->rwLock);
if (code) { if (code) {
@ -297,12 +302,13 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
goto _err; goto _err;
} }
tsdbInfo("vgId:%d vnode snapshot tsdb reader opened", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
*ppReader = pReader; *ppReader = pReader;
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb reader open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
@ -327,7 +333,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
tsdbFSUnref(pReader->pTsdb, &pReader->fs); tsdbFSUnref(pReader->pTsdb, &pReader->fs);
tsdbInfo("vgId:%d vnode snapshot tsdb reader closed", TD_VID(pReader->pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
taosMemoryFree(pReader); taosMemoryFree(pReader);
*ppReader = NULL; *ppReader = NULL;
@ -368,10 +374,12 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
_exit: _exit:
tsdbDebug("vgId:%d vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb read failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode),
pReader->pTsdb->path, tstrerror(code));
return code; return code;
} }
@ -436,7 +444,8 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData,
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot write append data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
@ -522,9 +531,12 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
@ -570,6 +582,8 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
@ -708,8 +722,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write table data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code)); pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
@ -794,11 +808,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
if (code) goto _err; if (code) goto _err;
_exit: _exit:
tsdbDebug("vgId:%d vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write data impl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code)); pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
@ -833,11 +848,12 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
@ -920,12 +936,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbSnapWriteTableData(pWriter, id); code = tsdbSnapWriteTableData(pWriter, id);
if (code) goto _err; if (code) goto _err;
tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", tsdbInfo("vgId:%d vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow); TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
@ -1015,7 +1032,8 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
@ -1056,11 +1074,12 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
} }
_exit: _exit:
tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
return code; return code;
} }
@ -1127,10 +1146,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
} }
*ppWriter = pWriter; *ppWriter = pWriter;
return code;
tsdbInfo("vgId:%d tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshot writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code));
*ppWriter = NULL; *ppWriter = NULL;
return code; return code;
} }
@ -1157,14 +1178,16 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
if (code) goto _err; if (code) goto _err;
} }
tsdbInfo("vgId:%d vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
return code; return code;
_err: _err:
tsdbError("vgId:%d vnode snapshot tsdb writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
tstrerror(code)); pWriter->pTsdb->path, tstrerror(code));
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code; return code;
} }
@ -1173,7 +1196,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// ts data // ts data
if (pHdr->type == 1) { if (pHdr->type == SNAP_DATA_TSDB) {
code = tsdbSnapWriteData(pWriter, pData, nData); code = tsdbSnapWriteData(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
@ -1186,15 +1209,17 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
} }
// del data // del data
if (pHdr->type == 2) { if (pHdr->type == SNAP_DATA_DEL) {
code = tsdbSnapWriteDel(pWriter, pData, nData); code = tsdbSnapWriteDel(pWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} }
_exit: _exit:
tsdbDebug("vgId:%d tsdb snapshow write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb snapshow write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
tstrerror(code));
return code; return code;
} }

View File

@ -151,26 +151,6 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
// TSDBKEY =======================================================================
int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
if (pKey1->ts < pKey2->ts) {
return -1;
} else if (pKey1->ts > pKey2->ts) {
return 1;
}
if (pKey1->version < pKey2->version) {
return -1;
} else if (pKey1->version > pKey2->version) {
return 1;
}
return 0;
}
// TSDBKEY ====================================================== // TSDBKEY ======================================================
static FORCE_INLINE int32_t tPutTSDBKEY(uint8_t *p, TSDBKEY *pKey) { static FORCE_INLINE int32_t tPutTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
int32_t n = 0; int32_t n = 0;

View File

@ -28,7 +28,8 @@ struct SVSnapReader {
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
// rsma // rsma
int8_t rsmaDone[TSDB_RETENTION_L2]; int8_t rsmaDone;
SRsmaSnapReader *pRsmaReader;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
@ -57,6 +58,10 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0; int32_t code = 0;
if (pReader->pRsmaReader) {
rsmaSnapReaderClose(&pReader->pRsmaReader);
}
if (pReader->pTsdbReader) { if (pReader->pTsdbReader) {
tsdbSnapReaderClose(&pReader->pTsdbReader); tsdbSnapReaderClose(&pReader->pTsdbReader);
} }
@ -99,7 +104,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
// open if not // open if not
if (pReader->pTsdbReader == NULL) { if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, &pReader->pTsdbReader); code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, &pReader->pTsdbReader);
if (code) goto _err; if (code) goto _err;
} }
@ -118,23 +123,14 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
// RSMA ============== // RSMA ==============
#if 0 if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
if (VND_IS_RSMA(pReader->pVnode)) { // open if not
// RSMA1/RSMA2 if (pReader->pRsmaReader == NULL) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { code = rsmaSnapReaderOpen(pReader->pVnode->pSma, pReader->sver, pReader->ever, &pReader->pRsmaReader);
if (!pReader->rsmaDone[i]) {
if (!pReader->pVnode->pSma->pRSmaTsdb[i]) {
// no valid tsdb
pReader->rsmaDone[i] = 1;
continue;
}
if (pReader->pTsdbReader == NULL) {
code = tsdbSnapReaderOpen(pReader->pVnode->pSma->pRSmaTsdb[i], pReader->sver, pReader->ever,
&pReader->pTsdbReader);
if (code) goto _err; if (code) goto _err;
} }
code = tsdbSnapRead(pReader->pTsdbReader, ppData); code = rsmaSnapRead(pReader->pRsmaReader, ppData);
if (code) { if (code) {
goto _err; goto _err;
} else { } else {
@ -142,16 +138,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
goto _exit; goto _exit;
} else { } else {
pReader->tsdbDone = 1; pReader->tsdbDone = 1;
code = tsdbSnapReaderClose(&pReader->pTsdbReader); code = rsmaSnapReaderClose(&pReader->pRsmaReader);
if (code) goto _err; if (code) goto _err;
} }
} }
} }
}
// QTaskInfoFile
// TODO ...
}
#endif
*ppData = NULL; *ppData = NULL;
*nData = 0; *nData = 0;
@ -186,6 +177,8 @@ struct SVSnapWriter {
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
STsdbSnapWriter *pTsdbSnapWriter; STsdbSnapWriter *pTsdbSnapWriter;
// rsma
SRsmaSnapWriter *pRsmaSnapWriter;
}; };
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
@ -235,6 +228,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _err; if (code) goto _err;
} }
if (pWriter->pRsmaSnapWriter) {
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
if (code) goto _err;
}
if (!rollback) { if (!rollback) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
@ -282,9 +280,9 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index, vInfo("vgId:%d vnode snapshot write data, index:%" PRId64 " type:%d nData:%d", TD_VID(pVnode), pHdr->index,
pHdr->type, nData); pHdr->type, nData);
if (pHdr->type == 0) { switch (pHdr->type) {
case SNAP_DATA_META: {
// meta // meta
if (pWriter->pMetaSnapWriter == NULL) { if (pWriter->pMetaSnapWriter == NULL) {
code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter); code = metaSnapWriterOpen(pVnode->pMeta, pWriter->sver, pWriter->ever, &pWriter->pMetaSnapWriter);
if (code) goto _err; if (code) goto _err;
@ -292,9 +290,9 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} else { } break;
case SNAP_DATA_TSDB: {
// tsdb // tsdb
if (pWriter->pTsdbSnapWriter == NULL) { if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
if (code) goto _err; if (code) goto _err;
@ -302,8 +300,31 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} break;
case SNAP_DATA_RSMA1:
case SNAP_DATA_RSMA2: {
// rsma1/rsma2
if (pWriter->pRsmaSnapWriter == NULL) {
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
if (code) goto _err;
} }
code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_QTASK: {
// qtask for rsma
if (pWriter->pRsmaSnapWriter == NULL) {
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter);
if (code) goto _err;
}
code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData);
if (code) goto _err;
} break;
default:
break;
}
_exit: _exit:
return code; return code;

View File

@ -447,6 +447,7 @@ _err:
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SEncoder encoder = {0};
int32_t rcode = 0; int32_t rcode = 0;
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq; SVCreateTbReq *pCreateReq;
@ -515,7 +516,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
tdUidStoreFree(pStore); tdUidStoreFree(pStore);
// prepare rsp // prepare rsp
SEncoder encoder = {0};
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);

View File

@ -679,6 +679,8 @@ void ctgClearSubTaskRes(SCtgSubRes *pRes);
void ctgFreeQNode(SCtgQNode *node); void ctgFreeQNode(SCtgQNode *node);
void ctgClearHandle(SCatalog* pCtg); void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache *pCache); void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;

View File

@ -92,7 +92,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx*
int32_t code = 0; int32_t code = 0;
if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) { if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) {
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo)); CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo));
} }
STableMetaOutput moutput = {0}; STableMetaOutput moutput = {0};
@ -337,7 +337,10 @@ int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName,
} }
STableMeta* pMeta = NULL; STableMeta* pMeta = NULL;
CTG_ERR_RET(catalogGetTableMeta(pCtg, pConn, pTableName, &pMeta)); SCtgTbMetaCtx ctx = {0};
ctx.pName = (SName*)pTableName;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
CTG_ERR_RET(ctgGetTbMeta(pCtg, pConn, &ctx, &pMeta));
*tbType = pMeta->tableType; *tbType = pMeta->tableType;
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
@ -391,7 +394,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName,
CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL)); CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL));
} else { } else {
SVgroupInfo vgroupInfo = {0}; SVgroupInfo vgroupInfo = {0};
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pConn, pTableName, &vgroupInfo)); CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo));
CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL)); CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL));
} }
@ -477,6 +480,57 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) {
if (IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return:
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
taosMemoryFreeClear(vgInfo);
}
CTG_RET(code);
}
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) {
int32_t code = 0;
if (NULL == pCtg || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCtg->dbCache) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, pTableName, true));
_return:
CTG_RET(code);
}
int32_t catalogInit(SCatalogCfg* cfg) { int32_t catalogInit(SCatalogCfg* cfg) {
if (gCtgMgmt.pCluster) { if (gCtgMgmt.pCluster) {
qError("catalog already initialized"); qError("catalog already initialized");
@ -772,21 +826,7 @@ _return:
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
CTG_API_ENTER(); CTG_API_ENTER();
int32_t code = 0; CTG_API_LEAVE(ctgRemoveTbMeta(pCtg, pTableName));
if (NULL == pCtg || NULL == pTableName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCtg->dbCache) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, pTableName, true));
_return:
CTG_API_LEAVE(code);
} }
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) { int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
@ -878,12 +918,12 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo *pConn, SArray*
case TSDB_CHILD_TABLE: { case TSDB_CHILD_TABLE: {
SName stb = name; SName stb = name;
strcpy(stb.tname, stbName); strcpy(stb.tname, stbName);
catalogRemoveTableMeta(pCtg, &stb); ctgRemoveTbMeta(pCtg, &stb);
break; break;
} }
case TSDB_SUPER_TABLE: case TSDB_SUPER_TABLE:
case TSDB_NORMAL_TABLE: case TSDB_NORMAL_TABLE:
catalogRemoveTableMeta(pCtg, &name); ctgRemoveTbMeta(pCtg, &name);
break; break;
default: default:
ctgError("ignore table type %d", tbType); ctgError("ignore table type %d", tbType);
@ -947,34 +987,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) {
CTG_API_ENTER(); CTG_API_ENTER();
if (IS_SYS_DBNAME(pTableName->dbname)) { CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup));
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return:
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
taosMemoryFreeClear(vgInfo);
}
CTG_API_LEAVE(code);
} }
int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, SMetaData* pRsp) { int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, SMetaData* pRsp) {
@ -1200,7 +1213,7 @@ int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const
} }
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(catalogRemoveTableMeta(pCtg, (SName*)pTableName)); CTG_ERR_JRET(ctgRemoveTbMeta(pCtg, (SName*)pTableName));
CTG_ERR_JRET(ctgGetTbCfg(pCtg, pConn, (SName*)pTableName, pCfg)); CTG_ERR_JRET(ctgGetTbCfg(pCtg, pConn, (SName*)pTableName, pCfg));

View File

@ -398,7 +398,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
SName* name = taosHashIterate(pTb, NULL); SName* name = taosHashIterate(pTb, NULL);
while (name) { while (name) {
catalogRemoveTableMeta(pCtg, name); ctgRemoveTbMeta(pCtg, name);
name = taosHashIterate(pTb, name); name = taosHashIterate(pTb, name);
} }

View File

@ -855,7 +855,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList); SArray* pColList);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
@ -987,8 +986,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model); const char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
int32_t* resNum);
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);

View File

@ -496,11 +496,9 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
doDestroyTask(pTaskInfo); doDestroyTask(pTaskInfo);
} }
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) { int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t capacity = 0; return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
} }
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {

View File

@ -4613,42 +4613,29 @@ void releaseQueryBuf(size_t numOfTables) {
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
} }
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
int32_t* resNum) { SExplainExecInfo execInfo = {0};
if (*resNum >= *capacity) { SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
*capacity += 10;
*pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo)); pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
if (NULL == *pRes) { pExplainInfo->startupCost = operatorInfo->cost.openCost;
qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo)); pExplainInfo->totalCost = operatorInfo->cost.totalCost;
return TSDB_CODE_QRY_OUT_OF_MEMORY; pExplainInfo->verboseLen = 0;
} pExplainInfo->verboseInfo = NULL;
}
SExplainExecInfo* pInfo = &(*pRes)[*resNum];
pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
pInfo->startupCost = operatorInfo->cost.openCost;
pInfo->totalCost = operatorInfo->cost.totalCost;
if (operatorInfo->fpSet.getExplainFn) { if (operatorInfo->fpSet.getExplainFn) {
int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen); int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
if (code) { if (code) {
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
return code; return code;
} }
} else {
pInfo->verboseLen = 0;
pInfo->verboseInfo = NULL;
} }
++(*resNum);
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) { for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum); code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
if (code) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(*pRes); // taosMemoryFreeClear(*pRes);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
} }

View File

@ -31,14 +31,21 @@ static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes, static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup); uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static void freeGroupKey(void* param) {
SGroupKeys* pKey = (SGroupKeys*) param;
taosMemoryFree(pKey->pData);
}
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->keyBuf); taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -414,8 +421,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby; // pOperator->operatorType = OP_Groupby;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;

View File

@ -2414,7 +2414,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "hyperloglog", .name = "hyperloglog",
.type = FUNCTION_TYPE_HYPERLOGLOG, .type = FUNCTION_TYPE_HYPERLOGLOG,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateHLL, .translateFunc = translateHLL,
.getEnvFunc = getHLLFuncEnv, .getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
@ -2428,7 +2428,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
}, },
{ {
.name = "_hyperloglog_partial", .name = "_hyperloglog_partial",
.type = FUNCTION_TYPE_HYPERLOGLOG_PARTIAL, .type = FUNCTION_TYPE_HYPERLOGLOG_PARTIAL | FUNC_MGT_TIMELINE_FUNC,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateHLLPartial, .translateFunc = translateHLLPartial,
.getEnvFunc = getHLLFuncEnv, .getEnvFunc = getHLLFuncEnv,
@ -2440,7 +2440,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
}, },
{ {
.name = "_hyperloglog_merge", .name = "_hyperloglog_merge",
.type = FUNCTION_TYPE_HYPERLOGLOG_MERGE, .type = FUNCTION_TYPE_HYPERLOGLOG_MERGE | FUNC_MGT_TIMELINE_FUNC,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateHLLMerge, .translateFunc = translateHLLMerge,
.getEnvFunc = getHLLFuncEnv, .getEnvFunc = getHLLFuncEnv,

View File

@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList);
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);

View File

@ -82,8 +82,9 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) {
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; SExplainExecInfo* pInfo = taosArrayGet(pExecList, 0);
SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
void * pRsp = rpcMallocCont(contLen); void * pRsp = rpcMallocCont(contLen);
@ -96,10 +97,9 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
.code = 0, .code = 0,
.info = *pConn, .info = *pConn,
}; };
rpcRsp.info.ahandle = NULL; rpcRsp.info.ahandle = NULL;
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -44,18 +44,24 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
static void freeItem(void* param) {
SExplainExecInfo* pInfo = param;
taosMemoryFree(pInfo->verboseInfo);
}
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t taskHandle = ctx->taskHandle; qTaskInfo_t taskHandle = ctx->taskHandle;
if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) {
if (ctx->explain) { if (ctx->explain) {
SExplainExecInfo *execInfo = NULL; SArray* execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
int32_t resNum = 0; QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo));
SRpcHandleInfo connInfo = ctx->ctrlConnInfo; SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL; connInfo.ahandle = NULL;
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum)); int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeItem);
QW_ERR_RET(code);
} }
if (!ctx->needFetch) { if (!ctx->needFetch) {

View File

@ -790,65 +790,6 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} }
} while (0); } while (0);
#if 0
// fake match
//
// condition1:
// I have snapshot, no log, preIndex > myLastIndex
//
// condition2:
// I have snapshot, have log, log <= snapshot, preIndex > myLastIndex
//
// condition3:
// I have snapshot, preIndex < snapshot.lastApplyIndex
//
// condition4:
// I have snapshot, preIndex == snapshot.lastApplyIndex, no data
//
// operation:
// match snapshot.lastApplyIndex - 1;
// no operation on log
do {
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths);
bool condition1 =
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex); // donot use syncLogEntryCount!!! use isEmpty
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
(pMsg->prevLogIndex > myLastIndex);
bool condition3 = condition0 && (pMsg->prevLogIndex < snapshot.lastApplyIndex);
bool condition4 = condition0 && (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->dataLen == 0);
bool condition = condition1 || condition2 || condition3 || condition4;
if (condition) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, fake match, pre-index:%" PRId64 ", pre-term:%" PRIu64,
pMsg->prevLogIndex, pMsg->prevLogTerm);
syncNodeEventLog(ths, logBuf);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true;
pReply->matchIndex = snapshot.lastApplyIndex;
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
} while (0);
#endif
// fake match // fake match
// //
// condition1: // condition1:

View File

@ -283,12 +283,14 @@ print ================== server restart completed
sql connect sql connect
sql use first_db0; sql use first_db0;
sql select last(*), tbname from m1 group by tbname; sql select last(*), tbname from m1 group by tbname order by tbname;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @20-03-01 01:01:01.000@ then if $data00 != @20-03-01 01:01:01.000@ then
print data00 $data00 != 20-03-01 01:01:01.000@
return -1 return -1
endi endi

View File

@ -47,7 +47,7 @@ endi
$replica = 3 $replica = 3
$vgroups = 1 $vgroups = 1
$retentions = 5s:7d,15s:21d $retentions = 5s:7d,15s:21d,1m:365d
print ============= create database print ============= create database
sql create database db replica $replica vgroups $vgroups retentions $retentions sql create database db replica $replica vgroups $vgroups retentions $retentions
@ -114,7 +114,7 @@ endi
vg_ready: vg_ready:
print ====> create stable/child table print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
@ -129,20 +129,28 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
sleep 3000 sleep 3000
print ===> write 100 records print ===> write 0-50 records
$N = 100 $ms = 0
$count = 0 $cnt = 0
while $count < $N while $cnt < 50
$ms = 1659000000000 + $count $ms = $cnt . m
sql insert into ct1 values( $ms , $count , 2.1, 3.1) sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1)
$count = $count + 1 $cnt = $cnt + 1
endw
print ===> flush database db
sql flush database db;
sleep 5000
print ===> write 51-100 records
while $cnt < 100
$ms = $cnt . m
sql insert into ct1 values (now + $ms , $cnt , 2.1, 3.1)
$cnt = $cnt + 1
endw endw
print ===> flush database db
#sql flush database db; sql flush database db;
sleep 5000
sleep 3000
print ===> stop dnode1 dnode2 dnode3 print ===> stop dnode1 dnode2 dnode3
@ -150,8 +158,6 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 10000
######################################################## ########################################################
print ===> start dnode1 dnode2 dnode3 dnode4 print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
@ -164,7 +170,7 @@ sleep 3000
print =============== query data print =============== query data
sql connect sql connect
sql use db sql use db
sql select * from ct1 sql select * from ct1 where ts > now - 1d
print rows: $rows print rows: $rows
print $data00 $data01 $data02 print $data00 $data01 $data02
if $rows != 100 then if $rows != 100 then

View File

@ -213,7 +213,7 @@ class TDTestCase:
tdSql.error("select irate(c1), abs(c1) from ct4 ") tdSql.error("select irate(c1), abs(c1) from ct4 ")
# agg functions mix with agg functions # agg functions mix with agg functions
tdSql.query("select irate(c1), count(c5) from stb1 partition by tbname ") tdSql.query("select irate(c1), count(c5) from stb1 partition by tbname order by tbname")
tdSql.checkData(0, 0, 0.000000000) tdSql.checkData(0, 0, 0.000000000)
tdSql.checkData(1, 0, 0.000000000) tdSql.checkData(1, 0, 0.000000000)
tdSql.checkData(0, 1, 13) tdSql.checkData(0, 1, 13)

View File

@ -348,16 +348,16 @@ int sml_16384_Test() {
pRes = taos_query(taos, "use sml_db"); pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes); taos_free_result(pRes);
TAOS_RES *res = taos_schemaless_insert(taos, (char **)sql, 1, TSDB_SML_LINE_PROTOCOL, 0); pRes = taos_schemaless_insert(taos, (char **)sql, 1, TSDB_SML_LINE_PROTOCOL, 0);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes); int code = taos_errno(pRes);
taos_free_result(res); taos_free_result(pRes);
if(code) return code; if(code) return code;
const char *sql1[] = { const char *sql1[] = {
"qelhxo,id=pnnqhsa,t0=t,t1=127i8 c0=f,c1=127i8,c11=L\"ncharColValue\",c10=t 1626006833639000000", "qelhxo,id=pnnqhsa,t0=t,t1=127i8 c0=f,c1=127i8,c11=L\"ncharColValue\",c10=t 1626006833639000000",
}; };
TAOS_RES *res1 = taos_schemaless_insert(taos, (char **)sql1, 1, TSDB_SML_LINE_PROTOCOL, 0); pRes = taos_schemaless_insert(taos, (char **)sql1, 1, TSDB_SML_LINE_PROTOCOL, 0);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes); code = taos_errno(pRes);
taos_free_result(pRes); taos_free_result(pRes);