Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

This commit is contained in:
Hongze Cheng 2022-05-11 08:39:47 +00:00
commit 6fda914a8d
15 changed files with 778 additions and 745 deletions

View File

@ -47,36 +47,67 @@ def pre_test(){
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WK}
git checkout master
cd ${WKC}
git checkout master
'''
} else if(env.CHANGE_TARGET == '2.0') {
sh '''
cd ${WK}
git checkout 2.0
cd ${WKC}
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
sh '''
cd ${WK}
git checkout 3.0
cd ${WKC}
git checkout 3.0
[ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../..
'''
} else {
sh '''
cd ${WK}
git checkout develop
cd ${WKC}
git checkout develop
'''
}
}
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh '''
cd ${WKC}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
cd ${WK}
git pull >/dev/null
git log|head -n20
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
cd ${WK}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git log|head -n20
cd ${WKC}
git pull >/dev/null
git log|head -n20
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
sh '''
cd ${WKC}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git submodule update --init --recursive
'''
sh '''
cd ${WKC}
cd ${WK}
export TZ=Asia/Harbin
date
rm -rf debug
@ -162,8 +193,8 @@ pipeline {
options { skipDefaultCheckout() }
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
WKC = '/var/lib/jenkins/workspace/TDinternal/community'
WKPY = '/var/lib/jenkins/workspace/taos-connector-python'
}
stages {
stage('run test') {
@ -177,15 +208,33 @@ pipeline {
steps {
timeout(time: 45, unit: 'MINUTES'){
pre_test()
sh '''
cd ${WKC}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh '''
cd ${WK}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WK}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
cd ${WKC}/debug
ctest -VV
'''
sh '''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
}
sh '''
cd ${WKC}/tests
./test-all.sh b1fq

View File

@ -85,11 +85,11 @@ ELSE ()
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-literal-suffix -Werror=return-type -fpermissive -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3")
ENDIF ()
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")

View File

@ -292,6 +292,109 @@ typedef struct {
SSchema* pSchema;
} SSchemaWrapper;
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
if (pSW == NULL) return pSW;
pSW->nCols = pSchemaWrapper->nCols;
pSW->sver = pSchemaWrapper->sver;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
taosMemoryFree(pSW);
return NULL;
}
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
return pSW;
}
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
}
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->flags);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeString(buf, pSchema->name);
return tlen;
}
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->flags);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeStringTo(buf, pSchema->name);
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1;
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pSW->nCols);
tlen += taosEncodeVariantI32(buf, pSW->sver);
for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->sver);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
typedef struct {
@ -1874,7 +1977,7 @@ typedef struct SMqHbTopicInfo {
int32_t epoch;
int64_t topicUid;
char name[TSDB_TOPIC_FNAME_LEN];
SArray* pVgInfo;
SArray* pVgInfo; // SArray<SMqHbVgInfo>
} SMqHbTopicInfo;
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
@ -1993,49 +2096,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
return (void*)buf;
}
typedef struct {
int8_t reserved;
} SMqRebVgRsp;
typedef struct {
int64_t leftForVer;
int32_t vgId;
int32_t epoch;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
char* sql;
char* physicalPlan;
char* qmsg;
} SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI32(buf, pReq->epoch);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->topicName);
tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->qmsg);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI32(buf, &pReq->epoch);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->topicName);
buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeString(buf, &pReq->qmsg);
return buf;
}
typedef struct {
int32_t vgId;
int64_t offset;
@ -2057,109 +2117,6 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
if (pSW == NULL) return pSW;
pSW->nCols = pSchemaWrapper->nCols;
pSW->sver = pSchemaWrapper->sver;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
taosMemoryFree(pSW);
return NULL;
}
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
return pSW;
}
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
}
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI8(buf, pSchema->flags);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeString(buf, pSchema->name);
return tlen;
}
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->flags);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeStringTo(buf, pSchema->name);
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1;
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pSW->nCols);
tlen += taosEncodeVariantI32(buf, pSW->sver);
for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->sver);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
@ -2428,6 +2385,21 @@ typedef struct {
SEpSet epSet;
} SMqSubVgEp;
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf;
}
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int8_t isSchemaAdaptive;
@ -2435,6 +2407,43 @@ typedef struct {
SSchemaWrapper schema;
} SMqSubTopicEp;
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
}
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
if (pTopicEp->vgs == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp;
buf = tDecodeSMqSubVgEp(buf, &vgEp);
taosArrayPush(pTopicEp->vgs, &vgEp);
}
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
return buf;
}
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs);
}
typedef struct {
SMqRspHead head;
int64_t reqOffset;
@ -2513,58 +2522,6 @@ typedef struct {
SArray* topics; // SArray<SMqSubTopicEp>
} SMqAskEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
}
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
if (pTopicEp->vgs == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp;
buf = tDecodeSMqSubVgEp(buf, &vgEp);
taosArrayPush(pTopicEp->vgs, &vgEp);
}
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
int32_t tlen = 0;
// tlen += taosEncodeString(buf, pRsp->cgroup);

View File

@ -208,6 +208,7 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
int64_t walGetCommittedVer(SWal *);
#ifdef __cplusplus
}

View File

@ -138,14 +138,14 @@ typedef struct {
} SSmlHandle;
//=================================================================================================
static uint64_t linesSmlHandleId = 0;
static volatile int64_t linesSmlHandleId = 0;
static const char* TS = "_ts";
static const char* TAG = "_tagNone";
//=================================================================================================
static uint64_t smlGenId() {
uint64_t id;
static int64_t smlGenId() {
int64_t id;
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
@ -239,15 +239,13 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
const char* errStr = taos_errstr(res);
char* begin = strstr(errStr, "duplicated column names");
bool tscDupColNames = (begin != NULL);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
}
taos_free_result(res);
// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
@ -265,15 +263,13 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
const char* errStr = taos_errstr(res);
char* begin = strstr(errStr, "duplicated column names");
bool tscDupColNames = (begin != NULL);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
}
taos_free_result(res);
// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
@ -337,7 +333,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
SArray *cols = action->createSTable.fields;
for(int i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
@ -350,7 +346,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
cols = action->createSTable.tags;
for(int i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
@ -390,7 +386,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
static int32_t smlModifyDBSchemas(SSmlHandle* info) {
int32_t code = 0;
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
SSmlSTableMeta** tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, NULL);
while (tableMetaSml) {
SSmlSTableMeta* sTableData = *tableMetaSml;
@ -406,8 +402,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
SSchemaAction schemaAction = {.action = SCHEMA_ACTION_CREATE_STABLE};
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = sTableData->tags;
schemaAction.createSTable.fields = sTableData->cols;
@ -430,7 +425,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
}
sTableData->tableMeta = pTableMeta;
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
}
return 0;
}
@ -996,7 +991,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = TAG;
kv->keyLen = strlen(TAG);
kv->value = TAG;
@ -1053,7 +1048,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
}
// add kv to SSmlKv
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = key;
kv->keyLen = keyLen;
kv->value = value;
@ -1199,7 +1194,7 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra
if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP;
// add ts to
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv){
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -1259,12 +1254,12 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra
static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
uint8_t *index = (uint8_t *)taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(index){
SSmlKv **value = taosArrayGet(tableMeta->tags, *index);
SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->tags, *index);
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
*value = kv;
@ -1281,11 +1276,11 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
if(cols){
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
int16_t *index = (int16_t *)taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(index){
SSmlKv **value = taosArrayGet(tableMeta->cols, *index);
SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->cols, *index);
if(kv->type != (*value)->type){
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return false;
@ -1311,7 +1306,7 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols,
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(tags){
for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i);
taosArrayPush(tableMeta->tags, &kv);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
}
@ -1319,7 +1314,7 @@ static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols)
if(cols){
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosArrayPush(tableMeta->cols, &kv);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
}
@ -1327,7 +1322,7 @@ static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols)
}
static SSmlTableInfo* smlBuildTableInfo(bool format){
SSmlTableInfo *tag = taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
if(!tag){
return NULL;
}
@ -1354,7 +1349,7 @@ static SSmlTableInfo* smlBuildTableInfo(bool format){
return tag;
cleanup:
taosMemoryFreeClear(tag);
taosMemoryFree(tag);
return NULL;
}
@ -1364,18 +1359,17 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
}else{
tag->cols = taosArrayInit(16, POINTER_BYTES);
for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
SHashObj *kvHash = taosArrayGetP(tag->cols, i);
void** p1 = taosHashIterate(kvHash, NULL);
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
void** p1 = (void**)taosHashIterate(kvHash, NULL);
while (p1) {
SSmlKv* kv = *p1;
taosMemoryFreeClear(kv);
p1 = taosHashIterate(kvHash, p1);
taosMemoryFree(*p1);
p1 = (void**)taosHashIterate(kvHash, p1);
}
taosHashCleanup(kvHash);
}
}
taosArrayDestroy(tag->tags);
taosMemoryFreeClear(tag);
taosMemoryFree(tag);
}
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
@ -1390,7 +1384,7 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
}
taosArrayPush(oneTable->cols, &kvHash);
@ -1399,7 +1393,7 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
}
static SSmlSTableMeta* smlBuildSTableMeta(){
SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
SSmlSTableMeta* meta = (SSmlSTableMeta*)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if(!meta){
return NULL;
}
@ -1429,7 +1423,7 @@ static SSmlSTableMeta* smlBuildSTableMeta(){
return meta;
cleanup:
taosMemoryFreeClear(meta);
taosMemoryFree(meta);
return NULL;
}
@ -1475,9 +1469,9 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
return TSDB_CODE_SML_INVALID_DATA;
}
SSmlTableInfo **oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
if(oneTable){
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
ASSERT(tableMeta);
ret = smlUpdateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta cols
if(!ret){
@ -1516,7 +1510,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
buildChildTableName(&rName);
tinfo->uid = rName.uid;
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta
ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf);
if(!ret){
@ -1545,20 +1539,18 @@ static void smlDestroyInfo(SSmlHandle* info){
smlDestroyHandle(info->exec);
// destroy info->childTables
void** p1 = taosHashIterate(info->childTables, NULL);
void** p1 = (void**)taosHashIterate(info->childTables, NULL);
while (p1) {
SSmlTableInfo* oneTable = *p1;
smlDestroyBuildTableInfo(oneTable, info->dataFormat);
p1 = taosHashIterate(info->childTables, p1);
smlDestroyBuildTableInfo((SSmlTableInfo*)(*p1), info->dataFormat);
p1 = (void**)taosHashIterate(info->childTables, p1);
}
taosHashCleanup(info->childTables);
// destroy info->superTables
p1 = taosHashIterate(info->superTables, NULL);
p1 = (void**)taosHashIterate(info->superTables, NULL);
while (p1) {
SSmlSTableMeta* oneTable = *p1;
smlDestroySTableMeta(oneTable);
p1 = taosHashIterate(info->superTables, p1);
smlDestroySTableMeta((SSmlSTableMeta*)(*p1));
p1 = (void**)taosHashIterate(info->superTables, p1);
}
taosHashCleanup(info->superTables);
@ -1571,13 +1563,13 @@ static void smlDestroyInfo(SSmlHandle* info){
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){
int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle* info = taosMemoryCalloc(1, sizeof(SSmlHandle));
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) {
return NULL;
}
info->id = smlGenId();
info->pQuery = taosMemoryCalloc(1, sizeof(SQuery));
info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == info->pQuery) {
uError("SML:0x%"PRIx64" create info->pQuery error", info->id);
goto cleanup;
@ -1592,7 +1584,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
}
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = taos;
info->taos = (STscObj *)taos;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
@ -1634,7 +1626,7 @@ cleanup:
static int32_t smlInsertData(SSmlHandle* info) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlTableInfo** oneTable = taosHashIterate(info->childTables, NULL);
SSmlTableInfo** oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, NULL);
while (oneTable) {
SSmlTableInfo* tableData = *oneTable;
@ -1650,7 +1642,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
}
taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
SSmlSTableMeta** pMeta = taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
SSmlSTableMeta** pMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
ASSERT (NULL != pMeta && NULL != *pMeta);
// use tablemeta of stable to save vgid and uid of child table
@ -1662,7 +1654,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
if(code != TSDB_CODE_SUCCESS){
return code;
}
oneTable = taosHashIterate(info->childTables, oneTable);
oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable);
}
smlBuildOutput(info->exec, info->pVgHash);
@ -1748,12 +1740,12 @@ cleanup:
*/
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
if(!request){
return NULL;
}
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true);
SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision, true);
if(!info){
return (TAOS_RES*)request;
}

View File

@ -214,7 +214,7 @@ TEST(testCase, smlParseCols_tag_Test) {
msgBuf.len = 256;
SArray *cols = taosArrayInit(16, POINTER_BYTES);
ASSERT_NE(cols, NULL);
ASSERT_NE(cols, nullptr);
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
const char *data =
@ -226,7 +226,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(size, 19);
// nchar
SSmlKv *kv = taosArrayGetP(cols, 0);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, 0);
ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
@ -235,7 +235,7 @@ TEST(testCase, smlParseCols_tag_Test) {
taosMemoryFree(kv);
// nchar
kv = taosArrayGetP(cols, 3);
kv = (SSmlKv *)taosArrayGetP(cols, 3);
ASSERT_EQ(strncasecmp(kv->key, "cf64", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
@ -257,7 +257,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ(size, 1);
// nchar
kv = taosArrayGetP(cols, 0);
kv = (SSmlKv *)taosArrayGetP(cols, 0);
ASSERT_EQ(strncasecmp(kv->key, TAG, strlen(TAG)), 0);
ASSERT_EQ(kv->keyLen, strlen(TAG));
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
@ -276,7 +276,7 @@ TEST(testCase, smlParseCols_Test) {
msgBuf.len = 256;
SArray *cols = taosArrayInit(16, POINTER_BYTES);
ASSERT_NE(cols, NULL);
ASSERT_NE(cols, nullptr);
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@ -288,7 +288,7 @@ TEST(testCase, smlParseCols_Test) {
ASSERT_EQ(size, 19);
// binary
SSmlKv *kv = taosArrayGetP(cols, 0);
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, 0);
ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BINARY);
@ -297,7 +297,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// nchar
kv = taosArrayGetP(cols, 1);
kv = (SSmlKv *)taosArrayGetP(cols, 1);
ASSERT_EQ(strncasecmp(kv->key, "cnch", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
@ -306,7 +306,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// bool
kv = taosArrayGetP(cols, 2);
kv = (SSmlKv *)taosArrayGetP(cols, 2);
ASSERT_EQ(strncasecmp(kv->key, "cbool", 5), 0);
ASSERT_EQ(kv->keyLen, 5);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL);
@ -315,7 +315,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// double
kv = taosArrayGetP(cols, 3);
kv = (SSmlKv *)taosArrayGetP(cols, 3);
ASSERT_EQ(strncasecmp(kv->key, "cf64", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_DOUBLE);
@ -325,7 +325,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// float
kv = taosArrayGetP(cols, 4);
kv = (SSmlKv *)taosArrayGetP(cols, 4);
ASSERT_EQ(strncasecmp(kv->key, "cf32_", 5), 0);
ASSERT_EQ(kv->keyLen, 5);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_FLOAT);
@ -335,7 +335,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// float
kv = taosArrayGetP(cols, 5);
kv = (SSmlKv *)taosArrayGetP(cols, 5);
ASSERT_EQ(strncasecmp(kv->key, "cf32", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_FLOAT);
@ -345,7 +345,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// tiny int
kv = taosArrayGetP(cols, 6);
kv = (SSmlKv *)taosArrayGetP(cols, 6);
ASSERT_EQ(strncasecmp(kv->key, "ci8", 3), 0);
ASSERT_EQ(kv->keyLen, 3);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_TINYINT);
@ -354,7 +354,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// unsigned tiny int
kv = taosArrayGetP(cols, 7);
kv = (SSmlKv *)taosArrayGetP(cols, 7);
ASSERT_EQ(strncasecmp(kv->key, "cu8", 3), 0);
ASSERT_EQ(kv->keyLen, 3);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_UTINYINT);
@ -363,7 +363,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// small int
kv = taosArrayGetP(cols, 8);
kv = (SSmlKv *)taosArrayGetP(cols, 8);
ASSERT_EQ(strncasecmp(kv->key, "ci16", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_SMALLINT);
@ -372,7 +372,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// unsigned smallint
kv = taosArrayGetP(cols, 9);
kv = (SSmlKv *)taosArrayGetP(cols, 9);
ASSERT_EQ(strncasecmp(kv->key, "cu16", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_USMALLINT);
@ -381,7 +381,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// int
kv = taosArrayGetP(cols, 10);
kv = (SSmlKv *)taosArrayGetP(cols, 10);
ASSERT_EQ(strncasecmp(kv->key, "ci32", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_INT);
@ -390,7 +390,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// unsigned int
kv = taosArrayGetP(cols, 11);
kv = (SSmlKv *)taosArrayGetP(cols, 11);
ASSERT_EQ(strncasecmp(kv->key, "cu32", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_UINT);
@ -400,7 +400,7 @@ TEST(testCase, smlParseCols_Test) {
// bigint
kv = taosArrayGetP(cols, 12);
kv = (SSmlKv *)taosArrayGetP(cols, 12);
ASSERT_EQ(strncasecmp(kv->key, "ci64", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BIGINT);
@ -409,7 +409,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// bigint
kv = taosArrayGetP(cols, 13);
kv = (SSmlKv *)taosArrayGetP(cols, 13);
ASSERT_EQ(strncasecmp(kv->key, "ci", 2), 0);
ASSERT_EQ(kv->keyLen, 2);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BIGINT);
@ -418,7 +418,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// unsigned bigint
kv = taosArrayGetP(cols, 14);
kv = (SSmlKv *)taosArrayGetP(cols, 14);
ASSERT_EQ(strncasecmp(kv->key, "cu64", 4), 0);
ASSERT_EQ(kv->keyLen, 4);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_UBIGINT);
@ -427,7 +427,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// bool
kv = taosArrayGetP(cols, 15);
kv = (SSmlKv *)taosArrayGetP(cols, 15);
ASSERT_EQ(strncasecmp(kv->key, "cbooltrue", 9), 0);
ASSERT_EQ(kv->keyLen, 9);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL);
@ -437,7 +437,7 @@ TEST(testCase, smlParseCols_Test) {
// bool
kv = taosArrayGetP(cols, 16);
kv = (SSmlKv *)taosArrayGetP(cols, 16);
ASSERT_EQ(strncasecmp(kv->key, "cboolt", 6), 0);
ASSERT_EQ(kv->keyLen, 6);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL);
@ -446,7 +446,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// bool
kv = taosArrayGetP(cols, 17);
kv = (SSmlKv *)taosArrayGetP(cols, 17);
ASSERT_EQ(strncasecmp(kv->key, "cboolf", 6), 0);
ASSERT_EQ(kv->keyLen, 6);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL);
@ -455,7 +455,7 @@ TEST(testCase, smlParseCols_Test) {
taosMemoryFree(kv);
// nchar
kv = taosArrayGetP(cols, 18);
kv = (SSmlKv *)taosArrayGetP(cols, 18);
ASSERT_EQ(strncasecmp(kv->key, "cnch_", 5), 0);
ASSERT_EQ(kv->keyLen, 5);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
@ -469,7 +469,7 @@ TEST(testCase, smlParseCols_Test) {
TEST(testCase, smlParseLine_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, NULL);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
@ -477,11 +477,11 @@ TEST(testCase, smlParseLine_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, NULL);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL);
ASSERT_NE(info, nullptr);
const char *sql[9] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000",
@ -494,7 +494,7 @@ TEST(testCase, smlParseLine_Test) {
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
};
smlInsertLines(info, sql, 9);
smlInsertLines(info, (char**)sql, 9);
// for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]);
// }
@ -502,7 +502,7 @@ TEST(testCase, smlParseLine_Test) {
TEST(testCase, smlParseLine_error_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, NULL);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
@ -510,17 +510,17 @@ TEST(testCase, smlParseLine_error_Test) {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, NULL);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true);
ASSERT_NE(info, NULL);
ASSERT_NE(info, nullptr);
const char *sql[2] = {
"measure,t1=3 c1=8",
"measure,t2=3 c1=8u8"
};
int ret = smlInsertLines(info, sql, 2);
int ret = smlInsertLines(info, (char **)sql, 2);
ASSERT_NE(ret, 0);
}

View File

@ -452,6 +452,7 @@ typedef struct {
int8_t withSchema;
int8_t withTag;
SRWLatch lock;
int32_t consumerCnt;
int32_t sqlLen;
int32_t astLen;
char* sql;

View File

@ -787,6 +787,8 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
}
}
// do not show for cleared subscription
#if 0
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
@ -829,6 +831,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
numOfRows++;
}
#endif
taosRUnLockLatch(&pSub->lock);
sdbRelease(pSdb, pSub);
}

View File

@ -89,6 +89,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
@ -152,6 +154,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
if (pTopic->sql == NULL) {

View File

@ -401,7 +401,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
fetchOffset = walGetCommittedVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
}

View File

@ -60,7 +60,9 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
ASSERT(0);
}
//assert(code == 0);
@ -79,7 +81,9 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
ASSERT(0);
}
//assert(walReadWithHandle(pWalHandle, index) == 0);
@ -113,7 +117,9 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
ASSERT(0);
}
return 0; // to avoid compiler error
@ -144,7 +150,9 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
if (code != 0) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno));
int32_t linuxErr = errno;
const char *linuxErrMsg = strerror(errno);
sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg);
ASSERT(0);
}
return 0; // to avoid compiler error

View File

@ -25,6 +25,8 @@ int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotV
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}

View File

@ -143,7 +143,11 @@ void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capa
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
int32_t code;
// TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) {
return -1;
}
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);

View File

@ -172,106 +172,108 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
-timezone / 3600);
#else
if (taosCheckExistFile("/etc/timezone")) {
/*
* NOTE: do not remove it.
* Enforce set the correct daylight saving time(DST) flag according
* to current time
*/
time_t tx1 = taosGetTimestampSec();
struct tm tm1;
taosLocalTime(&tx1, &tm1);
/* load time zone string from /etc/timezone */
// FILE *f = fopen("/etc/timezone", "r");
errno = 0;
TdFilePtr pFile = taosOpenFile("/etc/timezone", TD_FILE_READ);
char buf[68] = {0};
if (pFile != NULL) {
int len = taosReadFile(pFile, buf, 64);
if (len < 64 && taosGetErrorFile(pFile)) {
taosCloseFile(&pFile);
printf("read /etc/timezone error, reason:%s", strerror(errno));
return;
}
taosCloseFile(&pFile);
buf[sizeof(buf) - 1] = 0;
char *lineEnd = strstr(buf, "\n");
if (lineEnd != NULL) {
*lineEnd = 0;
}
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if (strlen(buf) > 0) {
setenv("TZ", buf, 1);
}
}
// get and set default timezone
tzset();
/*
* get CURRENT time zone.
* system current time zone is affected by daylight saving time(DST)
*
* e.g., the local time zone of London in DST is GMT+01:00,
* otherwise is GMT+00:00
*/
int32_t tz = (-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR;
*tsTimezone = tz;
tz += daylight;
/*
* format example:
*
* Asia/Shanghai (CST, +0800)
* Europe/London (BST, +0100)
*/
snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
} else {
char buf[4096] = {0};
char *tz = NULL;
{
int n = readlink("/etc/localtime", buf, sizeof(buf));
if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno));
return;
}
buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') {
if (tz) {
tz = buf + i + 1;
break;
char buf[4096] = {0};
char *tz = NULL;
{
int n = readlink("/etc/localtime", buf, sizeof(buf));
if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno));
if (taosCheckExistFile("/etc/timezone")) {
/*
* NOTE: do not remove it.
* Enforce set the correct daylight saving time(DST) flag according
* to current time
*/
time_t tx1 = taosGetTimestampSec();
struct tm tm1;
taosLocalTime(&tx1, &tm1);
/* load time zone string from /etc/timezone */
// FILE *f = fopen("/etc/timezone", "r");
errno = 0;
TdFilePtr pFile = taosOpenFile("/etc/timezone", TD_FILE_READ);
char buf[68] = {0};
if (pFile != NULL) {
int len = taosReadFile(pFile, buf, 64);
if (len < 64 && taosGetErrorFile(pFile)) {
taosCloseFile(&pFile);
printf("read /etc/timezone error, reason:%s", strerror(errno));
return;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed");
return;
}
setenv("TZ", tz, 1);
tzset();
taosCloseFile(&pFile);
buf[sizeof(buf) - 1] = 0;
char *lineEnd = strstr(buf, "\n");
if (lineEnd != NULL) {
*lineEnd = 0;
}
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if (strlen(buf) > 0) {
setenv("TZ", buf, 1);
}
}
// get and set default timezone
tzset();
/*
* get CURRENT time zone.
* system current time zone is affected by daylight saving time(DST)
*
* e.g., the local time zone of London in DST is GMT+01:00,
* otherwise is GMT+00:00
*/
int32_t tz = (-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR;
*tsTimezone = tz;
tz += daylight;
/*
* format example:
*
* Asia/Shanghai (CST, +0800)
* Europe/London (BST, +0100)
*/
snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
} else {
printf("There is not /etc/timezone.\n");
}
return;
}
buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') {
if (tz) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed");
return;
}
/*
* NOTE: do not remove it.
* Enforce set the correct daylight saving time(DST) flag according
* to current time
*/
time_t tx1 = taosGetTimestampSec();
struct tm tm1;
taosLocalTime(&tx1, &tm1);
/*
* format example:
*
* Asia/Shanghai (CST, +0800)
* Europe/London (BST, +0100)
*/
snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %+03ld00)", tz, tm1.tm_isdst ? tzname[daylight] : tzname[0],
-timezone / 3600);
setenv("TZ", tz, 1);
tzset();
}
/*
* NOTE: do not remove it.
* Enforce set the correct daylight saving time(DST) flag according
* to current time
*/
time_t tx1 = taosGetTimestampSec();
struct tm tm1;
taosLocalTime(&tx1, &tm1);
/*
* format example:
*
* Asia/Shanghai (CST, +0800)
* Europe/London (BST, +0100)
*/
snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %+03ld00)", tz, tm1.tm_isdst ? tzname[daylight] : tzname[0],
-timezone / 3600);
#endif
}

View File

@ -1,337 +1,347 @@
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$topicNum = 2
#=============================== start consume =============================#
print ================ test consume from stb
print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$topicList = ' . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# $data[0][2]/$data[1][2] should be between $totalMsgOfOneTopic and $totalMsgOfStb.
if $data[0][2] < $totalMsgOfOneTopic then
return -1
endi
if $data[0][2] > $totalMsgOfStb then
return -1
endi
if $data[1][2] < $totalMsgOfOneTopic then
return -1
endi
if $data[1][2] > $totalMsgOfStb then
return -1
endi
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
$sumOfMsgCnt = $data[0][2] + $data[1][2]
if $sumOfMsgCnt != $totalMsgCons then
return -1
endi
# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb.
if $data[0][3] < $totalMsgOfOneTopic then
return -1
endi
if $data[0][3] > $totalMsgOfStb then
return -1
endi
if $data[1][3] < $totalMsgOfOneTopic then
return -1
endi
if $data[1][3] > $totalMsgOfStb then
return -1
endi
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
$sumOfRows = $data[0][3] + $data[1][3]
if $sumOfRows != $totalMsgCons then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$topicList = ' . topic_ctb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfCtb
# or $data[0][2] $totalMsgOfCtb and $data[1][2] == $totalMsgOfOneTopic
if $data[0][2] == $totalMsgOfOneTopic then
if $data[1][2] == $totalMsgOfCtb then
goto check_ok_0
endi
elif $data[1][2] == $totalMsgOfOneTopic then
if $data[0][2] == $totalMsgOfCtb then
goto check_ok_0
endi
endi
return -1
check_ok_0:
if $data[0][3] == $totalMsgOfOneTopic then
if $data[1][3] == $totalMsgOfCtb then
goto check_ok_1
endi
elif $data[1][3] == $totalMsgOfOneTopic then
if $data[0][3] == $totalMsgOfCtb then
goto check_ok_1
endi
endi
return -1
check_ok_1:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$topicList = ' . topic_ntb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfNtb
# or $data[0][2] $totalMsgOfNtb and $data[1][2] == $totalMsgOfOneTopic
if $data[0][2] == $totalMsgOfOneTopic then
if $data[1][2] == $totalMsgOfNtb then
goto check_ok_2
endi
elif $data[1][2] == $totalMsgOfOneTopic then
if $data[0][2] == $totalMsgOfNtb then
goto check_ok_2
endi
endi
return -1
check_ok_2:
if $data[0][3] == $totalMsgOfOneTopic then
if $data[1][3] == $totalMsgOfNtb then
goto check_ok_3
endi
elif $data[1][3] == $totalMsgOfOneTopic then
if $data[0][3] == $totalMsgOfNtb then
goto check_ok_3
endi
endi
return -1
check_ok_3:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$topicNum = 2
#=============================== start consume =============================#
print ================ test consume from stb
print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$topicList = ' . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# $data[0][2]/$data[1][2] should be between $totalMsgOfOneTopic and $totalMsgOfStb.
if $data[0][2] < $totalMsgOfOneTopic then
return -1
endi
if $data[0][2] > $totalMsgOfStb then
return -1
endi
if $data[1][2] < $totalMsgOfOneTopic then
return -1
endi
if $data[1][2] > $totalMsgOfStb then
return -1
endi
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
$sumOfMsgCnt = $data[0][2] + $data[1][2]
if $sumOfMsgCnt != $totalMsgCons then
return -1
endi
# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb.
if $data[0][3] < $totalMsgOfOneTopic then
return -1
endi
if $data[0][3] > $totalMsgOfStb then
return -1
endi
if $data[1][3] < $totalMsgOfOneTopic then
return -1
endi
if $data[1][3] > $totalMsgOfStb then
return -1
endi
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
$sumOfRows = $data[0][3] + $data[1][3]
if $sumOfRows != $totalMsgCons then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$topicList = ' . topic_ctb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfCtb
# or $data[0][2] $totalMsgOfCtb and $data[1][2] == $totalMsgOfOneTopic
if $data[0][2] == $totalMsgOfOneTopic then
if $data[1][2] == $totalMsgOfCtb then
goto check_ok_0
endi
elif $data[1][2] == $totalMsgOfOneTopic then
if $data[0][2] == $totalMsgOfCtb then
goto check_ok_0
endi
endi
return -1
check_ok_0:
if $data[0][3] == $totalMsgOfOneTopic then
if $data[1][3] == $totalMsgOfCtb then
goto check_ok_1
endi
elif $data[1][3] == $totalMsgOfOneTopic then
if $data[0][3] == $totalMsgOfCtb then
goto check_ok_1
endi
endi
return -1
check_ok_1:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$topicList = ' . topic_ntb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfNtb
# or $data[0][2] $totalMsgOfNtb and $data[1][2] == $totalMsgOfOneTopic
if $data[0][2] == $totalMsgOfOneTopic then
if $data[1][2] == $totalMsgOfNtb then
goto check_ok_2
endi
elif $data[1][2] == $totalMsgOfOneTopic then
if $data[0][2] == $totalMsgOfNtb then
goto check_ok_2
endi
endi
return -1
check_ok_2:
if $data[0][3] == $totalMsgOfOneTopic then
if $data[1][3] == $totalMsgOfNtb then
goto check_ok_3
endi
elif $data[1][3] == $totalMsgOfOneTopic then
if $data[0][3] == $totalMsgOfNtb then
goto check_ok_3
endi
endi
return -1
check_ok_3:
sql select * from performance_schema.`consumers`
if $rows != 0 then
return -1
endi
#sql select * from performance_schema.`subscriptions`
#if $rows != 0 then
# return -1
#endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT