Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
This commit is contained in:
commit
faadf5d3a2
|
@ -120,8 +120,14 @@ ELSE ()
|
|||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
|
||||
MESSAGE(STATUS "Compile with Address Sanitizer!")
|
||||
ELSE ()
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||
MESSAGE(STATUS "XXXXXXXXXXXXXX Clang/AppleClang" ${TD_DARWIN})
|
||||
IF (${TD_DARWIN})
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k")
|
||||
ELSE ()
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||
ENDIF ()
|
||||
ENDIF ()
|
||||
|
||||
# disable all assert
|
||||
|
|
|
@ -109,7 +109,7 @@ option(
|
|||
option(
|
||||
BUILD_WITH_ROCKSDB
|
||||
"If build with rocksdb"
|
||||
OFF
|
||||
ON
|
||||
)
|
||||
|
||||
option(
|
||||
|
|
|
@ -223,17 +223,31 @@ endif(${BUILD_WITH_LEVELDB})
|
|||
# rocksdb
|
||||
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
|
||||
if(${BUILD_WITH_ROCKSDB})
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
|
||||
#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
|
||||
option(WITH_TESTS "" OFF)
|
||||
option(WITH_BENCHMARK_TOOLS "" OFF)
|
||||
option(WITH_TOOLS "" OFF)
|
||||
option(WITH_LIBURING "" OFF)
|
||||
option(WITH_IOSTATS_CONTEXT "" OFF)
|
||||
option(WITH_PERF_CONTEXT "" OFF)
|
||||
option(FAIL_ON_WARNINGS "" OFF)
|
||||
#option(WITH_JEMALLOC "" ON)
|
||||
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
|
||||
IF (${TD_WINDOWS})
|
||||
option(WITH_MD_LIBRARY "build with MD" OFF)
|
||||
set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
|
||||
endif(${TD_WINDOWS})
|
||||
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
|
||||
target_include_directories(
|
||||
rocksdb
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include>
|
||||
)
|
||||
IF (${TD_DARWIN})
|
||||
target_compile_options(
|
||||
rocksdb
|
||||
PRIVATE -Wno-unused-private-field
|
||||
)
|
||||
endif(${TD_DARWIN})
|
||||
endif(${BUILD_WITH_ROCKSDB})
|
||||
|
||||
# lucene
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
message("contrib test/rocksdb:" ${BUILD_DEPENDENCY_TESTS})
|
||||
|
||||
add_executable(rocksdbTest "")
|
||||
target_sources(rocksdbTest
|
||||
PRIVATE
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/main.c"
|
||||
)
|
||||
target_link_libraries(rocksdbTest rocksdb)
|
||||
target_link_libraries(rocksdbTest rocksdb)
|
||||
|
|
|
@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) {
|
|||
|
||||
// Read
|
||||
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
|
||||
rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db));
|
||||
//rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db));
|
||||
char buf[256] = {0};
|
||||
size_t vallen = 0;
|
||||
char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
|
||||
printf("val:%s\n", val);
|
||||
snprintf(buf, vallen+5, "val:%s", val);
|
||||
printf("%ld %ld %s\n", strlen(val), vallen, buf);
|
||||
|
||||
// Update
|
||||
// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err);
|
||||
|
@ -43,4 +45,4 @@ int main(int argc, char const *argv[]) {
|
|||
rocksdb_close(db);
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ typedef struct SMergeLogicNode {
|
|||
int32_t numOfChannels;
|
||||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
} SMergeLogicNode;
|
||||
|
||||
typedef enum EWindowType {
|
||||
|
@ -444,6 +445,7 @@ typedef struct SMergePhysiNode {
|
|||
int32_t numOfChannels;
|
||||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
} SMergePhysiNode;
|
||||
|
||||
typedef struct SWinodwPhysiNode {
|
||||
|
|
|
@ -1342,6 +1342,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
|
||||
if (code) {
|
||||
|
@ -1368,7 +1370,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
tsem_wait(&pParam->sem);
|
||||
|
||||
_return:
|
||||
taosArrayDestroy(catalogReq.pTableMeta);
|
||||
taosArrayDestroyEx(catalogReq.pTableMeta, destoryTablesReq);
|
||||
destroyRequest(pRequest);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1219,6 +1219,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
|
|||
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
|
||||
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
||||
if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
taosHashCleanup(kvHash);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
@ -1292,12 +1293,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
|||
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat,
|
||||
info->lineNum);
|
||||
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
|
||||
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
||||
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
|
||||
if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
return terrno;
|
||||
}
|
||||
smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
|
||||
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
||||
}
|
||||
}
|
||||
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
||||
|
|
|
@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
|||
}
|
||||
|
||||
if (pIter->pRow->flag == HAS_NULL) {
|
||||
pIter->cv = COL_VAL_NULL(pTColumn->type, pTColumn->colId);
|
||||
pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -2439,7 +2439,7 @@ _exit:
|
|||
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
|
||||
char *data) {
|
||||
int32_t code = 0;
|
||||
if(data == NULL){
|
||||
if (data == NULL) {
|
||||
for (int32_t i = 0; i < nRows; ++i) {
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
|
||||
}
|
||||
|
@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
|
|||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
if(ASSERT(varDataTLen(data + offset) <= bytes)){
|
||||
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes);
|
||||
if (ASSERT(varDataTLen(data + offset) <= bytes)) {
|
||||
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
|
||||
bytes);
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
goto _exit;
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@ target_include_directories(
|
|||
PUBLIC "inc"
|
||||
PUBLIC "src/inc"
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
||||
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
|
||||
)
|
||||
target_link_libraries(
|
||||
vnode
|
||||
|
@ -112,6 +113,7 @@ target_link_libraries(
|
|||
|
||||
# PUBLIC bdb
|
||||
# PUBLIC scalar
|
||||
PUBLIC rocksdb
|
||||
PUBLIC transport
|
||||
PUBLIC stream
|
||||
PUBLIC index
|
||||
|
|
|
@ -182,7 +182,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
|
|||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
|
||||
|
||||
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
||||
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
||||
void tsdbReaderClose(STsdbReader *pReader);
|
||||
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||
|
@ -196,8 +196,9 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
|
|||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
uint64_t suid, void **pReader, const char *idstr);
|
||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
|
||||
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
|
||||
SArray *pTableUids);
|
||||
void *tsdbCacherowsReaderClose(void *pReader);
|
||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
|
||||
|
|
|
@ -343,6 +343,16 @@ struct STsdbFS {
|
|||
SArray *aDFileSet; // SArray<SDFileSet>
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
rocksdb_t *db;
|
||||
rocksdb_options_t *options;
|
||||
rocksdb_flushoptions_t *flushoptions;
|
||||
rocksdb_writeoptions_t *writeoptions;
|
||||
rocksdb_readoptions_t *readoptions;
|
||||
rocksdb_writebatch_t *writebatch;
|
||||
TdThreadMutex rMutex;
|
||||
} SRocksCache;
|
||||
|
||||
struct STsdb {
|
||||
char *path;
|
||||
SVnode *pVnode;
|
||||
|
@ -358,6 +368,7 @@ struct STsdb {
|
|||
#ifdef USE_DEV_CODE
|
||||
struct STFileSystem *pFS;
|
||||
#endif
|
||||
SRocksCache rCache;
|
||||
};
|
||||
|
||||
struct TSDBKEY {
|
||||
|
@ -778,6 +789,8 @@ typedef struct SCacheRowsReader {
|
|||
uint64_t suid;
|
||||
char **transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
SArray *pCidList;
|
||||
int32_t *pSlotIds;
|
||||
int32_t type;
|
||||
int32_t tableIndex; // currently returned result tables
|
||||
STableKeyInfo *pTableList; // table id list
|
||||
|
@ -797,6 +810,10 @@ typedef struct {
|
|||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(STsdb *pTsdb);
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
|
||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype);
|
||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "executor.h"
|
||||
#include "filter.h"
|
||||
#include "qworker.h"
|
||||
#include "rocksdb/c.h"
|
||||
#include "sync.h"
|
||||
#include "tRealloc.h"
|
||||
#include "tchecksum.h"
|
||||
|
@ -177,6 +178,7 @@ int tsdbClose(STsdb** pTsdb);
|
|||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbPrepareCommit(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
|
||||
int32_t tsdbCacheCommit(STsdb* pTsdb);
|
||||
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
|
||||
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||
|
@ -193,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
|
|||
void tqClose(STQ*);
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
|
||||
int32_t type);
|
||||
int32_t type);
|
||||
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
|
||||
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
|
||||
|
||||
int tqCommit(STQ*);
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
|
||||
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
|
||||
|
@ -47,6 +46,502 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
#define ROCKS_KEY_LEN 64
|
||||
|
||||
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
if (pVnode->pTfs) {
|
||||
if (path) {
|
||||
snprintf(path, TSDB_FILENAME_LEN, "%s%s%s%scache.rdb", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
|
||||
pTsdb->path, TD_DIRSEP);
|
||||
}
|
||||
} else {
|
||||
if (path) {
|
||||
snprintf(path, TSDB_FILENAME_LEN, "%s%scache.rdb", pTsdb->path, TD_DIRSEP);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
|
||||
rocksdb_options_t *options = rocksdb_options_create();
|
||||
if (NULL == options) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
rocksdb_options_set_create_if_missing(options, 1);
|
||||
// rocksdb_options_set_inplace_update_support(options, 1);
|
||||
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
|
||||
|
||||
rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
|
||||
if (NULL == writeoptions) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err2;
|
||||
}
|
||||
// rocksdb_writeoptions_disable_WAL(writeoptions, 1);
|
||||
|
||||
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
|
||||
if (NULL == readoptions) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err2;
|
||||
}
|
||||
|
||||
char *err = NULL;
|
||||
char cachePath[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetRocksPath(pTsdb, cachePath);
|
||||
|
||||
rocksdb_t *db = rocksdb_open(options, cachePath, &err);
|
||||
if (NULL == db) {
|
||||
code = -1;
|
||||
goto _err3;
|
||||
}
|
||||
|
||||
rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
|
||||
if (NULL == flushoptions) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err4;
|
||||
}
|
||||
|
||||
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
|
||||
|
||||
pTsdb->rCache.writebatch = writebatch;
|
||||
pTsdb->rCache.options = options;
|
||||
pTsdb->rCache.writeoptions = writeoptions;
|
||||
pTsdb->rCache.readoptions = readoptions;
|
||||
pTsdb->rCache.flushoptions = flushoptions;
|
||||
pTsdb->rCache.db = db;
|
||||
|
||||
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
|
||||
|
||||
return code;
|
||||
|
||||
_err4:
|
||||
rocksdb_readoptions_destroy(readoptions);
|
||||
_err3:
|
||||
rocksdb_writeoptions_destroy(writeoptions);
|
||||
_err2:
|
||||
rocksdb_options_destroy(options);
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tsdbCloseRocksCache(STsdb *pTsdb) {
|
||||
rocksdb_close(pTsdb->rCache.db);
|
||||
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
|
||||
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
|
||||
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
|
||||
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
|
||||
rocksdb_options_destroy(pTsdb->rCache.options);
|
||||
taosThreadMutexDestroy(&pTsdb->rCache.rMutex);
|
||||
}
|
||||
|
||||
int32_t tsdbCacheCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
char *err = NULL;
|
||||
|
||||
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
code = -1;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
SLastCol *tsdbCacheDeserialize(char const *value) {
|
||||
if (!value) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SLastCol *pLastCol = (SLastCol *)value;
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pColVal->value.pData = (char *)value + sizeof(*pLastCol);
|
||||
} else {
|
||||
pColVal->value.pData = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
size_t length = sizeof(*pLastCol);
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
length += pColVal->value.nData;
|
||||
}
|
||||
*value = taosMemoryMalloc(length);
|
||||
|
||||
*(SLastCol *)(*value) = *pLastCol;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
pColVal->value.pData = *value + sizeof(*pLastCol);
|
||||
if (pColVal->value.nData > 0) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
} else {
|
||||
pColVal->value.pData = NULL;
|
||||
}
|
||||
}
|
||||
*size = length;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
|
||||
char *err = NULL;
|
||||
size_t vlen = 0;
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring);
|
||||
char *value = NULL;
|
||||
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(value);
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
// 1, fetch schema
|
||||
STSchema *pTSchema = NULL;
|
||||
int32_t sver = TSDBROW_SVERSION(pRow);
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 2, iterate col values into array
|
||||
SArray *aColVal = taosArrayInit(32, sizeof(SColVal));
|
||||
|
||||
STSDBRowIter iter = {0};
|
||||
tsdbRowIterOpen(&iter, pRow, pTSchema);
|
||||
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
/*
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
|
||||
pColVal->value.pData = NULL;
|
||||
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
|
||||
if (code) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
*/
|
||||
taosArrayPush(aColVal, pColVal);
|
||||
}
|
||||
|
||||
tsdbRowClose(&iter);
|
||||
|
||||
// 3, build keys & multi get from rocks
|
||||
int num_keys = TARRAY_SIZE(aColVal);
|
||||
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
int16_t cid = pColVal->cid;
|
||||
|
||||
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
|
||||
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid);
|
||||
if (last_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid);
|
||||
if (lr_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
keys_list[i] = keys;
|
||||
keys_list[num_keys + i] = keys + ROCKS_KEY_LEN;
|
||||
keys_list_sizes[i] = last_key_len;
|
||||
keys_list_sizes[num_keys + i] = lr_key_len;
|
||||
}
|
||||
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
taosMemoryFree(keys_list[i]);
|
||||
}
|
||||
for (int i = 0; i < num_keys * 2; ++i) {
|
||||
rocksdb_free(errs[i]);
|
||||
}
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
taosMemoryFree(errs);
|
||||
|
||||
TSKEY keyTs = TSDBROW_TS(pRow);
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
|
||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid);
|
||||
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
}
|
||||
|
||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
||||
|
||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid);
|
||||
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
}
|
||||
|
||||
rocksdb_free(values_list[i]);
|
||||
rocksdb_free(values_list[i + num_keys]);
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
char *err = NULL;
|
||||
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
rocksdb_writebatch_clear(wb);
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(aColVal);
|
||||
taosMemoryFree(pTSchema);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
|
||||
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
|
||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) {
|
||||
static char const *alstring[2] = {"last_row", "last"};
|
||||
char const *lstring = alstring[ltype];
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *pCidList = pr->pCidList;
|
||||
int num_keys = TARRAY_SIZE(pCidList);
|
||||
char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
|
||||
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
|
||||
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring);
|
||||
if (last_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
|
||||
keys_list[i] = keys;
|
||||
keys_list_sizes[i] = last_key_len;
|
||||
}
|
||||
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||
char **errs = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
|
||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
taosMemoryFree(keys_list[i]);
|
||||
rocksdb_free(errs[i]);
|
||||
}
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
taosMemoryFree(errs);
|
||||
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
bool freeCol = true;
|
||||
SArray *pTmpColArray = NULL;
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
||||
if (pLastCol) {
|
||||
SColVal *pColVal = &pLastCol->colVal;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||
uint8_t *pVal = pColVal->value.pData;
|
||||
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
|
||||
if (pColVal->value.nData) {
|
||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring);
|
||||
if (!pLastCol) {
|
||||
// recalc: load from tsdb
|
||||
int16_t aCols[1] = {cid};
|
||||
int16_t slotIds[1] = {pr->pSlotIds[i]};
|
||||
pTmpColArray = NULL;
|
||||
|
||||
if (ltype) {
|
||||
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
} else {
|
||||
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
}
|
||||
|
||||
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||
freeCol = false;
|
||||
}
|
||||
|
||||
// still null, then make up a none col value
|
||||
if (!pLastCol) {
|
||||
pLastCol = &noneCol;
|
||||
freeCol = false;
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring);
|
||||
rocksdb_writebatch_put(wb, key, klen, value, vlen);
|
||||
|
||||
taosMemoryFree(value);
|
||||
|
||||
char *err = NULL;
|
||||
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
}
|
||||
|
||||
taosArrayPush(pLastArray, pLastCol);
|
||||
|
||||
taosArrayDestroy(pTmpColArray);
|
||||
if (freeCol) {
|
||||
taosMemoryFree(pLastCol);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
// 1, fetch schema
|
||||
STSchema *pTSchema = NULL;
|
||||
int32_t sver = -1;
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 3, build keys & multi get from rocks
|
||||
int num_keys = pTSchema->numOfCols;
|
||||
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
int16_t cid = pTSchema->columns[i].colId;
|
||||
|
||||
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
|
||||
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid);
|
||||
if (last_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid);
|
||||
if (lr_key_len >= ROCKS_KEY_LEN) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
keys_list[i] = keys;
|
||||
keys_list[num_keys + i] = keys + ROCKS_KEY_LEN;
|
||||
keys_list_sizes[i] = last_key_len;
|
||||
keys_list_sizes[num_keys + i] = lr_key_len;
|
||||
}
|
||||
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
taosMemoryFree(keys_list[i]);
|
||||
}
|
||||
for (int i = 0; i < num_keys * 2; ++i) {
|
||||
rocksdb_free(errs[i]);
|
||||
}
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
taosMemoryFree(errs);
|
||||
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid);
|
||||
rocksdb_writebatch_delete(wb, key, klen);
|
||||
}
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
||||
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
|
||||
char key[ROCKS_KEY_LEN];
|
||||
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid);
|
||||
rocksdb_writebatch_delete(wb, key, klen);
|
||||
}
|
||||
|
||||
rocksdb_free(values_list[i]);
|
||||
rocksdb_free(values_list[i + num_keys]);
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
char *err = NULL;
|
||||
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
rocksdb_writebatch_clear(wb);
|
||||
|
||||
_exit:
|
||||
taosMemoryFree(pTSchema);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
SLRUCache *pCache = NULL;
|
||||
|
@ -64,6 +559,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbOpenRocksCache(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosLRUCacheSetStrictCapacity(pCache, false);
|
||||
|
||||
taosThreadMutexInit(&pTsdb->lruMutex, NULL);
|
||||
|
@ -84,6 +585,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
|
|||
}
|
||||
|
||||
tsdbCloseBICache(pTsdb);
|
||||
tsdbCloseRocksCache(pTsdb);
|
||||
}
|
||||
|
||||
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
|
||||
|
@ -170,7 +672,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
/*
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
|
@ -225,7 +727,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
*/
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
|
||||
int32_t code = 0;
|
||||
STSRow *cacheRow = NULL;
|
||||
|
@ -1423,6 +1925,21 @@ static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
|
||||
SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
|
||||
if (NULL == pColArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < nCols; ++i) {
|
||||
int16_t slotId = slotIds[i];
|
||||
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
|
||||
taosArrayPush(pColArray, &col);
|
||||
}
|
||||
*ppColArray = pColArray;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
|
||||
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
|
||||
*ppDst = taosMemoryMalloc(len);
|
||||
|
@ -1761,6 +2278,342 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds) {
|
||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
int16_t nLastCol = nCols;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
bool hasRow = false;
|
||||
bool ignoreEarlierTs = false;
|
||||
SArray *pColArray = NULL;
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
|
||||
if (NULL == aColArray) {
|
||||
taosArrayDestroy(pColArray);
|
||||
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nCols; ++i) {
|
||||
taosArrayPush(aColArray, &aCols[i]);
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast, pr->lastTs);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
}
|
||||
|
||||
hasRow = true;
|
||||
|
||||
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||
if (sversion != -1) {
|
||||
code = updateTSchema(sversion, pr, uid);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _err;
|
||||
}
|
||||
pTSchema = pr->pCurrSchema;
|
||||
}
|
||||
// int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
continue;
|
||||
}
|
||||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
if (pCol->colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
if (!COL_VAL_IS_VALUE(pColVal)) {
|
||||
if (!setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
} else {
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||
if (aColIndex >= 0) {
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// done, goto return pColArray
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
// high version's column value
|
||||
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
continue;
|
||||
}
|
||||
SColVal *tColVal = &lastColVal->colVal;
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
taosArraySet(pColArray, iCol, &lastCol);
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
if (!hasRow) {
|
||||
if (ignoreEarlierTs) {
|
||||
taosArrayDestroy(pColArray);
|
||||
pColArray = NULL;
|
||||
} else {
|
||||
taosArrayClear(pColArray);
|
||||
}
|
||||
}
|
||||
*ppLastArray = pColArray;
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(aColArray);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
// taosMemoryFreeClear(pTSchema);
|
||||
*ppLastArray = NULL;
|
||||
taosArrayDestroy(pColArray);
|
||||
taosArrayDestroy(aColArray);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds) {
|
||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
int16_t nLastCol = nCols;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
bool hasRow = false;
|
||||
bool ignoreEarlierTs = false;
|
||||
SArray *pColArray = NULL;
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
|
||||
if (NULL == aColArray) {
|
||||
taosArrayDestroy(pColArray);
|
||||
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nCols; ++i) {
|
||||
taosArrayPush(aColArray, &aCols[i]);
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast, pr->lastTs);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
}
|
||||
|
||||
hasRow = true;
|
||||
|
||||
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||
if (sversion != -1) {
|
||||
code = updateTSchema(sversion, pr, uid);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _err;
|
||||
}
|
||||
pTSchema = pr->pCurrSchema;
|
||||
}
|
||||
// int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
continue;
|
||||
}
|
||||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
if (pCol->colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
if (COL_VAL_IS_NONE(pColVal)) {
|
||||
if (!setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
} else {
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||
if (aColIndex >= 0) {
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// done, goto return pColArray
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
// high version's column value
|
||||
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
continue;
|
||||
}
|
||||
SColVal *tColVal = &lastColVal->colVal;
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
||||
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
taosArraySet(pColArray, iCol, &lastCol);
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
} else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
if (!hasRow) {
|
||||
if (ignoreEarlierTs) {
|
||||
taosArrayDestroy(pColArray);
|
||||
pColArray = NULL;
|
||||
} else {
|
||||
taosArrayClear(pColArray);
|
||||
}
|
||||
}
|
||||
*ppLastArray = pColArray;
|
||||
|
||||
nextRowIterClose(&iter);
|
||||
taosArrayDestroy(aColArray);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
nextRowIterClose(&iter);
|
||||
|
||||
*ppLastArray = NULL;
|
||||
taosArrayDestroy(pColArray);
|
||||
taosArrayDestroy(aColArray);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
|
|
|
@ -21,47 +21,30 @@
|
|||
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
|
||||
|
||||
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
|
||||
void** pRes, const char* idStr) {
|
||||
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
bool allNullRow = true;
|
||||
|
||||
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
bool allNullRow = true;
|
||||
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||
|
||||
if (slotIds[i] == -1) { // the primary timestamp
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
p->ts = pColVal->ts;
|
||||
p->bytes = TSDB_KEYSIZE;
|
||||
*(int64_t*)p->buf = pColVal->ts;
|
||||
allNullRow = false;
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
// add check for null value, caused by the modification of table schema (new column added).
|
||||
if (slotId >= taosArrayGetSize(pRow)) {
|
||||
p->ts = 0;
|
||||
p->isNull = true;
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
continue;
|
||||
}
|
||||
p->ts = pColVal->ts;
|
||||
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
allNullRow = p->isNull & allNullRow;
|
||||
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
if (!p->isNull) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
||||
|
||||
p->ts = pColVal->ts;
|
||||
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
allNullRow = p->isNull & allNullRow;
|
||||
|
||||
if (!p->isNull) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
||||
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
|
||||
} else {
|
||||
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
||||
p->bytes = pReader->pSchema->columns[slotId].bytes;
|
||||
}
|
||||
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
|
||||
} else {
|
||||
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
||||
p->bytes = pReader->pSchema->columns[slotId].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,36 +57,31 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
pBlock->info.rows += allNullRow ? 0 : 1;
|
||||
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||
|
||||
if (slotIds[i] == -1) {
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
// add check for null value, caused by the modification of table schema (new column added).
|
||||
if (slotId >= taosArrayGetSize(pRow)) {
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||
SColVal* pVal = &pColVal->colVal;
|
||||
|
||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||
continue;
|
||||
}
|
||||
allNullRow = false;
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
continue;
|
||||
}
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
SColVal* pVal = &pColVal->colVal;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
} else {
|
||||
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
|
||||
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
|
||||
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
|
||||
}
|
||||
} else {
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
|
||||
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
|
||||
|
||||
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
|
||||
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
|
||||
}
|
||||
} else {
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.rows += 1;
|
||||
pBlock->info.rows += allNullRow ? 0 : 1;
|
||||
} else {
|
||||
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -143,7 +121,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
|
|||
}
|
||||
|
||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
uint64_t suid, void** pReader, const char* idstr) {
|
||||
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
|
||||
*pReader = NULL;
|
||||
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
|
||||
if (p == NULL) {
|
||||
|
@ -155,6 +133,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
p->pTsdb = p->pVnode->pTsdb;
|
||||
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
|
||||
p->numOfCols = numOfCols;
|
||||
p->pCidList = pCidList;
|
||||
p->pSlotIds = pSlotIds;
|
||||
p->suid = suid;
|
||||
|
||||
if (numOfTables == 0) {
|
||||
|
@ -226,32 +206,9 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
|
||||
LRUHandle** h) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pRow = NULL;
|
||||
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
||||
code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
|
||||
} else {
|
||||
code = tsdbCacheGetLastH(lruCache, uid, pr, h);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// no data in the table of Uid
|
||||
if (*h != NULL) {
|
||||
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void freeItem(void* pItem) {
|
||||
SLastCol* pCol = (SLastCol*)pItem;
|
||||
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->colVal.type) && pCol->colVal.value.pData) {
|
||||
taosMemoryFree(pCol->colVal.value.pData);
|
||||
}
|
||||
}
|
||||
|
@ -277,19 +234,17 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
|
||||
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds,
|
||||
SArray* pTableUidList) {
|
||||
if (pReader == NULL || pResBlock == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SCacheRowsReader* pr = pReader;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
|
||||
LRUHandle* h = NULL;
|
||||
SArray* pRow = NULL;
|
||||
bool hasRes = false;
|
||||
SArray* pLastCols = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
|
||||
bool hasRes = false;
|
||||
SArray* pLastCols = NULL;
|
||||
|
||||
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
||||
if (pRes == NULL) {
|
||||
|
@ -298,20 +253,22 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||
pRes[j] = taosMemoryCalloc(
|
||||
1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE);
|
||||
pRes[j] =
|
||||
taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[/*-1 == slotIds[j] ? 0 : */ slotIds[j]].bytes +
|
||||
VARSTR_HEADER_SIZE);
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
|
||||
p->ts = INT64_MIN;
|
||||
}
|
||||
|
||||
pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
|
||||
pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
|
||||
if (pLastCols == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
|
||||
struct STColumn* pCol = &pr->pSchema->columns[i];
|
||||
for (int32_t i = 0; i < pr->numOfCols; ++i) {
|
||||
int32_t slotId = slotIds[i];
|
||||
struct STColumn* pCol = &pr->pSchema->columns[slotId];
|
||||
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
|
@ -328,6 +285,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
pr->pDataFReader = NULL;
|
||||
pr->pDataFReaderLast = NULL;
|
||||
|
||||
int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -335,16 +294,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||
|
||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (h == NULL) {
|
||||
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
continue;
|
||||
}
|
||||
if (taosArrayGetSize(pRow) <= 0) {
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -352,47 +309,34 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
bool hasNotNullRow = true;
|
||||
int64_t singleTableLastTs = INT64_MAX;
|
||||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||
int32_t slotId = slotIds[k];
|
||||
SLastCol* p = taosArrayGet(pLastCols, k);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
|
||||
|
||||
if (slotId == -1) { // the primary timestamp
|
||||
SLastCol* p = taosArrayGet(pLastCols, 0);
|
||||
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
if (pCol->ts > p->ts) {
|
||||
hasRes = true;
|
||||
p->ts = pCol->ts;
|
||||
p->colVal = pCol->colVal;
|
||||
singleTableLastTs = pCol->ts;
|
||||
if (pColVal->ts > p->ts) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
SLastCol* p = taosArrayGet(pLastCols, slotId);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
if (pColVal->ts > p->ts) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
}
|
||||
continue;
|
||||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
singleTableLastTs = pColVal->ts;
|
||||
}
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
} else {
|
||||
if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
}
|
||||
|
||||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
singleTableLastTs = pColVal->ts;
|
||||
}
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
} else {
|
||||
if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
}
|
||||
|
||||
p->colVal.value.nData = pColVal->colVal.value.nData;
|
||||
p->colVal.type = pColVal->colVal.type;
|
||||
p->colVal.flag = pColVal->colVal.flag;
|
||||
p->colVal.cid = pColVal->colVal.cid;
|
||||
}
|
||||
p->colVal.value.nData = pColVal->colVal.value.nData;
|
||||
p->colVal.type = pColVal->colVal.type;
|
||||
p->colVal.flag = pColVal->colVal.flag;
|
||||
p->colVal.cid = pColVal->colVal.cid;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -414,33 +358,31 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
|
||||
}
|
||||
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
}
|
||||
|
||||
if (hasRes) {
|
||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
}
|
||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (h == NULL) {
|
||||
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
continue;
|
||||
}
|
||||
if (taosArrayGetSize(pRow) <= 0) {
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
continue;
|
||||
}
|
||||
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
|
||||
// TODO reset the pRes
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
|
||||
taosArrayPush(pTableUidList, &pKeyInfo->uid);
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
|
||||
pr->tableIndex += 1;
|
||||
if (pResBlock->info.rows >= pResBlock->info.capacity) {
|
||||
|
@ -466,6 +408,7 @@ _end:
|
|||
}
|
||||
|
||||
taosMemoryFree(pRes);
|
||||
taosArrayDestroyEx(pRow, freeItem);
|
||||
taosArrayDestroyEx(pLastCols, freeItem);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -140,7 +140,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
SMemTable *pMemTable = pTsdb->mem;
|
||||
STbData *pTbData = NULL;
|
||||
SVBufPool *pPool = pTsdb->pVnode->inUse;
|
||||
TSDBKEY lastKey = {.version = version, .ts = eKey};
|
||||
|
||||
// check if table exists
|
||||
SMetaInfo info;
|
||||
|
@ -181,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
pMemTable->nDel++;
|
||||
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
||||
pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
|
||||
|
||||
/*
|
||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
|
||||
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
}
|
||||
|
@ -189,6 +188,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
}
|
||||
*/
|
||||
if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
|
||||
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
|
||||
}
|
||||
|
||||
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||
" at version %" PRId64,
|
||||
|
@ -284,8 +287,8 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
|||
|
||||
int64_t tsdbCountTbDataRows(STbData *pTbData) {
|
||||
SMemSkipListNode *pNode = pTbData->sl.pHead;
|
||||
int64_t rowsNum = 0;
|
||||
|
||||
int64_t rowsNum = 0;
|
||||
|
||||
while (NULL != pNode) {
|
||||
pNode = SL_GET_NODE_FORWARD(pNode, 0);
|
||||
if (pNode == pTbData->sl.pTail) {
|
||||
|
@ -298,17 +301,17 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
|
|||
return rowsNum;
|
||||
}
|
||||
|
||||
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
|
||||
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum) {
|
||||
taosRLockLatch(&pMemTable->latch);
|
||||
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
|
||||
STbData *pTbData = pMemTable->aBucket[i];
|
||||
while (pTbData) {
|
||||
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
|
||||
void *p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
|
||||
if (p == NULL) {
|
||||
pTbData = pTbData->next;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
*rowsNum += tsdbCountTbDataRows(pTbData);
|
||||
pTbData = pTbData->next;
|
||||
}
|
||||
|
@ -668,15 +671,8 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
|
||||
if (key.ts >= pTbData->maxKey) {
|
||||
pTbData->maxKey = key.ts;
|
||||
|
||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
|
||||
}
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||
|
||||
// SMemTable
|
||||
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
|
||||
|
@ -736,15 +732,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
|
||||
if (key.ts >= pTbData->maxKey) {
|
||||
pTbData->maxKey = key.ts;
|
||||
|
||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
|
||||
}
|
||||
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
|
||||
|
||||
// SMemTable
|
||||
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
|
||||
|
|
|
@ -151,8 +151,8 @@ _exit:
|
|||
}
|
||||
|
||||
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
|
||||
bool diskAvail = osDataSpaceAvailable();
|
||||
bool needCommit = false;
|
||||
bool diskAvail = osDataSpaceAvailable();
|
||||
bool needCommit = false;
|
||||
|
||||
taosThreadMutexLock(&pVnode->mutex);
|
||||
if (pVnode->inUse && diskAvail) {
|
||||
|
@ -453,6 +453,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
|||
#endif
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbCacheCommit(pVnode->pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
code = smaCommit(pVnode->pSma, pInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -1128,6 +1128,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_IGNORE_GROUPID_FORMAT, pMergeNode->ignoreGroupId ? "true" : "false");
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
|
||||
if (pMergeNode->groupSort) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, "_group_id asc");
|
||||
|
|
|
@ -31,18 +31,21 @@ typedef struct SCacheRowsScanInfo {
|
|||
void* pLastrowReader;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t* pSlotIds;
|
||||
int32_t* pDstSlotIds;
|
||||
SExprSupp pseudoExprSup;
|
||||
int32_t retrieveType;
|
||||
int32_t currentGroupIndex;
|
||||
SSDataBlock* pBufferredRes;
|
||||
SArray* pUidList;
|
||||
SArray* pCidList;
|
||||
int32_t indexOfBufferedRes;
|
||||
STableListInfo* pTableList;
|
||||
} SCacheRowsScanInfo;
|
||||
|
||||
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
|
||||
static void destroyCacheScanOperator(void* param);
|
||||
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
||||
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
|
||||
int32_t** pDstSlotIds);
|
||||
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
|
||||
|
||||
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
||||
|
@ -71,9 +74,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
|
||||
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
|
||||
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
taosArrayPush(pCidList, &pColInfo->colId);
|
||||
}
|
||||
pInfo->pCidList = pCidList;
|
||||
|
||||
removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
|
||||
|
||||
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
|
||||
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -91,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
|||
|
||||
uint64_t suid = tableListGetSuid(pTableListInfo);
|
||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
|
||||
pTaskInfo->id.str);
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, suid,
|
||||
&pInfo->pLastrowReader, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -160,8 +170,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
blockDataCleanup(pInfo->pBufferredRes);
|
||||
taosArrayClear(pInfo->pUidList);
|
||||
|
||||
int32_t code =
|
||||
tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds,
|
||||
pInfo->pDstSlotIds, pInfo->pUidList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -227,8 +237,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
|
||||
pTaskInfo->id.str);
|
||||
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
|
||||
&pInfo->pLastrowReader, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pInfo->currentGroupIndex += 1;
|
||||
taosArrayClear(pInfo->pUidList);
|
||||
|
@ -237,7 +247,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
|
||||
taosArrayClear(pInfo->pUidList);
|
||||
|
||||
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
|
||||
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
|
||||
pInfo->pUidList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -280,6 +291,8 @@ void destroyCacheScanOperator(void* param) {
|
|||
blockDataDestroy(pInfo->pRes);
|
||||
blockDataDestroy(pInfo->pBufferredRes);
|
||||
taosMemoryFree(pInfo->pSlotIds);
|
||||
taosMemoryFree(pInfo->pDstSlotIds);
|
||||
taosArrayDestroy(pInfo->pCidList);
|
||||
taosArrayDestroy(pInfo->pUidList);
|
||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||
tableListDestroy(pInfo->pTableList);
|
||||
|
@ -292,7 +305,8 @@ void destroyCacheScanOperator(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
|
||||
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
|
||||
int32_t** pDstSlotIds) {
|
||||
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||
|
||||
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
|
@ -300,18 +314,25 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
if (*pDstSlotIds == NULL) {
|
||||
taosMemoryFree(*pSlotIds);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
|
||||
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
|
||||
if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
(*pSlotIds)[pColMatch->dstSlotId] = -1;
|
||||
break;
|
||||
}
|
||||
}*/
|
||||
|
||||
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
|
||||
(*pSlotIds)[pColMatch->dstSlotId] = j;
|
||||
(*pSlotIds)[i] = j;
|
||||
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -545,6 +545,7 @@ typedef struct SMultiwayMergeOperatorInfo {
|
|||
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
|
||||
int64_t startTs; // sort start time
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
} SMultiwayMergeOperatorInfo;
|
||||
|
@ -694,7 +695,11 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
}
|
||||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.id.groupId = pInfo->groupId;
|
||||
if (pInfo->ignoreGroupId) {
|
||||
pDataBlock->info.id.groupId = 0;
|
||||
} else {
|
||||
pDataBlock->info.id.groupId = pInfo->groupId;
|
||||
}
|
||||
pDataBlock->info.dataLoad = 1;
|
||||
}
|
||||
|
||||
|
@ -785,6 +790,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->groupSort = pMergePhyNode->groupSort;
|
||||
pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
|
||||
pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||
pInfo->pInputBlock = pInputBlock;
|
||||
size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
|
||||
|
|
|
@ -455,6 +455,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
|
|||
COPY_SCALAR_FIELD(numOfChannels);
|
||||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(groupSort);
|
||||
COPY_SCALAR_FIELD(ignoreGroupId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2027,6 +2027,7 @@ static const char* jkMergePhysiPlanTargets = "Targets";
|
|||
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
|
||||
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
|
||||
static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID";
|
||||
|
||||
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
|
||||
|
@ -2047,6 +2048,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanGroupSort, pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2070,6 +2074,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanGroupSort, &pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanIgnoreGroupID, &pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2512,7 +2512,8 @@ enum {
|
|||
PHY_MERGE_CODE_TARGETS,
|
||||
PHY_MERGE_CODE_NUM_OF_CHANNELS,
|
||||
PHY_MERGE_CODE_SRC_GROUP_ID,
|
||||
PHY_MERGE_CODE_GROUP_SORT
|
||||
PHY_MERGE_CODE_GROUP_SORT,
|
||||
PHY_MERGE_CODE_IGNORE_GROUP_ID,
|
||||
};
|
||||
|
||||
static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2534,6 +2535,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_GROUP_SORT, pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2563,6 +2567,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_MERGE_CODE_GROUP_SORT:
|
||||
code = tlvDecodeBool(pTlv, &pNode->groupSort);
|
||||
break;
|
||||
case PHY_MERGE_CODE_IGNORE_GROUP_ID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1070,7 +1070,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
|
|||
|
||||
TSWAP(pProject->node.pLimit, pSelect->pLimit);
|
||||
TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
|
||||
pProject->ignoreGroupId = (pSelect->isSubquery && NULL == pProject->node.pLimit && NULL == pProject->node.pSlimit) ? true : (NULL == pSelect->pPartitionByList);
|
||||
pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList);
|
||||
pProject->node.groupAction =
|
||||
(!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
|
||||
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
|
||||
|
|
|
@ -2351,17 +2351,6 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t mergeProjectionsLogicNode(SLogicNode* pDstNode, SLogicNode* pSrcNode) {
|
||||
SProjectLogicNode *pDstPro = (SProjectLogicNode*)pDstNode;
|
||||
SProjectLogicNode *pSrcPro = (SProjectLogicNode*)pSrcNode;
|
||||
|
||||
if (!pSrcPro->ignoreGroupId) {
|
||||
pDstPro->ignoreGroupId = pSrcPro->ignoreGroupId;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
|
||||
|
||||
|
@ -2371,11 +2360,8 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
|
|||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (1 == LIST_LENGTH(pChild->pChildren)) {
|
||||
code = mergeProjectionsLogicNode(pSelfNode, pChild);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
|
||||
code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
|
||||
}
|
||||
SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
|
||||
code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
|
||||
} else { // no grand child
|
||||
NODES_CLEAR_LIST(pSelfNode->pChildren);
|
||||
}
|
||||
|
|
|
@ -1559,6 +1559,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
|
|||
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
|
||||
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
|
||||
pMerge->groupSort = pMergeLogicNode->groupSort;
|
||||
pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
|
||||
|
||||
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
|
||||
|
||||
|
|
|
@ -532,6 +532,25 @@ static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* pNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT: {
|
||||
SProjectLogicNode *pLogicNode = (SProjectLogicNode*)pNode;
|
||||
if (pLogicNode->ignoreGroupId && (pMerge->node.pLimit || pMerge->node.pSlimit)) {
|
||||
pMerge->ignoreGroupId = true;
|
||||
pLogicNode->ignoreGroupId = false;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
|
||||
SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
|
||||
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
|
||||
|
@ -563,6 +582,9 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
((SLimitNode*)pSplitNode->pLimit)->limit += ((SLimitNode*)pSplitNode->pLimit)->offset;
|
||||
((SLimitNode*)pSplitNode->pLimit)->offset = 0;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplRewriteFromMergeNode(pMerge, pSplitNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == pSubplan) {
|
||||
code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
|
||||
|
|
|
@ -134,7 +134,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py
|
||||
|
|
|
@ -68,7 +68,7 @@ docker run \
|
|||
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
|
||||
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
|
||||
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
|
||||
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1"
|
||||
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1"
|
||||
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
|
||||
|
||||
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
|
||||
|
@ -97,7 +97,7 @@ docker run \
|
|||
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
|
||||
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
|
||||
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
|
||||
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 "
|
||||
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1 "
|
||||
|
||||
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan
|
||||
|
||||
|
|
|
@ -36,4 +36,34 @@ if $rows != 0 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql insert into tb0 values (now, 0);
|
||||
sql insert into tb1 values (now, 1);
|
||||
sql insert into tb2 values (now, 2);
|
||||
sql insert into tb3 values (now, 3);
|
||||
sql insert into tb4 values (now, 4);
|
||||
sql insert into tb5 values (now, 5);
|
||||
sql insert into tb6 values (now, 6);
|
||||
sql insert into tb7 values (now, 7);
|
||||
|
||||
sql select * from (select 1 from $mt1 where ts is not null partition by tbname limit 1);
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select ts from $mt1 where ts is not null partition by tbname slimit 2);
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select ts from $mt1 where ts is not null partition by tbname limit 2);
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue