diff --git a/Jenkinsfile2 b/Jenkinsfile2 index d827c6640a..d90bc3061b 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -47,36 +47,67 @@ def pre_test(){ script { if (env.CHANGE_TARGET == 'master') { sh ''' + cd ${WK} + git checkout master cd ${WKC} git checkout master ''' } else if(env.CHANGE_TARGET == '2.0') { sh ''' + cd ${WK} + git checkout 2.0 cd ${WKC} git checkout 2.0 ''' } else if(env.CHANGE_TARGET == '3.0') { sh ''' + cd ${WK} + git checkout 3.0 cd ${WKC} git checkout 3.0 - [ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../.. ''' } else { sh ''' + cd ${WK} + git checkout develop cd ${WKC} git checkout develop ''' } } + if (env.CHANGE_URL =~ /\/TDengine\//) { + sh ''' + cd ${WKC} + git pull >/dev/null + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git log|head -n20 + cd ${WK} + git pull >/dev/null + git log|head -n20 + ''' + } else if (env.CHANGE_URL =~ /\/TDinternal\//) { + sh ''' + cd ${WK} + git pull >/dev/null + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git log|head -n20 + cd ${WKC} + git pull >/dev/null + git log|head -n20 + ''' + } else { + sh ''' + echo "unmatched reposiotry ${CHANGE_URL}" + ''' + } sh ''' cd ${WKC} - git pull >/dev/null - git fetch origin +refs/pull/${CHANGE_ID}/merge - git checkout -qf FETCH_HEAD git submodule update --init --recursive ''' sh ''' - cd ${WKC} + cd ${WK} export TZ=Asia/Harbin date rm -rf debug @@ -162,8 +193,8 @@ pipeline { options { skipDefaultCheckout() } environment{ WK = '/var/lib/jenkins/workspace/TDinternal' - WKC= '/var/lib/jenkins/workspace/TDengine' - WKPY= '/var/lib/jenkins/workspace/taos-connector-python' + WKC = '/var/lib/jenkins/workspace/TDinternal/community' + WKPY = '/var/lib/jenkins/workspace/taos-connector-python' } stages { stage('run test') { @@ -177,15 +208,33 @@ pipeline { steps { timeout(time: 45, unit: 'MINUTES'){ pre_test() - sh ''' - cd ${WKC}/debug - ctest -VV - ''' - sh ''' - export LD_LIBRARY_PATH=${WKC}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' + script { + if (env.CHANGE_URL =~ /\/TDengine\//) { + sh ''' + cd ${WK}/debug + ctest -VV + ''' + sh ''' + export LD_LIBRARY_PATH=${WK}/debug/build/lib + cd ${WKC}/tests/system-test + ./fulltest.sh + ''' + } else if (env.CHANGE_URL =~ /\/TDinternal\//) { + sh ''' + cd ${WKC}/debug + ctest -VV + ''' + sh ''' + export LD_LIBRARY_PATH=${WKC}/debug/build/lib + cd ${WKC}/tests/system-test + ./fulltest.sh + ''' + } else { + sh ''' + echo "unmatched reposiotry ${CHANGE_URL}" + ''' + } + } sh ''' cd ${WKC}/tests ./test-all.sh b1fq diff --git a/cmake/cmake.define b/cmake/cmake.define index f90024e032..0eb5206aab 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -85,11 +85,11 @@ ELSE () IF (${SANITIZER} MATCHES "true") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") MESSAGE(STATUS "Will compile with Address Sanitizer!") ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-literal-suffix -Werror=return-type -fpermissive -fPIC -gdwarf-2 -g3") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3") ENDIF () MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}") diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 26bfa997f7..d946fe6a98 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -292,6 +292,109 @@ typedef struct { SSchema* pSchema; } SSchemaWrapper; +static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { + SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); + if (pSW == NULL) return pSW; + pSW->nCols = pSchemaWrapper->nCols; + pSW->sver = pSchemaWrapper->sver; + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + taosMemoryFree(pSW); + return NULL; + } + memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); + return pSW; +} + +static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { + taosMemoryFree(pSchemaWrapper->pSchema); + taosMemoryFree(pSchemaWrapper); +} + +static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pSchema->type); + tlen += taosEncodeFixedI8(buf, pSchema->flags); + tlen += taosEncodeFixedI32(buf, pSchema->bytes); + tlen += taosEncodeFixedI16(buf, pSchema->colId); + tlen += taosEncodeString(buf, pSchema->name); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { + buf = taosDecodeFixedI8(buf, &pSchema->type); + buf = taosDecodeFixedI8(buf, &pSchema->flags); + buf = taosDecodeFixedI32(buf, &pSchema->bytes); + buf = taosDecodeFixedI16(buf, &pSchema->colId); + buf = taosDecodeStringTo(buf, pSchema->name); + return (void*)buf; +} + +static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) { + if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; + if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; + if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1; + if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1; + if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; + return 0; +} + +static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) { + if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1; + if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; + return 0; +} + +static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { + int32_t tlen = 0; + tlen += taosEncodeVariantI32(buf, pSW->nCols); + tlen += taosEncodeVariantI32(buf, pSW->sver); + for (int32_t i = 0; i < pSW->nCols; i++) { + tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { + buf = taosDecodeVariantI32(buf, &pSW->nCols); + buf = taosDecodeVariantI32(buf, &pSW->sver); + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } + + for (int32_t i = 0; i < pSW->nCols; i++) { + buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + } + return (void*)buf; +} + +static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { + if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; + } + + return 0; +} + +static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { + if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) return -1; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; + } + + return 0; +} + STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols); typedef struct { @@ -1874,7 +1977,7 @@ typedef struct SMqHbTopicInfo { int32_t epoch; int64_t topicUid; char name[TSDB_TOPIC_FNAME_LEN]; - SArray* pVgInfo; + SArray* pVgInfo; // SArray } SMqHbTopicInfo; static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { @@ -1993,49 +2096,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) return (void*)buf; } -typedef struct { - int8_t reserved; -} SMqRebVgRsp; - -typedef struct { - int64_t leftForVer; - int32_t vgId; - int32_t epoch; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - char* sql; - char* physicalPlan; - char* qmsg; -} SMqSetCVgReq; - -static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->leftForVer); - tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI32(buf, pReq->epoch); - tlen += taosEncodeFixedI64(buf, pReq->consumerId); - tlen += taosEncodeString(buf, pReq->topicName); - tlen += taosEncodeString(buf, pReq->cgroup); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeString(buf, pReq->qmsg); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->leftForVer); - buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI32(buf, &pReq->epoch); - buf = taosDecodeFixedI64(buf, &pReq->consumerId); - buf = taosDecodeStringTo(buf, pReq->topicName); - buf = taosDecodeStringTo(buf, pReq->cgroup); - buf = taosDecodeString(buf, &pReq->sql); - buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = taosDecodeString(buf, &pReq->qmsg); - return buf; -} - typedef struct { int32_t vgId; int64_t offset; @@ -2057,109 +2117,6 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset); int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq); -static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { - SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); - if (pSW == NULL) return pSW; - pSW->nCols = pSchemaWrapper->nCols; - pSW->sver = pSchemaWrapper->sver; - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - taosMemoryFree(pSW); - return NULL; - } - memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); - return pSW; -} - -static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { - taosMemoryFree(pSchemaWrapper->pSchema); - taosMemoryFree(pSchemaWrapper); -} - -static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { - int32_t tlen = 0; - tlen += taosEncodeFixedI8(buf, pSchema->type); - tlen += taosEncodeFixedI8(buf, pSchema->flags); - tlen += taosEncodeFixedI32(buf, pSchema->bytes); - tlen += taosEncodeFixedI16(buf, pSchema->colId); - tlen += taosEncodeString(buf, pSchema->name); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { - buf = taosDecodeFixedI8(buf, &pSchema->type); - buf = taosDecodeFixedI8(buf, &pSchema->flags); - buf = taosDecodeFixedI32(buf, &pSchema->bytes); - buf = taosDecodeFixedI16(buf, &pSchema->colId); - buf = taosDecodeStringTo(buf, pSchema->name); - return (void*)buf; -} - -static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) { - if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; - if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; - if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1; - if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1; - if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; - return 0; -} - -static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) { - if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; - if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1; - if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; - return 0; -} - -static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { - int32_t tlen = 0; - tlen += taosEncodeVariantI32(buf, pSW->nCols); - tlen += taosEncodeVariantI32(buf, pSW->sver); - for (int32_t i = 0; i < pSW->nCols; i++) { - tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { - buf = taosDecodeVariantI32(buf, &pSW->nCols); - buf = taosDecodeVariantI32(buf, &pSW->sver); - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - return NULL; - } - - for (int32_t i = 0; i < pSW->nCols; i++) { - buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); - } - return (void*)buf; -} - -static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { - if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; - if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; - for (int32_t i = 0; i < pSW->nCols; i++) { - if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; - } - - return 0; -} - -static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { - if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) return -1; - for (int32_t i = 0; i < pSW->nCols; i++) { - if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; - } - - return 0; -} - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; @@ -2428,6 +2385,21 @@ typedef struct { SEpSet epSet; } SMqSubVgEp; +static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pVgEp->vgId); + tlen += taosEncodeFixedI64(buf, pVgEp->offset); + tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { + buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeFixedI64(buf, &pVgEp->offset); + buf = taosDecodeSEpSet(buf, &pVgEp->epSet); + return buf; +} + typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; int8_t isSchemaAdaptive; @@ -2435,6 +2407,43 @@ typedef struct { SSchemaWrapper schema; } SMqSubTopicEp; +static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pTopicEp->topic); + tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive); + int32_t sz = taosArrayGetSize(pTopicEp->vgs); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); + tlen += tEncodeSMqSubVgEp(buf, pVgEp); + } + tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { + buf = taosDecodeStringTo(buf, pTopicEp->topic); + buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); + if (pTopicEp->vgs == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp vgEp; + buf = tDecodeSMqSubVgEp(buf, &vgEp); + taosArrayPush(pTopicEp->vgs, &vgEp); + } + buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); + return buf; +} + +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { + // taosMemoryFree(pSubTopicEp->schema.pSchema); + taosArrayDestroy(pSubTopicEp->vgs); +} + typedef struct { SMqRspHead head; int64_t reqOffset; @@ -2513,58 +2522,6 @@ typedef struct { SArray* topics; // SArray } SMqAskEpRsp; -static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - // taosMemoryFree(pSubTopicEp->schema.pSchema); - taosArrayDestroy(pSubTopicEp->vgs); -} - -static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeFixedI64(buf, pVgEp->offset); - tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { - buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeFixedI64(buf, &pVgEp->offset); - buf = taosDecodeSEpSet(buf, &pVgEp->epSet); - return buf; -} - -static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pTopicEp->topic); - tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive); - int32_t sz = taosArrayGetSize(pTopicEp->vgs); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); - tlen += tEncodeSMqSubVgEp(buf, pVgEp); - } - tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { - buf = taosDecodeStringTo(buf, pTopicEp->topic); - buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); - if (pTopicEp->vgs == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqSubVgEp vgEp; - buf = tDecodeSMqSubVgEp(buf, &vgEp); - taosArrayPush(pTopicEp->vgs, &vgEp); - } - buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); - return buf; -} - static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) { int32_t tlen = 0; // tlen += taosEncodeString(buf, pRsp->cgroup); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 93a950466b..e541c214de 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -208,6 +208,7 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); +int64_t walGetCommittedVer(SWal *); #ifdef __cplusplus } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 59a7cbee1a..c82f54f32b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -138,14 +138,14 @@ typedef struct { } SSmlHandle; //================================================================================================= -static uint64_t linesSmlHandleId = 0; +static volatile int64_t linesSmlHandleId = 0; static const char* TS = "_ts"; static const char* TAG = "_tagNone"; //================================================================================================= -static uint64_t smlGenId() { - uint64_t id; +static int64_t smlGenId() { + int64_t id; do { id = atomic_add_fetch_64(&linesSmlHandleId, 1); @@ -239,15 +239,13 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery code = taos_errno(res); const char* errStr = taos_errstr(res); - char* begin = strstr(errStr, "duplicated column names"); - bool tscDupColNames = (begin != NULL); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr); } taos_free_result(res); // if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { - if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { + if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) { TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { @@ -265,15 +263,13 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery code = taos_errno(res); const char* errStr = taos_errstr(res); - char* begin = strstr(errStr, "duplicated column names"); - bool tscDupColNames = (begin != NULL); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); } taos_free_result(res); // if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) { - if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { + if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) { TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { @@ -337,7 +333,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { SArray *cols = action->createSTable.fields; for(int i = 0; i < taosArrayGetSize(cols); i++){ - SSmlKv *kv = taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); smlBuildColumnDescription(kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; *pos = ','; ++pos; --freeBytes; @@ -350,7 +346,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { cols = action->createSTable.tags; for(int i = 0; i < taosArrayGetSize(cols); i++){ - SSmlKv *kv = taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); smlBuildColumnDescription(kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; *pos = ','; ++pos; --freeBytes; @@ -390,7 +386,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { static int32_t smlModifyDBSchemas(SSmlHandle* info) { int32_t code = 0; - SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL); + SSmlSTableMeta** tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, NULL); while (tableMetaSml) { SSmlSTableMeta* sTableData = *tableMetaSml; @@ -406,8 +402,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) { - SSchemaAction schemaAction = {0}; - schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; + SSchemaAction schemaAction = {.action = SCHEMA_ACTION_CREATE_STABLE}; memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen); schemaAction.createSTable.tags = sTableData->tags; schemaAction.createSTable.fields = sTableData->cols; @@ -430,7 +425,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { } sTableData->tableMeta = pTableMeta; - tableMetaSml = taosHashIterate(info->superTables, tableMetaSml); + tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); } return 0; } @@ -996,7 +991,7 @@ static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBu static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ if(isTag && len == 0){ - SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); + SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); kv->key = TAG; kv->keyLen = strlen(TAG); kv->value = TAG; @@ -1053,7 +1048,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is } // add kv to SSmlKv - SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); + SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); kv->key = key; kv->keyLen = keyLen; kv->value = value; @@ -1199,7 +1194,7 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP; // add ts to - SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); + SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv){ return TSDB_CODE_OUT_OF_MEMORY; } @@ -1259,12 +1254,12 @@ static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArra static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){ if(tags){ for (int i = 0; i < taosArrayGetSize(tags); ++i) { - SSmlKv *kv = taosArrayGetP(tags, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i); ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR); - uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); + uint8_t *index = (uint8_t *)taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); if(index){ - SSmlKv **value = taosArrayGet(tableMeta->tags, *index); + SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->tags, *index); ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR); if(kv->valueLen > (*value)->valueLen){ // tags type is nchar *value = kv; @@ -1281,11 +1276,11 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, if(cols){ for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp - SSmlKv *kv = taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); - int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); + int16_t *index = (int16_t *)taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); if(index){ - SSmlKv **value = taosArrayGet(tableMeta->cols, *index); + SSmlKv **value = (SSmlKv **)taosArrayGet(tableMeta->cols, *index); if(kv->type != (*value)->type){ smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); return false; @@ -1311,7 +1306,7 @@ static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ if(tags){ for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) { - SSmlKv *kv = taosArrayGetP(tags, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(tags, i); taosArrayPush(tableMeta->tags, &kv); taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES); } @@ -1319,7 +1314,7 @@ static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols) if(cols){ for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { - SSmlKv *kv = taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); taosArrayPush(tableMeta->cols, &kv); taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES); } @@ -1327,7 +1322,7 @@ static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols) } static SSmlTableInfo* smlBuildTableInfo(bool format){ - SSmlTableInfo *tag = taosMemoryCalloc(sizeof(SSmlTableInfo), 1); + SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1); if(!tag){ return NULL; } @@ -1354,7 +1349,7 @@ static SSmlTableInfo* smlBuildTableInfo(bool format){ return tag; cleanup: - taosMemoryFreeClear(tag); + taosMemoryFree(tag); return NULL; } @@ -1364,18 +1359,17 @@ static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){ }else{ tag->cols = taosArrayInit(16, POINTER_BYTES); for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){ - SHashObj *kvHash = taosArrayGetP(tag->cols, i); - void** p1 = taosHashIterate(kvHash, NULL); + SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); + void** p1 = (void**)taosHashIterate(kvHash, NULL); while (p1) { - SSmlKv* kv = *p1; - taosMemoryFreeClear(kv); - p1 = taosHashIterate(kvHash, p1); + taosMemoryFree(*p1); + p1 = (void**)taosHashIterate(kvHash, p1); } taosHashCleanup(kvHash); } } taosArrayDestroy(tag->tags); - taosMemoryFreeClear(tag); + taosMemoryFree(tag); } static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ @@ -1390,7 +1384,7 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col return TSDB_CODE_TSC_OUT_OF_MEMORY; } for(size_t i = 0; i < taosArrayGetSize(cols); i++){ - SSmlKv *kv = taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later } taosArrayPush(oneTable->cols, &kvHash); @@ -1399,7 +1393,7 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col } static SSmlSTableMeta* smlBuildSTableMeta(){ - SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); + SSmlSTableMeta* meta = (SSmlSTableMeta*)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); if(!meta){ return NULL; } @@ -1429,7 +1423,7 @@ static SSmlSTableMeta* smlBuildSTableMeta(){ return meta; cleanup: - taosMemoryFreeClear(meta); + taosMemoryFree(meta); return NULL; } @@ -1475,9 +1469,9 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { return TSDB_CODE_SML_INVALID_DATA; } - SSmlTableInfo **oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); + SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); if(oneTable){ - SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); + SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); ASSERT(tableMeta); ret = smlUpdateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta cols if(!ret){ @@ -1516,7 +1510,7 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) { buildChildTableName(&rName); tinfo->uid = rName.uid; - SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); + SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); if(tableMeta){ // update meta ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf); if(!ret){ @@ -1545,20 +1539,18 @@ static void smlDestroyInfo(SSmlHandle* info){ smlDestroyHandle(info->exec); // destroy info->childTables - void** p1 = taosHashIterate(info->childTables, NULL); + void** p1 = (void**)taosHashIterate(info->childTables, NULL); while (p1) { - SSmlTableInfo* oneTable = *p1; - smlDestroyBuildTableInfo(oneTable, info->dataFormat); - p1 = taosHashIterate(info->childTables, p1); + smlDestroyBuildTableInfo((SSmlTableInfo*)(*p1), info->dataFormat); + p1 = (void**)taosHashIterate(info->childTables, p1); } taosHashCleanup(info->childTables); // destroy info->superTables - p1 = taosHashIterate(info->superTables, NULL); + p1 = (void**)taosHashIterate(info->superTables, NULL); while (p1) { - SSmlSTableMeta* oneTable = *p1; - smlDestroySTableMeta(oneTable); - p1 = taosHashIterate(info->superTables, p1); + smlDestroySTableMeta((SSmlSTableMeta*)(*p1)); + p1 = (void**)taosHashIterate(info->superTables, p1); } taosHashCleanup(info->superTables); @@ -1571,13 +1563,13 @@ static void smlDestroyInfo(SSmlHandle* info){ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){ int32_t code = TSDB_CODE_SUCCESS; - SSmlHandle* info = taosMemoryCalloc(1, sizeof(SSmlHandle)); + SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { return NULL; } info->id = smlGenId(); - info->pQuery = taosMemoryCalloc(1, sizeof(SQuery)); + info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery)); if (NULL == info->pQuery) { uError("SML:0x%"PRIx64" create info->pQuery error", info->id); goto cleanup; @@ -1592,7 +1584,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol } ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; - info->taos = taos; + info->taos = (STscObj *)taos; code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); if(code != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" get catalog error %d", info->id, code); @@ -1634,7 +1626,7 @@ cleanup: static int32_t smlInsertData(SSmlHandle* info) { int32_t code = TSDB_CODE_SUCCESS; - SSmlTableInfo** oneTable = taosHashIterate(info->childTables, NULL); + SSmlTableInfo** oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, NULL); while (oneTable) { SSmlTableInfo* tableData = *oneTable; @@ -1650,7 +1642,7 @@ static int32_t smlInsertData(SSmlHandle* info) { } taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); - SSmlSTableMeta** pMeta = taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen); + SSmlSTableMeta** pMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen); ASSERT (NULL != pMeta && NULL != *pMeta); // use tablemeta of stable to save vgid and uid of child table @@ -1662,7 +1654,7 @@ static int32_t smlInsertData(SSmlHandle* info) { if(code != TSDB_CODE_SUCCESS){ return code; } - oneTable = taosHashIterate(info->childTables, oneTable); + oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable); } smlBuildOutput(info->exec, info->pVgHash); @@ -1748,12 +1740,12 @@ cleanup: */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); if(!request){ return NULL; } - SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true); + SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision, true); if(!info){ return (TAOS_RES*)request; } diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 6a4823b855..89a573e4ee 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -214,7 +214,7 @@ TEST(testCase, smlParseCols_tag_Test) { msgBuf.len = 256; SArray *cols = taosArrayInit(16, POINTER_BYTES); - ASSERT_NE(cols, NULL); + ASSERT_NE(cols, nullptr); SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); const char *data = @@ -226,7 +226,7 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(size, 19); // nchar - SSmlKv *kv = taosArrayGetP(cols, 0); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, 0); ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); @@ -235,7 +235,7 @@ TEST(testCase, smlParseCols_tag_Test) { taosMemoryFree(kv); // nchar - kv = taosArrayGetP(cols, 3); + kv = (SSmlKv *)taosArrayGetP(cols, 3); ASSERT_EQ(strncasecmp(kv->key, "cf64", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); @@ -257,7 +257,7 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(size, 1); // nchar - kv = taosArrayGetP(cols, 0); + kv = (SSmlKv *)taosArrayGetP(cols, 0); ASSERT_EQ(strncasecmp(kv->key, TAG, strlen(TAG)), 0); ASSERT_EQ(kv->keyLen, strlen(TAG)); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); @@ -276,7 +276,7 @@ TEST(testCase, smlParseCols_Test) { msgBuf.len = 256; SArray *cols = taosArrayInit(16, POINTER_BYTES); - ASSERT_NE(cols, NULL); + ASSERT_NE(cols, nullptr); SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -288,7 +288,7 @@ TEST(testCase, smlParseCols_Test) { ASSERT_EQ(size, 19); // binary - SSmlKv *kv = taosArrayGetP(cols, 0); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, 0); ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BINARY); @@ -297,7 +297,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // nchar - kv = taosArrayGetP(cols, 1); + kv = (SSmlKv *)taosArrayGetP(cols, 1); ASSERT_EQ(strncasecmp(kv->key, "cnch", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); @@ -306,7 +306,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // bool - kv = taosArrayGetP(cols, 2); + kv = (SSmlKv *)taosArrayGetP(cols, 2); ASSERT_EQ(strncasecmp(kv->key, "cbool", 5), 0); ASSERT_EQ(kv->keyLen, 5); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL); @@ -315,7 +315,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // double - kv = taosArrayGetP(cols, 3); + kv = (SSmlKv *)taosArrayGetP(cols, 3); ASSERT_EQ(strncasecmp(kv->key, "cf64", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_DOUBLE); @@ -325,7 +325,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // float - kv = taosArrayGetP(cols, 4); + kv = (SSmlKv *)taosArrayGetP(cols, 4); ASSERT_EQ(strncasecmp(kv->key, "cf32_", 5), 0); ASSERT_EQ(kv->keyLen, 5); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_FLOAT); @@ -335,7 +335,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // float - kv = taosArrayGetP(cols, 5); + kv = (SSmlKv *)taosArrayGetP(cols, 5); ASSERT_EQ(strncasecmp(kv->key, "cf32", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_FLOAT); @@ -345,7 +345,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // tiny int - kv = taosArrayGetP(cols, 6); + kv = (SSmlKv *)taosArrayGetP(cols, 6); ASSERT_EQ(strncasecmp(kv->key, "ci8", 3), 0); ASSERT_EQ(kv->keyLen, 3); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_TINYINT); @@ -354,7 +354,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // unsigned tiny int - kv = taosArrayGetP(cols, 7); + kv = (SSmlKv *)taosArrayGetP(cols, 7); ASSERT_EQ(strncasecmp(kv->key, "cu8", 3), 0); ASSERT_EQ(kv->keyLen, 3); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_UTINYINT); @@ -363,7 +363,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // small int - kv = taosArrayGetP(cols, 8); + kv = (SSmlKv *)taosArrayGetP(cols, 8); ASSERT_EQ(strncasecmp(kv->key, "ci16", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_SMALLINT); @@ -372,7 +372,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // unsigned smallint - kv = taosArrayGetP(cols, 9); + kv = (SSmlKv *)taosArrayGetP(cols, 9); ASSERT_EQ(strncasecmp(kv->key, "cu16", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_USMALLINT); @@ -381,7 +381,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // int - kv = taosArrayGetP(cols, 10); + kv = (SSmlKv *)taosArrayGetP(cols, 10); ASSERT_EQ(strncasecmp(kv->key, "ci32", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_INT); @@ -390,7 +390,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // unsigned int - kv = taosArrayGetP(cols, 11); + kv = (SSmlKv *)taosArrayGetP(cols, 11); ASSERT_EQ(strncasecmp(kv->key, "cu32", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_UINT); @@ -400,7 +400,7 @@ TEST(testCase, smlParseCols_Test) { // bigint - kv = taosArrayGetP(cols, 12); + kv = (SSmlKv *)taosArrayGetP(cols, 12); ASSERT_EQ(strncasecmp(kv->key, "ci64", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BIGINT); @@ -409,7 +409,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // bigint - kv = taosArrayGetP(cols, 13); + kv = (SSmlKv *)taosArrayGetP(cols, 13); ASSERT_EQ(strncasecmp(kv->key, "ci", 2), 0); ASSERT_EQ(kv->keyLen, 2); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BIGINT); @@ -418,7 +418,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // unsigned bigint - kv = taosArrayGetP(cols, 14); + kv = (SSmlKv *)taosArrayGetP(cols, 14); ASSERT_EQ(strncasecmp(kv->key, "cu64", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_UBIGINT); @@ -427,7 +427,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // bool - kv = taosArrayGetP(cols, 15); + kv = (SSmlKv *)taosArrayGetP(cols, 15); ASSERT_EQ(strncasecmp(kv->key, "cbooltrue", 9), 0); ASSERT_EQ(kv->keyLen, 9); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL); @@ -437,7 +437,7 @@ TEST(testCase, smlParseCols_Test) { // bool - kv = taosArrayGetP(cols, 16); + kv = (SSmlKv *)taosArrayGetP(cols, 16); ASSERT_EQ(strncasecmp(kv->key, "cboolt", 6), 0); ASSERT_EQ(kv->keyLen, 6); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL); @@ -446,7 +446,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // bool - kv = taosArrayGetP(cols, 17); + kv = (SSmlKv *)taosArrayGetP(cols, 17); ASSERT_EQ(strncasecmp(kv->key, "cboolf", 6), 0); ASSERT_EQ(kv->keyLen, 6); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_BOOL); @@ -455,7 +455,7 @@ TEST(testCase, smlParseCols_Test) { taosMemoryFree(kv); // nchar - kv = taosArrayGetP(cols, 18); + kv = (SSmlKv *)taosArrayGetP(cols, 18); ASSERT_EQ(strncasecmp(kv->key, "cnch_", 5), 0); ASSERT_EQ(kv->keyLen, 5); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); @@ -469,7 +469,7 @@ TEST(testCase, smlParseCols_Test) { TEST(testCase, smlParseLine_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(taos, NULL); + ASSERT_NE(taos, nullptr); TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); taos_free_result(pRes); @@ -477,11 +477,11 @@ TEST(testCase, smlParseLine_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); - ASSERT_NE(request, NULL); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); - ASSERT_NE(info, NULL); + ASSERT_NE(info, nullptr); const char *sql[9] = { "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000", @@ -494,7 +494,7 @@ TEST(testCase, smlParseLine_Test) { "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000", "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000" }; - smlInsertLines(info, sql, 9); + smlInsertLines(info, (char**)sql, 9); // for (int i = 0; i < 3; i++) { // smlParseLine(info, sql[i]); // } @@ -502,7 +502,7 @@ TEST(testCase, smlParseLine_Test) { TEST(testCase, smlParseLine_error_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(taos, NULL); + ASSERT_NE(taos, nullptr); TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); taos_free_result(pRes); @@ -510,17 +510,17 @@ TEST(testCase, smlParseLine_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); - ASSERT_NE(request, NULL); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); - ASSERT_NE(info, NULL); + ASSERT_NE(info, nullptr); const char *sql[2] = { "measure,t1=3 c1=8", "measure,t2=3 c1=8u8" }; - int ret = smlInsertLines(info, sql, 2); + int ret = smlInsertLines(info, (char **)sql, 2); ASSERT_NE(ret, 0); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2751e0752e..5eb6866387 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -452,6 +452,7 @@ typedef struct { int8_t withSchema; int8_t withTag; SRWLatch lock; + int32_t consumerCnt; int32_t sqlLen; int32_t astLen; char* sql; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2a81f28edd..5ad4863322 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -787,6 +787,8 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock } } + // do not show for cleared subscription +#if 0 int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); @@ -829,6 +831,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock numOfRows++; } +#endif taosRUnLockLatch(&pSub->lock); sdbRelease(pSdb, pSub); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b62de0e06e..41d4d5f406 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -89,6 +89,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); + + SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); @@ -152,6 +154,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); if (pTopic->sql == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ef32216810..b49c148ac5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -401,7 +401,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = walGetFirstVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { - fetchOffset = walGetLastVer(pTq->pWal); + fetchOffset = walGetCommittedVer(pTq->pWal); } else { fetchOffset = pReq->currentOffset + 1; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 66c85b9cbb..3ee952fcdf 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -60,7 +60,9 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { if (code != 0) { int32_t err = terrno; const char *errStr = tstrerror(err); - sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno)); + int32_t linuxErr = errno; + const char *linuxErrMsg = strerror(errno); + sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } //assert(code == 0); @@ -79,7 +81,9 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { if (code != 0) { int32_t err = terrno; const char *errStr = tstrerror(err); - sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno)); + int32_t linuxErr = errno; + const char *linuxErrMsg = strerror(errno); + sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } //assert(walReadWithHandle(pWalHandle, index) == 0); @@ -113,7 +117,9 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { if (code != 0) { int32_t err = terrno; const char *errStr = tstrerror(err); - sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno)); + int32_t linuxErr = errno; + const char *linuxErrMsg = strerror(errno); + sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } return 0; // to avoid compiler error @@ -144,7 +150,9 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { if (code != 0) { int32_t err = terrno; const char *errStr = tstrerror(err); - sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, errno, strerror(errno)); + int32_t linuxErr = errno; + const char *linuxErrMsg = strerror(errno); + sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } return 0; // to avoid compiler error diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 3e6663f464..9aa848a7bb 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -25,6 +25,8 @@ int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotV int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; } +int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; } + static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index e14515286e..0cfe75bf33 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -143,7 +143,11 @@ void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capa int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { int32_t code; + // TODO: valid ver + if (ver > pRead->pWal->vers.commitVer) { + return -1; + } if (pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); diff --git a/source/os/src/osTimezone.c b/source/os/src/osTimezone.c index 06ec7e9464..872d8e740c 100644 --- a/source/os/src/osTimezone.c +++ b/source/os/src/osTimezone.c @@ -172,106 +172,108 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) { -timezone / 3600); #else - if (taosCheckExistFile("/etc/timezone")) { - /* - * NOTE: do not remove it. - * Enforce set the correct daylight saving time(DST) flag according - * to current time - */ - time_t tx1 = taosGetTimestampSec(); - struct tm tm1; - taosLocalTime(&tx1, &tm1); - /* load time zone string from /etc/timezone */ - // FILE *f = fopen("/etc/timezone", "r"); - errno = 0; - TdFilePtr pFile = taosOpenFile("/etc/timezone", TD_FILE_READ); - char buf[68] = {0}; - if (pFile != NULL) { - int len = taosReadFile(pFile, buf, 64); - if (len < 64 && taosGetErrorFile(pFile)) { - taosCloseFile(&pFile); - printf("read /etc/timezone error, reason:%s", strerror(errno)); - return; - } - - taosCloseFile(&pFile); - - buf[sizeof(buf) - 1] = 0; - char *lineEnd = strstr(buf, "\n"); - if (lineEnd != NULL) { - *lineEnd = 0; - } - - // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables - if (strlen(buf) > 0) { - setenv("TZ", buf, 1); - } - } - // get and set default timezone - tzset(); - /* - * get CURRENT time zone. - * system current time zone is affected by daylight saving time(DST) - * - * e.g., the local time zone of London in DST is GMT+01:00, - * otherwise is GMT+00:00 - */ - int32_t tz = (-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR; - *tsTimezone = tz; - tz += daylight; - - /* - * format example: - * - * Asia/Shanghai (CST, +0800) - * Europe/London (BST, +0100) - */ - snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); - } else { - char buf[4096] = {0}; - char *tz = NULL; - { - int n = readlink("/etc/localtime", buf, sizeof(buf)); - if (n < 0) { - printf("read /etc/localtime error, reason:%s", strerror(errno)); - return; - } - buf[n] = '\0'; - for (int i = n - 1; i >= 0; --i) { - if (buf[i] == '/') { - if (tz) { - tz = buf + i + 1; - break; + char buf[4096] = {0}; + char *tz = NULL; + { + int n = readlink("/etc/localtime", buf, sizeof(buf)); + if (n < 0) { + printf("read /etc/localtime error, reason:%s", strerror(errno)); + + if (taosCheckExistFile("/etc/timezone")) { + /* + * NOTE: do not remove it. + * Enforce set the correct daylight saving time(DST) flag according + * to current time + */ + time_t tx1 = taosGetTimestampSec(); + struct tm tm1; + taosLocalTime(&tx1, &tm1); + /* load time zone string from /etc/timezone */ + // FILE *f = fopen("/etc/timezone", "r"); + errno = 0; + TdFilePtr pFile = taosOpenFile("/etc/timezone", TD_FILE_READ); + char buf[68] = {0}; + if (pFile != NULL) { + int len = taosReadFile(pFile, buf, 64); + if (len < 64 && taosGetErrorFile(pFile)) { + taosCloseFile(&pFile); + printf("read /etc/timezone error, reason:%s", strerror(errno)); + return; } - tz = buf + i + 1; - } - } - if (!tz || 0 == strchr(tz, '/')) { - printf("parsing /etc/localtime failed"); - return; - } - setenv("TZ", tz, 1); - tzset(); + taosCloseFile(&pFile); + + buf[sizeof(buf) - 1] = 0; + char *lineEnd = strstr(buf, "\n"); + if (lineEnd != NULL) { + *lineEnd = 0; + } + + // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables + if (strlen(buf) > 0) { + setenv("TZ", buf, 1); + } + } + // get and set default timezone + tzset(); + /* + * get CURRENT time zone. + * system current time zone is affected by daylight saving time(DST) + * + * e.g., the local time zone of London in DST is GMT+01:00, + * otherwise is GMT+00:00 + */ + int32_t tz = (-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR; + *tsTimezone = tz; + tz += daylight; + + /* + * format example: + * + * Asia/Shanghai (CST, +0800) + * Europe/London (BST, +0100) + */ + snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); + } else { + printf("There is not /etc/timezone.\n"); + } + return; + } + buf[n] = '\0'; + for (int i = n - 1; i >= 0; --i) { + if (buf[i] == '/') { + if (tz) { + tz = buf + i + 1; + break; + } + tz = buf + i + 1; + } + } + if (!tz || 0 == strchr(tz, '/')) { + printf("parsing /etc/localtime failed"); + return; } - /* - * NOTE: do not remove it. - * Enforce set the correct daylight saving time(DST) flag according - * to current time - */ - time_t tx1 = taosGetTimestampSec(); - struct tm tm1; - taosLocalTime(&tx1, &tm1); - - /* - * format example: - * - * Asia/Shanghai (CST, +0800) - * Europe/London (BST, +0100) - */ - snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %+03ld00)", tz, tm1.tm_isdst ? tzname[daylight] : tzname[0], - -timezone / 3600); + setenv("TZ", tz, 1); + tzset(); } + + /* + * NOTE: do not remove it. + * Enforce set the correct daylight saving time(DST) flag according + * to current time + */ + time_t tx1 = taosGetTimestampSec(); + struct tm tm1; + taosLocalTime(&tx1, &tm1); + + /* + * format example: + * + * Asia/Shanghai (CST, +0800) + * Europe/London (BST, +0100) + */ + snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %+03ld00)", tz, tm1.tm_isdst ? tzname[daylight] : tzname[0], + -timezone / 3600); #endif } diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim index 878e3d9031..d5c800b0e9 100644 --- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim +++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim @@ -1,337 +1,347 @@ -#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 -#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics -#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics -#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics -#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics - -# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN -# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; -# -# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). -# - -run tsim/tmq/prepareBasicEnv-1vgrp.sim - -#---- global parameters start ----# -$dbName = db -$vgroups = 1 -$stbPrefix = stb -$ctbPrefix = ctb -$ntbPrefix = ntb -$stbNum = 1 -$ctbNum = 10 -$ntbNum = 10 -$rowsPerCtb = 10 -$tstart = 1640966400000 # 2022-01-01 00:00:00.000 -#---- global parameters end ----# - -$pullDelay = 5 -$ifcheckdata = 1 -$showMsg = 1 -$showRow = 0 - -sql connect -sql use $dbName - -print == create topics from super table -sql create topic topic_stb_column as select ts, c3 from stb -sql create topic topic_stb_all as select ts, c1, c2, c3 from stb -sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb - -print == create topics from child table -sql create topic topic_ctb_column as select ts, c3 from ctb0 -sql create topic topic_ctb_all as select * from ctb0 -sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 - -print == create topics from normal table -sql create topic topic_ntb_column as select ts, c3 from ntb0 -sql create topic topic_ntb_all as select * from ntb0 -sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 - -#sql show topics -#if $rows != 9 then -# return -1 -#endi - -$keyList = ' . group.id:cgrp1 -$keyList = $keyList . ' - -$topicNum = 2 - -#=============================== start consume =============================# - - -print ================ test consume from stb -print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all -$topicList = ' . topic_stb_column -$topicList = $topicList . , -$topicList = $topicList . topic_stb_all -$topicList = $topicList . ' - -$consumerId = 0 -$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb -$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfStb -sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) - - -$topicList = ' . topic_stb_all -$topicList = $topicList . , -$topicList = $topicList . topic_stb_function -$topicList = $topicList . ' -$consumerId = 1 -sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) - -print == start consumer to pull msgs from stb -print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start - -print == check consume result -wait_consumer_end_from_stb: -sql select * from consumeresult -print ==> rows: $rows -print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] -print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] -if $rows != 2 then - sleep 1000 - goto wait_consumer_end_from_stb -endi -if $data[0][1] == 0 then - if $data[1][1] != 1 then - return -1 - endi -endi -if $data[0][1] == 1 then - if $data[1][1] != 0 then - return -1 - endi -endi - -# $data[0][2]/$data[1][2] should be between $totalMsgOfOneTopic and $totalMsgOfStb. - -if $data[0][2] < $totalMsgOfOneTopic then - return -1 -endi -if $data[0][2] > $totalMsgOfStb then - return -1 -endi -if $data[1][2] < $totalMsgOfOneTopic then - return -1 -endi -if $data[1][2] > $totalMsgOfStb then - return -1 -endi - -$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb -$sumOfMsgCnt = $data[0][2] + $data[1][2] -if $sumOfMsgCnt != $totalMsgCons then - return -1 -endi - -# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb. -if $data[0][3] < $totalMsgOfOneTopic then - return -1 -endi -if $data[0][3] > $totalMsgOfStb then - return -1 -endi -if $data[1][3] < $totalMsgOfOneTopic then - return -1 -endi -if $data[1][3] > $totalMsgOfStb then - return -1 -endi - -$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb -$sumOfRows = $data[0][3] + $data[1][3] -if $sumOfRows != $totalMsgCons then - return -1 -endi - -####################################################################################### -# clear consume info and consume result -#run tsim/tmq/clearConsume.sim -# because drop table function no stable, so by create new db for consume info and result. Modify it later -$cdbName = cdb1 -sql create database $cdbName vgroups 1 -sleep 500 -sql use $cdbName - -print == create consume info table and consume result table for ctb -sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) -sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) - -sql show tables -if $rows != 2 then - return -1 -endi -####################################################################################### - - -print ================ test consume from ctb -print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all -$topicList = ' . topic_ctb_column -$topicList = $topicList . , -$topicList = $topicList . topic_ctb_all -$topicList = $topicList . ' -$consumerId = 0 - -$totalMsgOfOneTopic = $rowsPerCtb -$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfCtb -sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) - -$topicList = ' . topic_ctb_function -$topicList = $topicList . , -$topicList = $topicList . topic_ctb_all -$topicList = $topicList . ' -$consumerId = 1 -sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) - -print == start consumer to pull msgs from ctb -print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start - -print == check consume result -wait_consumer_end_from_ctb: -sql select * from consumeresult -print ==> rows: $rows -print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] -print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] -if $rows != 2 then - sleep 1000 - goto wait_consumer_end_from_ctb -endi -if $data[0][1] == 0 then - if $data[1][1] != 1 then - return -1 - endi -endi -if $data[0][1] == 1 then - if $data[1][1] != 0 then - return -1 - endi -endi - -# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfCtb -# or $data[0][2] $totalMsgOfCtb and $data[1][2] == $totalMsgOfOneTopic -if $data[0][2] == $totalMsgOfOneTopic then - if $data[1][2] == $totalMsgOfCtb then - goto check_ok_0 - endi -elif $data[1][2] == $totalMsgOfOneTopic then - if $data[0][2] == $totalMsgOfCtb then - goto check_ok_0 - endi -endi -return -1 -check_ok_0: - -if $data[0][3] == $totalMsgOfOneTopic then - if $data[1][3] == $totalMsgOfCtb then - goto check_ok_1 - endi -elif $data[1][3] == $totalMsgOfOneTopic then - if $data[0][3] == $totalMsgOfCtb then - goto check_ok_1 - endi -endi -return -1 -check_ok_1: - -####################################################################################### -# clear consume info and consume result -#run tsim/tmq/clearConsume.sim -# because drop table function no stable, so by create new db for consume info and result. Modify it later -$cdbName = cdb2 -sql create database $cdbName vgroups 1 -sleep 500 -sql use $cdbName - -print == create consume info table and consume result table for ntb -sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) -sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) - -sql show tables -if $rows != 2 then - return -1 -endi -####################################################################################### - - -print ================ test consume from ntb -print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all -$topicList = ' . topic_ntb_column -$topicList = $topicList . , -$topicList = $topicList . topic_ntb_all -$topicList = $topicList . ' - -$consumerId = 0 -$totalMsgOfOneTopic = $rowsPerCtb -$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum -$expectmsgcnt = $totalMsgOfNtb -sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) - - -$topicList = ' . topic_ntb_function -$topicList = $topicList . , -$topicList = $topicList . topic_ntb_all -$topicList = $topicList . ' -$consumerId = 1 -sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) - -print == start consumer to pull msgs from ntb -print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start - -print == check consume result from ntb -wait_consumer_end_from_ntb: -sql select * from consumeresult -print ==> rows: $rows -print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] -print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] -if $rows != 2 then - sleep 1000 - goto wait_consumer_end_from_ntb -endi -if $data[0][1] == 0 then - if $data[1][1] != 1 then - return -1 - endi -endi -if $data[0][1] == 1 then - if $data[1][1] != 0 then - return -1 - endi -endi - -# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfNtb -# or $data[0][2] $totalMsgOfNtb and $data[1][2] == $totalMsgOfOneTopic -if $data[0][2] == $totalMsgOfOneTopic then - if $data[1][2] == $totalMsgOfNtb then - goto check_ok_2 - endi -elif $data[1][2] == $totalMsgOfOneTopic then - if $data[0][2] == $totalMsgOfNtb then - goto check_ok_2 - endi -endi -return -1 -check_ok_2: - -if $data[0][3] == $totalMsgOfOneTopic then - if $data[1][3] == $totalMsgOfNtb then - goto check_ok_3 - endi -elif $data[1][3] == $totalMsgOfOneTopic then - if $data[0][3] == $totalMsgOfNtb then - goto check_ok_3 - endi -endi -return -1 -check_ok_3: - -#------ not need stop consumer, because it exit after pull msg overthan expect msg -#system tsim/tmq/consume.sh -s stop -x SIGINT - -system sh/exec.sh -n dnode1 -s stop -x SIGINT +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics +#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-1vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 5 +$ifcheckdata = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . ' + +$topicNum = 2 + +#=============================== start consume =============================# + + +print ================ test consume from stb +print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all +$topicList = ' . topic_stb_column +$topicList = $topicList . , +$topicList = $topicList . topic_stb_all +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb +$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum +$expectmsgcnt = $totalMsgOfStb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + + +$topicList = ' . topic_stb_all +$topicList = $topicList . , +$topicList = $topicList . topic_stb_function +$topicList = $topicList . ' +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +# $data[0][2]/$data[1][2] should be between $totalMsgOfOneTopic and $totalMsgOfStb. + +if $data[0][2] < $totalMsgOfOneTopic then + return -1 +endi +if $data[0][2] > $totalMsgOfStb then + return -1 +endi +if $data[1][2] < $totalMsgOfOneTopic then + return -1 +endi +if $data[1][2] > $totalMsgOfStb then + return -1 +endi + +$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb +$sumOfMsgCnt = $data[0][2] + $data[1][2] +if $sumOfMsgCnt != $totalMsgCons then + return -1 +endi + +# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb. +if $data[0][3] < $totalMsgOfOneTopic then + return -1 +endi +if $data[0][3] > $totalMsgOfStb then + return -1 +endi +if $data[1][3] < $totalMsgOfOneTopic then + return -1 +endi +if $data[1][3] > $totalMsgOfStb then + return -1 +endi + +$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb +$sumOfRows = $data[0][3] + $data[1][3] +if $sumOfRows != $totalMsgCons then + return -1 +endi + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb1 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ctb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ctb +print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all +$topicList = ' . topic_ctb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_all +$topicList = $topicList . ' +$consumerId = 0 + +$totalMsgOfOneTopic = $rowsPerCtb +$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum +$expectmsgcnt = $totalMsgOfCtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +$topicList = ' . topic_ctb_function +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_all +$topicList = $topicList . ' +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfCtb +# or $data[0][2] $totalMsgOfCtb and $data[1][2] == $totalMsgOfOneTopic +if $data[0][2] == $totalMsgOfOneTopic then + if $data[1][2] == $totalMsgOfCtb then + goto check_ok_0 + endi +elif $data[1][2] == $totalMsgOfOneTopic then + if $data[0][2] == $totalMsgOfCtb then + goto check_ok_0 + endi +endi +return -1 +check_ok_0: + +if $data[0][3] == $totalMsgOfOneTopic then + if $data[1][3] == $totalMsgOfCtb then + goto check_ok_1 + endi +elif $data[1][3] == $totalMsgOfOneTopic then + if $data[0][3] == $totalMsgOfCtb then + goto check_ok_1 + endi +endi +return -1 +check_ok_1: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb2 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ntb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ntb +print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all +$topicList = ' . topic_ntb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_all +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfOneTopic = $rowsPerCtb +$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + + +$topicList = ' . topic_ntb_function +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_all +$topicList = $topicList . ' +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +# either $data[0][2] $totalMsgOfOneTopic and $data[1][2] == $totalMsgOfNtb +# or $data[0][2] $totalMsgOfNtb and $data[1][2] == $totalMsgOfOneTopic +if $data[0][2] == $totalMsgOfOneTopic then + if $data[1][2] == $totalMsgOfNtb then + goto check_ok_2 + endi +elif $data[1][2] == $totalMsgOfOneTopic then + if $data[0][2] == $totalMsgOfNtb then + goto check_ok_2 + endi +endi +return -1 +check_ok_2: + +if $data[0][3] == $totalMsgOfOneTopic then + if $data[1][3] == $totalMsgOfNtb then + goto check_ok_3 + endi +elif $data[1][3] == $totalMsgOfOneTopic then + if $data[0][3] == $totalMsgOfNtb then + goto check_ok_3 + endi +endi +return -1 +check_ok_3: + +sql select * from performance_schema.`consumers` +if $rows != 0 then + return -1 +endi + +#sql select * from performance_schema.`subscriptions` +#if $rows != 0 then +# return -1 +#endi + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT