Merge branch 'main' into fix/liaohj

This commit is contained in:
Haojun Liao 2023-02-23 09:04:32 +08:00 committed by GitHub
commit 4432ae51e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1134 additions and 2390 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 634399d
GIT_TAG 61cbfd2
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -189,27 +189,46 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
tmq_t* build_consumer() {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
if (TMQ_CONF_OK != code) return NULL;
if (TMQ_CONF_OK != code) {
goto _end;
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq = tmq_consumer_new(conf, NULL, 0);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
_end:
tmq_conf_destroy(conf);
return tmq;
}

View File

@ -300,6 +300,7 @@ typedef struct STableBlockDistInfo {
int32_t firstSeekTimeUs;
uint32_t numOfInmemRows;
uint32_t numOfSmallBlocks;
uint32_t numOfVgroups;
int32_t blockRowsHisto[20];
} STableBlockDistInfo;

View File

@ -27,11 +27,12 @@ typedef struct SCorEpSet {
SEpSet epSet;
} SCorEpSet;
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
void epsetAssign(SEpSet* dst, const SEpSet* pSrc);
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
SEpSet getEpSet_s(SCorEpSet* pEpSet);

View File

@ -26,12 +26,12 @@ extern "C" {
typedef struct SQWorkerPool SQWorkerPool;
typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker {
typedef struct SQueueWorker {
int32_t id; // worker id
int64_t pid; // thread pid
TdThread thread; // thread id
void *pool;
} SQWorker;
} SQueueWorker;
typedef struct SQWorkerPool {
int32_t max; // max number of workers
@ -39,7 +39,7 @@ typedef struct SQWorkerPool {
int32_t num; // current number of workers
STaosQset *qset;
const char *name;
SQWorker *workers;
SQueueWorker *workers;
TdThreadMutex mutex;
} SQWorkerPool;

View File

@ -1,328 +0,0 @@
/*
xxHash - Extremely Fast Hash algorithm
Header File
Copyright (C) 2012-2016, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- xxHash source repository : https://github.com/Cyan4973/xxHash
*/
/* Notice extracted from xxHash homepage :
xxHash is an extremely fast Hash algorithm, running at RAM speed limits.
It also successfully passes all tests from the SMHasher suite.
Comparison (single thread, Windows Seven 32 bits, using SMHasher on a Core 2 Duo @3GHz)
Name Speed Q.Score Author
xxHash 5.4 GB/s 10
CrapWow 3.2 GB/s 2 Andrew
MumurHash 3a 2.7 GB/s 10 Austin Appleby
SpookyHash 2.0 GB/s 10 Bob Jenkins
SBox 1.4 GB/s 9 Bret Mulvey
Lookup3 1.2 GB/s 9 Bob Jenkins
SuperFastHash 1.2 GB/s 1 Paul Hsieh
CityHash64 1.05 GB/s 10 Pike & Alakuijala
FNV 0.55 GB/s 5 Fowler, Noll, Vo
CRC32 0.43 GB/s 9
MD5-32 0.33 GB/s 10 Ronald L. Rivest
SHA1-32 0.28 GB/s 10
Q.Score is a measure of quality of the hash function.
It depends on successfully passing SMHasher test set.
10 is a perfect score.
A 64-bit version, named XXH64, is available since r35.
It offers much better speed, but for 64-bit applications only.
Name Speed on 64 bits Speed on 32 bits
XXH64 13.8 GB/s 1.9 GB/s
XXH32 6.8 GB/s 6.0 GB/s
*/
#ifndef XXHASH_H_5627135585666179
#define XXHASH_H_5627135585666179 1
#if defined (__cplusplus)
extern "C" {
#endif
/* ****************************
* Definitions
******************************/
#include <stddef.h> /* size_t */
typedef enum { XXH_OK=0, XXH_ERROR } XXH_errorcode;
/* ****************************
* API modifier
******************************/
/** XXH_INLINE_ALL (and XXH_PRIVATE_API)
* This is useful to include xxhash functions in `static` mode
* in order to inline them, and remove their symbol from the public list.
* Inlining can offer dramatic performance improvement on small keys.
* Methodology :
* #define XXH_INLINE_ALL
* #include "xxhash.h"
* `xxhash.c` is automatically included.
* It's not useful to compile and link it as a separate module.
*/
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# ifndef XXH_STATIC_LINKING_ONLY
# define XXH_STATIC_LINKING_ONLY
# endif
# if defined(__GNUC__)
# define XXH_PUBLIC_API static __inline __attribute__((unused))
# elif defined (__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
# define XXH_PUBLIC_API static inline
# elif defined(_MSC_VER)
# define XXH_PUBLIC_API static __inline
# else
/* this version may generate warnings for unused static functions */
# define XXH_PUBLIC_API static
# endif
#else
# define XXH_PUBLIC_API /* do nothing */
#endif /* XXH_INLINE_ALL || XXH_PRIVATE_API */
/*! XXH_NAMESPACE, aka Namespace Emulation :
*
* If you want to include _and expose_ xxHash functions from within your own library,
* but also want to avoid symbol collisions with other libraries which may also include xxHash,
*
* you can use XXH_NAMESPACE, to automatically prefix any public symbol from xxhash library
* with the value of XXH_NAMESPACE (therefore, avoid NULL and numeric values).
*
* Note that no change is required within the calling program as long as it includes `xxhash.h` :
* regular symbol name will be automatically translated by this header.
*/
#ifdef XXH_NAMESPACE
# define XXH_CAT(A,B) A##B
# define XXH_NAME2(A,B) XXH_CAT(A,B)
# define XXH_versionNumber XXH_NAME2(XXH_NAMESPACE, XXH_versionNumber)
# define XXH32 XXH_NAME2(XXH_NAMESPACE, XXH32)
# define XXH32_createState XXH_NAME2(XXH_NAMESPACE, XXH32_createState)
# define XXH32_freeState XXH_NAME2(XXH_NAMESPACE, XXH32_freeState)
# define XXH32_reset XXH_NAME2(XXH_NAMESPACE, XXH32_reset)
# define XXH32_update XXH_NAME2(XXH_NAMESPACE, XXH32_update)
# define XXH32_digest XXH_NAME2(XXH_NAMESPACE, XXH32_digest)
# define XXH32_copyState XXH_NAME2(XXH_NAMESPACE, XXH32_copyState)
# define XXH32_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH32_canonicalFromHash)
# define XXH32_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH32_hashFromCanonical)
# define XXH64 XXH_NAME2(XXH_NAMESPACE, XXH64)
# define XXH64_createState XXH_NAME2(XXH_NAMESPACE, XXH64_createState)
# define XXH64_freeState XXH_NAME2(XXH_NAMESPACE, XXH64_freeState)
# define XXH64_reset XXH_NAME2(XXH_NAMESPACE, XXH64_reset)
# define XXH64_update XXH_NAME2(XXH_NAMESPACE, XXH64_update)
# define XXH64_digest XXH_NAME2(XXH_NAMESPACE, XXH64_digest)
# define XXH64_copyState XXH_NAME2(XXH_NAMESPACE, XXH64_copyState)
# define XXH64_canonicalFromHash XXH_NAME2(XXH_NAMESPACE, XXH64_canonicalFromHash)
# define XXH64_hashFromCanonical XXH_NAME2(XXH_NAMESPACE, XXH64_hashFromCanonical)
#endif
/* *************************************
* Version
***************************************/
#define XXH_VERSION_MAJOR 0
#define XXH_VERSION_MINOR 6
#define XXH_VERSION_RELEASE 5
#define XXH_VERSION_NUMBER (XXH_VERSION_MAJOR *100*100 + XXH_VERSION_MINOR *100 + XXH_VERSION_RELEASE)
XXH_PUBLIC_API unsigned XXH_versionNumber (void);
/*-**********************************************************************
* 32-bit hash
************************************************************************/
typedef unsigned int XXH32_hash_t;
/*! XXH32() :
Calculate the 32-bit hash of sequence "length" bytes stored at memory address "input".
The memory between input & input+length must be valid (allocated and read-accessible).
"seed" can be used to alter the result predictably.
Speed on Core 2 Duo @ 3 GHz (single thread, SMHasher benchmark) : 5.4 GB/s */
XXH_PUBLIC_API XXH32_hash_t XXH32 (const void* input, size_t length, unsigned int seed);
/*====== Streaming ======*/
typedef struct XXH32_state_s XXH32_state_t; /* incomplete type */
XXH_PUBLIC_API XXH32_state_t* XXH32_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH32_freeState(XXH32_state_t* statePtr);
XXH_PUBLIC_API void XXH32_copyState(XXH32_state_t* dst_state, const XXH32_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH32_reset (XXH32_state_t* statePtr, unsigned int seed);
XXH_PUBLIC_API XXH_errorcode XXH32_update (XXH32_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH32_hash_t XXH32_digest (const XXH32_state_t* statePtr);
/*
* Streaming functions generate the xxHash of an input provided in multiple segments.
* Note that, for small input, they are slower than single-call functions, due to state management.
* For small inputs, prefer `XXH32()` and `XXH64()`, which are better optimized.
*
* XXH state must first be allocated, using XXH*_createState() .
*
* Start a new hash by initializing state with a seed, using XXH*_reset().
*
* Then, feed the hash state by calling XXH*_update() as many times as necessary.
* The function returns an error code, with 0 meaning OK, and any other value meaning there is an error.
*
* Finally, a hash value can be produced anytime, by using XXH*_digest().
* This function returns the nn-bits hash as an int or long long.
*
* It's still possible to continue inserting input into the hash state after a digest,
* and generate some new hashes later on, by calling again XXH*_digest().
*
* When done, free XXH state space if it was allocated dynamically.
*/
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[4]; } XXH32_canonical_t;
XXH_PUBLIC_API void XXH32_canonicalFromHash(XXH32_canonical_t* dst, XXH32_hash_t hash);
XXH_PUBLIC_API XXH32_hash_t XXH32_hashFromCanonical(const XXH32_canonical_t* src);
/* Default result type for XXH functions are primitive unsigned 32 and 64 bits.
* The canonical representation uses human-readable write convention, aka big-endian (large digits first).
* These functions allow transformation of hash result into and from its canonical format.
* This way, hash values can be written into a file / memory, and remain comparable on different systems and programs.
*/
#ifndef XXH_NO_LONG_LONG
/*-**********************************************************************
* 64-bit hash
************************************************************************/
typedef unsigned long long XXH64_hash_t;
/*! XXH64() :
Calculate the 64-bit hash of sequence of length "len" stored at memory address "input".
"seed" can be used to alter the result predictably.
This function runs faster on 64-bit systems, but slower on 32-bit systems (see benchmark).
*/
XXH_PUBLIC_API XXH64_hash_t XXH64 (const void* input, size_t length, unsigned long long seed);
/*====== Streaming ======*/
typedef struct XXH64_state_s XXH64_state_t; /* incomplete type */
XXH_PUBLIC_API XXH64_state_t* XXH64_createState(void);
XXH_PUBLIC_API XXH_errorcode XXH64_freeState(XXH64_state_t* statePtr);
XXH_PUBLIC_API void XXH64_copyState(XXH64_state_t* dst_state, const XXH64_state_t* src_state);
XXH_PUBLIC_API XXH_errorcode XXH64_reset (XXH64_state_t* statePtr, unsigned long long seed);
XXH_PUBLIC_API XXH_errorcode XXH64_update (XXH64_state_t* statePtr, const void* input, size_t length);
XXH_PUBLIC_API XXH64_hash_t XXH64_digest (const XXH64_state_t* statePtr);
/*====== Canonical representation ======*/
typedef struct { unsigned char digest[8]; } XXH64_canonical_t;
XXH_PUBLIC_API void XXH64_canonicalFromHash(XXH64_canonical_t* dst, XXH64_hash_t hash);
XXH_PUBLIC_API XXH64_hash_t XXH64_hashFromCanonical(const XXH64_canonical_t* src);
#endif /* XXH_NO_LONG_LONG */
#ifdef XXH_STATIC_LINKING_ONLY
/* ================================================================================================
This section contains declarations which are not guaranteed to remain stable.
They may change in future versions, becoming incompatible with a different version of the library.
These declarations should only be used with static linking.
Never use them in association with dynamic linking !
=================================================================================================== */
/* These definitions are only present to allow
* static allocation of XXH state, on stack or in a struct for example.
* Never **ever** use members directly. */
#if !defined (__VMS) \
&& (defined (__cplusplus) \
|| (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */) )
# include <stdint.h>
struct XXH32_state_s {
uint32_t total_len_32;
uint32_t large_len;
uint32_t v1;
uint32_t v2;
uint32_t v3;
uint32_t v4;
uint32_t mem32[4];
uint32_t memsize;
uint32_t reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
struct XXH64_state_s {
uint64_t total_len;
uint64_t v1;
uint64_t v2;
uint64_t v3;
uint64_t v4;
uint64_t mem64[4];
uint32_t memsize;
uint32_t reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# else
struct XXH32_state_s {
unsigned total_len_32;
unsigned large_len;
unsigned v1;
unsigned v2;
unsigned v3;
unsigned v4;
unsigned mem32[4];
unsigned memsize;
unsigned reserved; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH32_state_t */
# ifndef XXH_NO_LONG_LONG /* remove 64-bit support */
struct XXH64_state_s {
unsigned long long total_len;
unsigned long long v1;
unsigned long long v2;
unsigned long long v3;
unsigned long long v4;
unsigned long long mem64[4];
unsigned memsize;
unsigned reserved[2]; /* never read nor write, might be removed in a future version */
}; /* typedef'd to XXH64_state_t */
# endif
# endif
#if defined(XXH_INLINE_ALL) || defined(XXH_PRIVATE_API)
# include "xxhash.c" /* include xxhash function bodies as `static`, for inlining */
#endif
#endif /* XXH_STATIC_LINKING_ONLY */
#if defined (__cplusplus)
}
#endif
#endif /* XXHASH_H_5627135585666179 */

View File

@ -347,7 +347,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
continue;
}
if (pRequest->killed) {
if (pRequest->killed || 0 == pRequest->body.queryJob) {
releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter);
continue;

View File

@ -66,7 +66,6 @@ struct tmq_conf_t {
int8_t snapEnable;
int32_t snapBatchSize;
bool hbBgEnable;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
@ -213,6 +212,7 @@ typedef struct {
typedef struct {
SMqCommitCbParamSet* params;
STqOffset* pOffset;
SMqClientVg* pMqVg;
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/
/*int32_t vgId;*/
} SMqCommitCbParam;
@ -422,7 +422,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
if (waitingRspNum == 0) {
tmqCommitDone(pParamSet);
}
@ -440,6 +439,17 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
}
#endif
// there may be race condition. fix it
if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) {
SMqClientVg* pMqVg = pParam->pMqVg;
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pParam->pMqVg->epSet = *pBuf->pEpSet;
}
taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
@ -448,7 +458,6 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
* pOffset->version);*/
tmqCommitRspCountDown(pParamSet);
return 0;
}
@ -458,6 +467,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pOffset->val = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId);
@ -471,11 +481,13 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
if (code < 0) {
return -1;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
taosMemoryFree(pOffset);
return -1;
}
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
@ -492,8 +504,10 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
taosMemoryFree(buf);
return -1;
}
pParam->params = pParamSet;
pParam->pOffset = pOffset;
pParam->pMqVg = pVg; // there may be an race condition
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@ -503,16 +517,18 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
taosMemoryFree(pParam);
return -1;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version);
SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
// TODO: put into cb
// TODO: put into cb, the commit offset should be move to the callback function
pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId();
@ -534,7 +550,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
char* topic;
int32_t vgId;
ASSERT(msg != NULL);
if (TD_RES_TMQ(msg)) {
SMqRspObj* pRspObj = (SMqRspObj*)msg;
topic = pRspObj->topic;
@ -637,15 +653,16 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
// init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1;
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%"PRIx64" start to commit offset for %d topics", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue;
}
@ -976,14 +993,12 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
ASSERT(conf->groupId[0]);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall();
pTmq->delayedTask = taosOpenQueue();
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
@ -1062,7 +1077,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SCMSubscribeReq req = {0};
int32_t code = 0;
tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
tscDebug("consumer:0x%"PRIx64" subscribe %d topics", tmq->consumerId, sz);
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
@ -1085,7 +1100,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName);
tscDebug("consumer:0x%"PRIx64" subscribe topic:%s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName);
}
@ -1398,7 +1413,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
}
atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
return set;
}
@ -1548,7 +1563,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%"PRIx64, tmq->consumerId, async, tmq->consumerId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
@ -1756,28 +1771,28 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
}
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%"PRIx64" start to handle the rsp", tmq->consumerId);
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) {
/*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/
return NULL;
}
}
tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
@ -1804,6 +1819,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
@ -1868,7 +1886,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj;
int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
#if 0
tmqHandleAllDelayedTask(tmq);
@ -1881,7 +1899,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
return NULL;
}
@ -1907,25 +1925,25 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
return NULL;
}
if (timeout != -1) {
int64_t currentTime = taosGetTimestampMs();
int64_t passedTime = currentTime - startTime;
if (passedTime > timeout) {
tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) {
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
/*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*", left time %" PRId64,*/
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - passedTime));
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait(&tmq->rspSem, 1000);

View File

@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__");
tmq_conf_set(conf, "group.id", "consumer_group");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");

View File

@ -60,6 +60,19 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
return true;
}
void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
if (pSrc == NULL || pDst == NULL) {
return;
}
pDst->inUse = pSrc->inUse;
pDst->numOfEps = pSrc->numOfEps;
for (int32_t i = 0; i < pSrc->numOfEps; ++i) {
pDst->eps[i].port = pSrc->eps[i].port;
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
}
}
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
taosCorBeginWrite(&pEpSet->version);
pEpSet->epSet = *pNewEpSet;

View File

@ -58,7 +58,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SQWorker SQHandle;
typedef struct SQueueWorker SQHandle;
typedef struct {
const char *name;

View File

@ -238,7 +238,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
// iterate all consumers, find all modification
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
@ -335,7 +337,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int64_t consumerId = req.consumerId;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer %" PRId64 " not exist", consumerId);
mError("consumer:0x%"PRIx64 " not exist", consumerId);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
@ -345,7 +347,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer %" PRId64 "", consumerId);
mInfo("try to recover consumer:0x%"PRIx64 "", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId;
@ -390,7 +392,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#if 1
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer %" PRId64 "", consumerId);
mInfo("try to recover consumer:0x%"PRIx64 "", consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId;
@ -404,14 +406,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#endif
if (status != MQ_CONSUMER_STATUS__READY) {
mInfo("consumer %" PRId64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
mInfo("consumer:0x%"PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1;
}
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
// 2. check epoch, only send ep info when epoches do not match
// 2. check epoch, only send ep info when epochs do not match
if (epoch != serverEpoch) {
taosRLockLatch(&pConsumer->lock);
mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
@ -526,12 +528,14 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
return 0;
}
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont;
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
uint64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.cgroup;
SMqConsumerObj *pConsumerOld = NULL;
SMqConsumerObj *pConsumerNew = NULL;
@ -542,21 +546,23 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (pTrans == NULL) {
goto _over;
}
for (int32_t i = 0; i < newTopicNum; i++) {
char *topic = taosArrayGetP(newSub, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
goto SUBSCRIBE_OVER;
if (pTopic == NULL) { // terrno has been set by callee function
goto _over;
}
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
goto SUBSCRIBE_OVER;
goto _over;
}
mndReleaseTopic(pMnode, pTopic);
@ -578,8 +584,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
}
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
} else {
/*taosRLockLatch(&pConsumerOld->lock);*/
@ -591,13 +597,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto SUBSCRIBE_OVER;
goto _over;
}
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
if (pConsumerNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto SUBSCRIBE_OVER;
goto _over;
}
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
@ -650,16 +656,16 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */
/*}*/
goto SUBSCRIBE_OVER;
goto _over;
}
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
}
code = TSDB_CODE_ACTION_IN_PROGRESS;
SUBSCRIBE_OVER:
_over:
mndTransDrop(pTrans);
if (pConsumerOld) {
@ -971,16 +977,19 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
if (pShow->pIter == NULL) break;
if (pShow->pIter == NULL) {
break;
}
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
mDebug("showing consumer %" PRId64 " no assigned topic, skip", pConsumer->consumerId);
mDebug("showing consumer:0x%"PRIx64 " no assigned topic, skip", pConsumer->consumerId);
sdbRelease(pSdb, pConsumer);
continue;
}
taosRLockLatch(&pConsumer->lock);
mDebug("showing consumer %" PRId64, pConsumer->consumerId);
mDebug("showing consumer:0x%"PRIx64, pConsumer->consumerId);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true;

View File

@ -523,7 +523,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL;
SQueryPlan* pPlan = NULL;
SSubplan* plan = NULL;
SSubplan* pSubplan = NULL;
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
@ -539,24 +539,27 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return -1;
}
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
if (opNum != 1) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
return -1;
}
plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
}
ASSERT(pSub->unassignedVgs);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
@ -569,15 +572,15 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
pVgEp->vgId = pVgroup->vgId;
taosArrayPush(pSub->unassignedVgs, &pVgEp);
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t msgLen;
plan->execNode.epSet = pVgEp->epSet;
plan->execNode.nodeId = pVgEp->vgId;
pSubplan->execNode.epSet = pVgEp->epSet;
pSubplan->execNode.nodeId = pVgEp->vgId;
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
@ -590,11 +593,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
sdbRelease(pSdb, pVgroup);
}
ASSERT(pSub->unassignedVgs->size > 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
ASSERT(taosArrayGetSize(pSub->unassignedVgs) > 0);
qDestroyQueryPlan(pPlan);
return 0;
}

View File

@ -39,10 +39,8 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
@ -85,12 +83,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
SMqSubscribeObj *pSub = tNewSubscribeObj(subKey);
if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pSub->dbUid = pTopic->dbUid;
pSub->stbUid = pTopic->stbUid;
pSub->subType = pTopic->subType;
@ -205,7 +204,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
int32_t totalVgNum = pOutput->pSub->vgNum;
const char *sub = pOutput->pSub->key;
mInfo("sub:%s, mq rebalance vgNum:%d", sub, pOutput->pSub->vgNum);
mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
@ -214,7 +213,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0;
for (int32_t i = 0; i < removedNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
@ -229,7 +228,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
}
taosArrayDestroy(pConsumerEp->vgs);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
@ -239,7 +238,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
if (removedNum != actualRemoved) {
mError("sub:%s, mq rebalance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
}
// if previously no consumer, there are vgs not assigned
@ -253,7 +252,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("sub:%s, mq rebalance remove vgId:%d from unassigned", sub, pVgEp->vgId);
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId);
}
}
@ -267,7 +266,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
minVgCnt = totalVgNum / afterRebConsumerNum;
imbConsumerNum = totalVgNum % afterRebConsumerNum;
}
mInfo("sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg", sub,
mInfo("sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg", sub,
afterRebConsumerNum, minVgCnt, imbConsumerNum);
// 4. first scan: remove consumer more than wanted, put to remove hash
@ -275,7 +275,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
@ -297,7 +300,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
pConsumerEp->consumerId);
}
imbCnt++;
@ -312,7 +315,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
pConsumerEp->consumerId);
}
}
@ -330,7 +333,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s, mq rebalance add new consumer:%" PRId64, sub, consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRId64, sub, consumerId);
}
}
@ -349,7 +352,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) {
mError("sub:%s, removed iter is null", sub);
mError("sub:%s removed iter is null", sub);
continue;
}
@ -402,33 +405,36 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SMqRebOutputVg *pRebOutput = NULL;
while (1) {
pIter = taosHashIterate(pHash, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
pRebOutput = (SMqRebOutputVg *)pIter;
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput);
mInfo("sub:%s, mq rebalance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
}
}
// 8. generate logs
mInfo("sub:%s, mq rebalance calculation completed, rebalanced vg", sub);
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub);
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("sub:%s, mq rebalance vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, sub,
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub,
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
}
{
void *pIter = NULL;
pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s, mq rebalance final cfg: consumer %" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s, mq rebalance final cfg: vg %d to consumer %" PRId64 "", sub, pVgEp->vgId,
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRId64, sub, pVgEp->vgId,
pConsumerEp->consumerId);
}
}
@ -552,11 +558,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL;
mInfo("mq rebalance start");
mInfo("mq re-balance start");
while (1) {
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
@ -577,12 +586,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) {
mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic);
continue;
}
taosRLockLatch(&pTopic->lock);
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
if (rebOutput.pSub == NULL) {
mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr());
@ -605,15 +615,16 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
mError("mq rebalance internal error");
mError("mq re-balance internal error");
}
// if add more consumer to balanced subscribe,
// possibly no vg is changed
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
mError("mq re-balance persist re-balance output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
taosArrayDestroy(pRebInfo->removedConsumers);
@ -627,19 +638,18 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
// reset flag
mInfo("mq rebalance completed successfully");
mInfo("mq re-balance completed successfully");
taosHashCleanup(pReq->rebSubHash);
mndRebEnd();
return 0;
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
@ -663,7 +673,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "drop-cgroup");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
@ -956,7 +966,7 @@ END:
return code;
}
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
@ -1090,7 +1100,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
return numOfRows;
}
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}

View File

@ -33,7 +33,7 @@
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);

View File

@ -29,7 +29,7 @@
extern "C" {
#endif
typedef struct SQWorker SQHandle;
typedef struct SQueueWorker SQHandle;
typedef struct SQnode {
int32_t qndId;

View File

@ -58,7 +58,7 @@ typedef struct STQ STQ;
typedef struct SVState SVState;
typedef struct SVStatis SVStatis;
typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle;
typedef struct SQueueWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter;

View File

@ -279,10 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 1: set rsma trigger stat cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
// step 2: destroy the rsma info and associated fetch tasks
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: wait for all triggered fetch tasks to finish
// step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
@ -298,9 +295,12 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
}
}
// step 4:
// step 3:
tdRsmaStopExecutor(pSma);
// step 4: destroy the rsma info and associated fetch tasks
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 5:
tdRSmaFSClose(RSMA_FS(pStat));

View File

@ -148,7 +148,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFi
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
int32_t code = 0;
int64_t size;
int64_t size = 0;
int64_t n;
TdFilePtr pFD;
char fname[TSDB_FILENAME_LEN];
@ -167,7 +167,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
tPutSmaFile(hdr, pSet->pSmaF);
break;
default:
ASSERT(0);
goto _err; // make the coverity scan happy
}
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);

View File

@ -220,6 +220,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -699,13 +701,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
if (pBlockIdx->uid == pList->tableUidList[j]) {
// this block belongs to a table that is not queried.
void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
if (p == NULL) {
tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%"PRIu64", %s", pBlockIdx->uid, pReader->idStr);
return TSDB_CODE_APP_ERROR;
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
if (pScanInfo == NULL) {
return terrno;
}
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
if (pScanInfo->pBlockList == NULL) {
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
}
@ -753,9 +753,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
for (int32_t i = 0; i < numOfTables; ++i) {
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
STableBlockScanInfo* pScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
if (pScanInfo == NULL) {
return terrno;
}
tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
@ -854,9 +855,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
s = pos;
// check
assert(pos >= 0 && pos < num);
assert(num > 0);
ASSERT(pos >= 0 && pos < num && num > 0);
if (order == TSDB_ORDER_ASC) {
// find the first position which is smaller than the key
e = num - 1;
@ -1257,14 +1256,13 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo != NULL) {
STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pBlockInfo->uid, idStr);
if (pScanInfo == NULL) {
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
return TSDB_CODE_INVALID_PARA;
return terrno;
}
SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
}
#if 0
@ -2507,16 +2505,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (p == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
goto _end;
}
pBlockScanInfo = *(STableBlockScanInfo**)p;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
@ -2855,13 +2848,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
ASSERT(pBlockInfo != NULL);
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
if (pScanInfo == NULL) {
tsdbError("failed to get table scan-info, %s", pReader->idStr);
code = TSDB_CODE_INVALID_PARA;
return code;
return terrno;
}
pBlock = getCurrentBlock(pBlockIter);
@ -4202,7 +4191,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
SSDataBlock* pResBlock = pReader->pResBlock;
if (pResBlock->pBlockAgg == NULL) {
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
}
// do fill all null column value SMA info
@ -4232,6 +4221,18 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
return code;
}
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = taosHashGet(pTableMap, &uid, sizeof(uid));
if (p == NULL || *p == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
int32_t size = taosHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return NULL;
}
return *p;
}
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
@ -4240,12 +4241,8 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
}
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
return NULL;
}

View File

@ -62,9 +62,6 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
pEntry->dataLen = sizeof(SDeleterRes);
// ASSERT(1 == pEntry->numOfRows);
// ASSERT(3 == pEntry->numOfCols);
pBuf->useSize = sizeof(SDataCacheEntry);
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);

View File

@ -170,7 +170,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
}
pGroupResInfo->index = 0;
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
}
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
@ -334,10 +333,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
return code;
}
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
SValueNode* pValue = (SValueNode*)pNew;
ASSERT(pValue->node.resType.type == TSDB_DATA_TYPE_BOOL);
*pQualified = pValue->datum.b;
nodesDestroyNode(pNew);
@ -1056,7 +1052,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
}
if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table
ASSERT(pTagIndexCond == NULL);
vnodeGetCtbIdList(pVnode, pScanNode->suid, pUidList);
} else {
// failed to find the result in the cache, let try to calculate the results
@ -1148,7 +1143,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(pNew);
} else {
taosMemoryFree(keyBuf);
nodesDestroyList(groupNew);
metaReaderClear(&mr);
return code;
@ -1166,7 +1160,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
if (tTagIsJson(data)) {
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
taosMemoryFree(keyBuf);
nodesDestroyList(groupNew);
metaReaderClear(&mr);
return terrno;
@ -1368,7 +1361,6 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
pExprNode->_function.functionName[len] == 0) {
pFuncNode->pParameterList = nodesMakeList();
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) { // todo handle error
} else {
@ -1696,7 +1688,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindo
int64_t key = w->skey;
while (key < ts) { // moving towards end
key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (key >= ts) {
if (key > ts) {
break;
}
@ -1808,7 +1800,6 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
if (pTableList->map == NULL) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
}
@ -1958,7 +1949,6 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group,
bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
ASSERT(pTableListInfo->map != NULL);
bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
@ -2015,7 +2005,6 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
}
int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
ASSERT(pTableListInfo->numOfOuputGroups == 1);
int64_t st1 = taosGetTimestampUs();
pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;

View File

@ -35,7 +35,6 @@ static void initRefPool() {
}
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
@ -75,8 +74,6 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
}
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
{
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
@ -90,12 +87,10 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
pOperator->status = OP_NOT_OPENED;
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}
}
return 0;
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
@ -353,7 +348,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return code;
}
// todo refactor STableList
bool assignUid = false;
size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
char* keyBuf = NULL;

View File

@ -240,7 +240,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// allocate a new buffer page
if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
@ -310,7 +309,6 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
pWindowRes->offset = (int32_t)pData->num;
pData->num += size;
assert(pWindowRes->pageId >= 0);
}
return 0;
@ -488,7 +486,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
// todo: refactor this
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
}
ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
@ -1024,8 +1021,6 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
assert(pResultRow != NULL);
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
@ -1279,7 +1274,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//
// assert(pQueryAttr->limit.offset == 0);
// STimeWindow tw = *win;
// getNextTimeWindow(pQueryAttr, &tw);
//
@ -1294,7 +1288,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// tw = *win;
// int32_t startPos =
// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
// assert(startPos >= 0);
//
// // set the abort info
// pQueryAttr->pos = startPos;
@ -1329,11 +1322,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// assert(*start <= pRuntimeEnv->current->lastKey);
// } else {
// assert(*start >= pRuntimeEnv->current->lastKey);
// }
//
// // if queried with value filter, do NOT forward query start position
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
@ -1347,8 +1335,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// value is
// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
// */
// assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
//
// STimeWindow w = TSWINDOW_INITIALIZER;
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
//
@ -1418,8 +1404,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// tw = win;
// int32_t startPos =
// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
// assert(startPos >= 0);
//
// // set the abort info
// pQueryAttr->pos = startPos;
// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
@ -1441,10 +1425,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// }
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
if (p->pDownstream == NULL) {
assert(p->numOfDownstream == 0);
}
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
if (p->pDownstream == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -1800,7 +1780,10 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
}
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
ASSERT(numOfRows != 0);
if (numOfRows == 0) {
numOfRows = 4096;
}
pResultInfo->capacity = numOfRows;
pResultInfo->threshold = numOfRows * 0.75;
@ -1941,7 +1924,6 @@ _error:
}
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
assert(pInfo != NULL);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
@ -2022,7 +2004,12 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUidCache(&mr, suid);
code = metaGetTableEntryByUidCache(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
metaReaderClear(&mr);
return terrno;
}
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else {
@ -2248,7 +2235,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if (pOperator != NULL) {
@ -2340,7 +2328,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else {
ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
taosMemoryFree(ops);
@ -2578,7 +2567,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
return TSDB_CODE_OUT_OF_MEMORY;
}
*pResult = (SResultRow*)value;
ASSERT(*pResult);
// set time window for current result
(*pResult)->win = (*win);
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);

View File

@ -193,8 +193,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
return pResBlock;
}
} else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL);
blockDataCleanup(pResBlock);
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);

View File

@ -204,7 +204,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
}
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
char* isNull = (char*)pKey;
@ -570,7 +569,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
(*columnLen) += contentLen;
ASSERT(*columnLen >= 0);
}
(*rows) += 1;
@ -681,7 +679,6 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
const SDataGroupInfo* pGroupInfo2 = group2;
if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
ASSERT(0);
return 0;
}

View File

@ -3019,8 +3019,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun
}
}
} else {
strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -768,8 +768,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024);
int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
&pInfo->matchInfo);

View File

@ -350,8 +350,8 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
SExecTaskInfo* pTaskInfo);
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
const char* name, SSDataBlock* pBlock);
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
*reverse = true;
@ -516,7 +516,8 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaTbCursorPrev(pInfo->pCur);
blockFull = true;
} else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
dataBlock);
}
metaReaderClear(&smrSuperTable);
@ -1489,7 +1490,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
return pOperator;
_error:
@ -2012,8 +2014,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo,
optrDefaultBufFn, NULL);
return pOperator;
_error:

View File

@ -213,6 +213,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
blockDataDestroy(p);
taosArrayDestroy(pPageIdList);
return terrno;
}

View File

@ -795,7 +795,6 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT:
((int64_t*)pCol->pData)[currentRow] = pRes->v;
// colDataSetInt64(pCol, currentRow, &pRes->v);
break;
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT:
@ -2123,7 +2122,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
}
// All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
pInputCol->hasNull == true) {
// save selectivity value for column consisted of all null values
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
@ -2239,7 +2239,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
// All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
pInputCol->hasNull == true) {
// save selectivity value for column consisted of all null values
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
@ -2875,8 +2876,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
return pRes;
}
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
@ -5410,6 +5411,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
if (pDistInfo->maxRows < p1.maxRows) {
pDistInfo->maxRows = p1.maxRows;
}
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
@ -5438,6 +5440,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1;
@ -5469,6 +5472,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1;
@ -5520,7 +5524,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataSetVal(pColInfo, row++, st, false);
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
pData->numOfFiles, 0);
pData->numOfFiles, pData->numOfVgroups);
varDataSetLen(st, len);
colDataSetVal(pColInfo, row++, st, false);

View File

@ -76,6 +76,7 @@ typedef struct SQWDebug {
bool lockEnable;
bool statusEnable;
bool dumpEnable;
bool forceStop;
bool sleepSimulate;
bool deadSimulate;
bool redirectSimulate;
@ -248,6 +249,7 @@ typedef struct SQWorkerMgmt {
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1)
#define QW_SET_QTID(id, qId, tId, eId) \
do { \

View File

@ -9,11 +9,13 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true,
SQWDebug gQWDebug = {.lockEnable = false,
.statusEnable = true,
.dumpEnable = false,
.redirectSimulate = false,
.deadSimulate = false,
.sleepSimulate = false};
.sleepSimulate = false,
.forceStop = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) {
@ -306,6 +308,12 @@ int32_t qwDbgEnableDebug(char *option) {
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "forceStop")) {
gQWDebug.forceStop = true;
qError("qw forceStop debug enabled");
return TSDB_CODE_SUCCESS;
}
qError("invalid qw debug option:%s", option);
return TSDB_CODE_APP_ERROR;

View File

@ -18,6 +18,51 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0,
};
int32_t qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
QW_TASK_DLOG_E("task running, async killed");
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received");
} else {
qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
SSchedulerHbRsp rsp = {0};
@ -973,6 +1018,10 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
qwDbgDumpMgmtInfo(mgmt);
if (gQWDebug.forceStop) {
(void)qwStopAllTasks(mgmt);
}
QW_LOCK(QW_READ, &mgmt->schLock);
int32_t schNum = taosHashGetSize(mgmt->schHash);
@ -1087,6 +1136,7 @@ _return:
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker");
@ -1186,45 +1236,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
uint64_t qId, tId, sId;
int32_t eId;
int64_t rId = 0;
atomic_store_8(&mgmt->nodeStopped, 1);
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} else {
qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
(void)qwStopAllTasks(mgmt);
}
void qWorkerDestroy(void **qWorkerMgmt) {

View File

@ -727,7 +727,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
conn->broken = 0;
conn->broken = false;
transRefCliHandle(conn);
atomic_add_fetch_32(&pThrd->connCount, 1);
@ -997,6 +997,11 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
taosMemoryFree(pBatch);
}
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (pThrd->quit == true) {
cliDestroyBatch(pBatch);
return;
}
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return;
}
@ -1082,18 +1087,24 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
if (status != 0) {
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status));
cliHandleExcept(conn);
if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn);
cliHandleBatchReq(nxtBatch, thrd);
} else {
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
p->batchSize);
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
if (nxtBatch != NULL) {
conn->pBatch = nxtBatch;
cliSendBatch(conn);
} else {
addConnToPool(thrd->pool, conn);
}
} else {
cliDestroyBatch(nxtBatch);
// conn release by other callback
}
}
cliDestroyBatch(p);
@ -1454,6 +1465,11 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->type == Quit) {
pThrd->stopMsg = pMsg;
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
count++;
@ -1485,6 +1501,12 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->type == Quit) {
pThrd->stopMsg = pMsg;
continue;
}
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
STransConnCtx* pCtx = pMsg->ctx;
@ -1582,7 +1604,6 @@ static void cliAsyncCb(uv_async_t* handle) {
SCliThrd* pThrd = item->pThrd;
STrans* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = NULL;
// batch process to avoid to lock/unlock frequently
queue wq;
taosThreadMutexLock(&item->mtx);
@ -2285,24 +2306,11 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
}
/*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
char key[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
if (val != NULL && *val >= pTransInst->connLimitNum) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_MAX_SESSIONS;
}
}*/
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet;
pCtx->origEpSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;

View File

@ -327,23 +327,26 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prev_value + final[0]
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
// decode[2] = decode[1] + final[1] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[1] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// 1, 2, 3, 4
//+ 0, 1, 2, 3
// 1, 3, 5, 7
//+ 0, 1, 0, 3
// 1, 3, 3, 7
// shift and add for the first round
__m128i prev = _mm_set1_epi64x(prev_value);
delta = _mm256_add_epi64(delta, _mm256_slli_si256(delta, 8));
__m256i x = _mm256_slli_si256(delta, 8);
delta = _mm256_add_epi64(delta, x);
_mm256_storeu_si256((__m256i *)&p[_pos], delta);
// 1, 3, 5, 7
//+ 0, 0, 1, 3
// 1, 3, 3, 7
//+ 0, 0, 3, 3
// 1, 3, 6, 10
// shift and add operation for the second round
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), firstPart);
__m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
firstPart = _mm_add_epi64(firstPart, prev);
secPart = _mm_add_epi64(secPart, prev);
@ -353,15 +356,18 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
shiftBits = _mm256_add_epi64(shiftBits, inc);
prev_value = p[_pos + 3];
// uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]);
_pos += 4;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit))) & mask);
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
v += bit;
}
} else {
@ -370,6 +376,8 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
v += bit;
}
}

View File

@ -118,7 +118,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
if ((*num) >= size) {
size = (size << 1);
split = taosMemoryRealloc(split, POINTER_BYTES * size);
ASSERTS(NULL != split, "realloc memory failed. size=%d", POINTER_BYTES * size);
ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t) POINTER_BYTES * size);
}
}

View File

@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset();
pool->workers = taosMemoryCalloc(pool->max, sizeof(SQWorker));
pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -31,7 +31,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
(void)taosThreadMutexInit(&pool->mutex, NULL);
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
SQueueWorker *worker = pool->workers + i;
worker->id = i;
worker->pool = pool;
}
@ -42,14 +42,14 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
SQueueWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset);
}
}
for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i;
SQueueWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL);
@ -65,7 +65,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name);
}
static void *tQWorkerThreadFp(SQWorker *worker) {
static void *tQWorkerThreadFp(SQueueWorker *worker) {
SQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0};
void *msg = NULL;
@ -106,7 +106,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
// spawn a thread to process queue
if (pool->num < pool->max) {
do {
SQWorker *worker = pool->workers + pool->num;
SQueueWorker *worker = pool->workers + pool->num;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
@ -138,7 +138,7 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
pool->qset = taosOpenQset();
pool->workers = taosArrayInit(2, sizeof(SQWorker *));
pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -153,14 +153,14 @@ int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
int32_t size = taosArrayGetSize(pool->workers);
for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i);
SQueueWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset);
}
}
for (int32_t i = 0; i < size; ++i) {
SQWorker *worker = taosArrayGetP(pool->workers, i);
SQueueWorker *worker = taosArrayGetP(pool->workers, i);
if (taosCheckPthreadValid(worker->thread)) {
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
taosThreadJoin(worker->thread, NULL);
@ -177,7 +177,7 @@ void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
uInfo("worker:%s is closed", pool->name);
}
static void *tAutoQWorkerThreadFp(SQWorker *worker) {
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
SAutoQWorkerPool *pool = worker->pool;
SQueueInfo qinfo = {0};
void *msg = NULL;
@ -222,7 +222,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
// spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) {
SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
taosMemoryFree(worker);

File diff suppressed because it is too large Load Diff

View File

@ -55,7 +55,7 @@ fi
date
docker run \
-v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1"
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true;make -j || exit 1"
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan"
@ -70,7 +70,7 @@ mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugNoSan
date
docker run \
-v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1 "
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true;make -j || exit 1"
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan

View File

@ -18,7 +18,8 @@ from __future__ import annotations
from typing import Any, Set, Tuple
from typing import Dict
from typing import List
from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
from typing import \
Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
import textwrap
import time
@ -39,7 +40,6 @@ import gc
import taos
from taos.tmq import *
from .shared.types import TdColumns, TdTags
# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
@ -69,6 +69,7 @@ gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injecti
# logger: logging.Logger
gContainer: Container
# def runThread(wt: WorkerThread):
# wt.run()
@ -163,7 +164,6 @@ class WorkerThread:
Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
break
# Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
try:
if (Config.getConfig().per_thread_db_connection): # most likely TRUE
@ -172,7 +172,8 @@ class WorkerThread:
# self.useDb() # might encounter exceptions. TODO: catch
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x383, 0x386, 0x00B, 0x014] : # invalid database, dropping, Unable to establish connection, Database not ready
if errno in [0x383, 0x386, 0x00B,
0x014]: # invalid database, dropping, Unable to establish connection, Database not ready
# ignore
dummy = 0
else:
@ -251,6 +252,7 @@ class WorkerThread:
# else:
# return self._tc.getDbState().getDbConn().query(sql)
# The coordinator of all worker threads, mostly running in main thread
@ -374,7 +376,8 @@ class ThreadCoordinator:
# TODO: saw an error here once, let's print out stack info for err?
traceback.print_stack() # Stack frame to here.
Logging.info("Caused by:")
traceback.print_exception(*sys.exc_info()) # Ref: https://www.geeksforgeeks.org/how-to-print-exception-stack-trace-in-python/
traceback.print_exception(
*sys.exc_info()) # Ref: https://www.geeksforgeeks.org/how-to-print-exception-stack-trace-in-python/
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("State transition error: {}".format(err))
@ -409,7 +412,6 @@ class ThreadCoordinator:
# print("\n")
# print(h.heap())
try:
self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP)
@ -492,7 +494,6 @@ class ThreadCoordinator:
self._execStats = None
self._runStatus = None
def printStats(self):
self._execStats.printStats()
@ -564,6 +565,7 @@ class ThreadCoordinator:
with self._lock:
self._executedTasks.append(task)
class ThreadPool:
def __init__(self, numThreads, maxSteps):
self.numThreads = numThreads
@ -587,6 +589,7 @@ class ThreadPool:
def cleanup(self):
self.threadList = [] # maybe clean up each?
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
@ -801,7 +804,8 @@ class AnyState:
for task in tasks:
if isinstance(task, cls):
raise CrashGenError(
"This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))
"This task: {}, is not expected to be present, given the success/failure of others".format(
cls.__name__))
def assertNoSuccess(self, tasks, cls):
for task in tasks:
@ -1016,7 +1020,6 @@ class StateMechine:
sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc): # no regular tables
# print("debug=====*\n"*100)
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
@ -1096,10 +1099,13 @@ class StateMechine:
weightsTypes = BasicTypes.copy()
# this matrixs can balance the Frequency of TaskTypes
balance_TaskType_matrixs = {'TaskDropDb': 5 , 'TaskDropTopics': 20 , 'TaskDropStreams':10 , 'TaskDropStreamTables':10 ,
balance_TaskType_matrixs = {'TaskDropDb': 5, 'TaskDropTopics': 20, 'TaskDropStreams': 10,
'TaskDropStreamTables': 10,
'TaskReadData': 50, 'TaskDropSuperTable': 5, 'TaskAlterTags': 3, 'TaskAddData': 10,
'TaskDeleteData':10 , 'TaskCreateDb':10 , 'TaskCreateStream': 3, 'TaskCreateTopic' :3,
'TaskCreateConsumers':10, 'TaskCreateSuperTable': 10 } # TaskType : balance_matrixs of task
'TaskDeleteData': 10, 'TaskCreateDb': 10, 'TaskCreateStream': 3,
'TaskCreateTopic': 3,
'TaskCreateConsumers': 10,
'TaskCreateSuperTable': 10} # TaskType : balance_matrixs of task
for task, weights in balance_TaskType_matrixs.items():
@ -1111,7 +1117,6 @@ class StateMechine:
task = random.sample(weightsTypes, 1)
return task[0]
# ref:
# https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
def _weighted_choice_sub(self, weights) -> int:
@ -1123,6 +1128,7 @@ class StateMechine:
return i
raise CrashGenError("Unexpected no choice")
class Database:
''' We use this to represent an actual TDengine database inside a service instance,
possibly in a cluster environment.
@ -1194,7 +1200,8 @@ class Database:
500 # a number representing seconds within 10 years
# print("elSec = {}".format(elSec))
t3 = datetime.datetime(local_epoch_time[0]-10, local_epoch_time[1], local_epoch_time[2]) # default "keep" is 10 years
t3 = datetime.datetime(local_epoch_time[0] - 10, local_epoch_time[1],
local_epoch_time[2]) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above
Logging.debug("Setting up TICKS to start from: {}".format(t4))
@ -1210,11 +1217,14 @@ class Database:
# 10k at 1/20 chance, should be enough to avoid overlaps
tick = cls.setupLastTick()
cls._lastTick = tick
cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
cls._lastLaggingTick = tick + datetime.timedelta(0,
-60 * 2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future
if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
if Config.isSet('mix_oos_data') and Dice.throw(
20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0,
1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick
else: # regular
# add one second to it
@ -1334,8 +1344,6 @@ class Task():
self._execStats = execStats
self._db = db # A task is always associated/for a specific DB
def isSuccess(self):
return self._err is None
@ -1417,9 +1425,6 @@ class Task():
0x0203, # Invalid value
0x03f0, # Stream already exist , topic already exists
1000 # REST catch-all error
]:
return True # These are the ALWAYS-ACCEPTABLE ones
@ -1443,7 +1448,6 @@ class Task():
return False # Not an acceptable error
def execute(self, wt: WorkerThread):
wt.verifyThreadSelf()
self._workerThread = wt # type: ignore
@ -1485,7 +1489,8 @@ class Task():
# raise # so that we see full stack
traceback.print_exc()
print(
"\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
"\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(
errMsg) +
"----------------------------\n")
# sys.exit(-1)
self._err = err
@ -1718,10 +1723,15 @@ class TaskCreateDb(StateTransitionTask):
cache_model = Dice.choice(['none', 'last_row', 'last_value', 'both'])
buffer = random.randint(3, 128)
dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr, updatePostfix, vg_nums, cache_model,buffer ) )
self.execWtSql(wt, "create database {} {} {} vgroups {} cachemodel '{}' buffer {} ".format(dbName, repStr,
updatePostfix,
vg_nums,
cache_model,
buffer))
if dbName == "db_0" and Config.getConfig().use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix))
class TaskDropDb(StateTransitionTask):
@classmethod
def getEndState(cls):
@ -1734,7 +1744,8 @@ class TaskDropDb(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
try:
self.queryWtSql(wt, "drop database {}".format(self._db.getName())) # drop database maybe failed ,because topic exists
self.queryWtSql(wt, "drop database {}".format(
self._db.getName())) # drop database maybe failed ,because topic exists
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x0203]: # drop maybe failed
@ -1769,7 +1780,8 @@ class TaskCreateStream(StateTransitionTask):
stbname = sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn())
aggExpr = Dice.choice([
'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)','min(speed)', 'max(speed)', 'first(speed)', 'last(speed)',
'count(*)', 'avg(speed)', 'sum(speed)', 'stddev(speed)', 'min(speed)', 'max(speed)', 'first(speed)',
'last(speed)',
'apercentile(speed, 10)', 'last_row(*)', 'twa(speed)'])
stream_sql = '' # set default value
@ -1814,19 +1826,25 @@ class TaskCreateTopic(StateTransitionTask):
stbname = sTable.getName()
sub_tables = sTable.getRegTables(wt.getDbConn())
scalarExpr = Dice.choice([ '*','speed','color','abs(speed)','acos(speed)','asin(speed)','atan(speed)','ceil(speed)','cos(speed)','cos(speed)',
'floor(speed)','log(speed,2)','pow(speed,2)','round(speed)','sin(speed)','sqrt(speed)','char_length(color)','concat(color,color)',
'concat_ws(" ", color,color," ")','length(color)', 'lower(color)', 'ltrim(color)','substr(color , 2)','upper(color)','cast(speed as double)',
scalarExpr = Dice.choice(
['*', 'speed', 'color', 'abs(speed)', 'acos(speed)', 'asin(speed)', 'atan(speed)', 'ceil(speed)',
'cos(speed)', 'cos(speed)',
'floor(speed)', 'log(speed,2)', 'pow(speed,2)', 'round(speed)', 'sin(speed)', 'sqrt(speed)',
'char_length(color)', 'concat(color,color)',
'concat_ws(" ", color,color," ")', 'length(color)', 'lower(color)', 'ltrim(color)', 'substr(color , 2)',
'upper(color)', 'cast(speed as double)',
'cast(ts as bigint)'])
topic_sql = '' # set default value
if Dice.throw(3) == 0: # create topic : source data from sub query
if sub_tables: # if not empty
sub_tbname = sub_tables[0]
# create topic : source data from sub query of sub stable
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name,scalarExpr,dbname,sub_tbname)
topic_sql = 'create topic {} as select {} FROM {}.{} ; '.format(sub_topic_name, scalarExpr, dbname,
sub_tbname)
else: # create topic : source data from sub query of stable
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name,scalarExpr, dbname,stbname)
topic_sql = 'create topic {} as select {} FROM {}.{} '.format(super_topic_name, scalarExpr, dbname,
stbname)
elif Dice.throw(3) == 1: # create topic : source data from super table
topic_sql = 'create topic {} AS STABLE {}.{} '.format(stable_topic, dbname, stbname)
@ -1840,6 +1858,7 @@ class TaskCreateTopic(StateTransitionTask):
self.execWtSql(wt, topic_sql)
Logging.debug("[OPS] db topic is creating at {}".format(time.time()))
class TaskDropTopics(StateTransitionTask):
@classmethod
@ -1853,7 +1872,6 @@ class TaskDropTopics(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
@ -1865,6 +1883,7 @@ class TaskDropTopics(StateTransitionTask):
sTable.dropTopics(wt.getDbConn(), dbname, None) # drop topics of database
sTable.dropTopics(wt.getDbConn(), dbname, tblName) # drop topics of stable
class TaskDropStreams(StateTransitionTask):
@classmethod
@ -1878,7 +1897,6 @@ class TaskDropStreams(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
@ -1889,6 +1907,7 @@ class TaskDropStreams(StateTransitionTask):
if sTable.hasStreams(wt.getDbConn()):
sTable.dropStreams(wt.getDbConn()) # drop stream of database
class TaskDropStreamTables(StateTransitionTask):
@classmethod
@ -1902,7 +1921,6 @@ class TaskDropStreamTables(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
# dbname = self._db.getName()
if not self._db.exists(wt.getDbConn()):
Logging.debug("Skipping task, no DB yet")
return
@ -1913,6 +1931,7 @@ class TaskDropStreamTables(StateTransitionTask):
if sTable.hasStreamTables(wt.getDbConn()):
sTable.dropStreamTables(wt.getDbConn()) # drop stream tables
class TaskCreateConsumers(StateTransitionTask):
@classmethod
@ -1974,7 +1993,6 @@ class TdSuperTable:
def getName(self):
return self._stName
def drop(self, dbc, skipCheck=False):
dbName = self._dbName
if self.exists(dbc): # if myself exists
@ -2018,27 +2036,18 @@ class TdSuperTable:
def createConsumer(self, dbc, Consumer_nums):
def generateConsumer(current_topic_list):
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
# conf.set("enable.auto.commit", "true")
# def tmq_commit_cb_print(tmq, resp, offset, param=None):
# print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
# conf.set_auto_commit_cb(tmq_commit_cb_print, None)
consumer = conf.new_consumer()
topic_list = TaosTmqList()
consumer = Consumer({"group.id": "tg2", "td.connect.user": "root", "td.connect.pass": "taosdata"})
topic_list = []
for topic in current_topic_list:
topic_list.append(topic)
try:
consumer.subscribe(topic_list)
except TmqError as e :
pass
# consumer with random work life
time_start = time.time()
while 1:
res = consumer.poll(1000)
res = consumer.poll(1)
consumer.commit(res)
if time.time() - time_start > random.randint(5, 50):
break
try:
@ -2067,14 +2076,16 @@ class TdSuperTable:
def getRegTables(self, dbc: DbConn):
dbName = self._dbName
try:
dbc.query("select distinct TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
dbc.query("select distinct TBNAME from {}.{}".format(dbName,
self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno)
Logging.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
raise
qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
return [v[0] for v in
qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn):
@ -2317,7 +2328,6 @@ class TdSuperTable:
]) # TODO: add more from 'top'
# if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance
@ -2329,6 +2339,7 @@ class TdSuperTable:
return ret
class TaskReadData(StateTransitionTask):
@classmethod
def getEndState(cls):
@ -2368,7 +2379,6 @@ class TaskReadData(StateTransitionTask):
# by now, causing error below to be incorrectly handled due to timing issue
return # TODO: fix server restart status race condtion
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self._reconnectIfNeeded(wt)
@ -2386,6 +2396,7 @@ class TaskReadData(StateTransitionTask):
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise
class SqlQuery:
@classmethod
def buildRandom(cls, db: Database):
@ -2399,6 +2410,7 @@ class SqlQuery:
def getSql(self):
return self._sql
class TaskDropSuperTable(StateTransitionTask):
@classmethod
def getEndState(cls):
@ -2430,7 +2442,6 @@ class TaskDropSuperTable(StateTransitionTask):
Logging.debug("[DB] Acceptable error when dropping a table")
continue # try to delete next regular table
if (not tickOutput):
tickOutput = True # Print only one time
if isSuccess:
@ -2443,8 +2454,6 @@ class TaskDropSuperTable(StateTransitionTask):
self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
class TaskAlterTags(StateTransitionTask):
@classmethod
def getEndState(cls):
@ -2472,6 +2481,7 @@ class TaskAlterTags(StateTransitionTask):
sTable.changeTag(dbc, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class TaskRestartService(StateTransitionTask):
_isRunning = False
_classLock = threading.Lock()
@ -2487,6 +2497,7 @@ class TaskRestartService(StateTransitionTask):
return False # don't run this otherwise
CHANCE_TO_RESTART_SERVICE = 200
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
print("_a", end="", flush=True)
@ -2500,11 +2511,13 @@ class TaskRestartService(StateTransitionTask):
if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
dbc = wt.getDbConn()
dbc.execute("select * from information_schema.ins_databases") # simple delay, align timing with other workers
dbc.execute(
"select * from information_schema.ins_databases") # simple delay, align timing with other workers
gSvcMgr.restart()
self._isRunning = False
class TaskAddData(StateTransitionTask):
# Track which table is being actively worked on
activeTable: Set[int] = set()
@ -2571,8 +2584,6 @@ class TaskAddData(StateTransitionTask):
# Logging.info("Data added in batch: {}".format(sql))
self._unlockTableIfNeeded(fullTableName)
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
@ -2590,7 +2601,8 @@ class TaskAddData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
self._lockTableIfNeeded(
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
try:
sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
@ -2604,7 +2616,8 @@ class TaskAddData(StateTransitionTask):
intWrote = intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
if (not Config.getConfig().use_shadow_db) and Dice.throw(
5) == 0: # 1 in N chance, plus not using shaddow DB
intToUpdate = db.getNextInt() # Updated but should not succeed
nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
@ -2692,6 +2705,7 @@ class TaskAddData(StateTransitionTask):
self.activeTable.discard(i) # not raising an error, unlike remove
class TaskDeleteData(StateTransitionTask):
# Track which table is being actively worked on
activeTable: Set[int] = set()
@ -2756,7 +2770,8 @@ class TaskDeleteData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
self._lockTableIfNeeded(
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
try:
sql = "delete from {} where ts = '{}' ;".format( # removed: tags ('{}', {})
@ -2772,7 +2787,8 @@ class TaskDeleteData(StateTransitionTask):
intWrote = intToWrite
# Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
if (not Config.getConfig().use_shadow_db) and Dice.throw(
5) == 0: # 1 in N chance, plus not using shaddow DB
intToUpdate = db.getNextInt() # Updated but should not succeed
# nextColor = db.getNextColor()
sql = "delete from {} where ts = '{}' ;".format( # "INSERt" means "update" here
@ -2827,7 +2843,8 @@ class TaskDeleteData(StateTransitionTask):
# TODO: too ugly trying to lock the table reliably, refactor...
fullTableName = db.getName() + '.' + regTableName
self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
self._lockTableIfNeeded(
fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock
try:
sql = "delete from {} ;".format( # removed: tags ('{}', {})
@ -2837,7 +2854,8 @@ class TaskDeleteData(StateTransitionTask):
# Logging.info("Data added: {}".format(sql))
# Quick hack, attach an update statement here. TODO: create an "update" task
if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
if (not Config.getConfig().use_shadow_db) and Dice.throw(
5) == 0: # 1 in N chance, plus not using shaddow DB
sql = "delete from {} ;".format( # "INSERt" means "update" here
fullTableName)
dbc.execute(sql)
@ -2937,7 +2955,9 @@ class ThreadStacks: # stack info for all threads
lastSqlForThread = DbConn.fetchSqlForThread(shortTid)
last_sql_commit_time = DbConn.get_save_sql_time(shortTid)
# time_cost = DbConn.get_time_cost()
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid, self.current_time-last_sql_commit_time ,lastSqlForThread))
print("Last SQL statement attempted from thread {} ({:.4f} sec ago) is: {}".format(shortTid,
self.current_time - last_sql_commit_time,
lastSqlForThread))
stackFrame = 0
for frame in stack: # was using: reversed(stack)
# print(frame)
@ -2949,6 +2969,7 @@ class ThreadStacks: # stack info for all threads
if self.current_time - last_sql_commit_time > 100: # dead lock occured
print("maybe dead locked of thread {} ".format(shortTid))
class ClientManager:
def __init__(self):
Logging.info("Starting service manager")
@ -3062,7 +3083,6 @@ class ClientManager:
svcMgr.stopTaosServices()
svcMgr = None
# Release global variables
# gConfig = None
Config.clearConfig()
@ -3093,6 +3113,7 @@ class ClientManager:
# self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
self.tc.printStats()
class MainExec:
def __init__(self):
self._clientMgr = None
@ -3131,7 +3152,8 @@ class MainExec:
def runService(self):
global gSvcMgr
gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr = self._svcMgr = ServiceManager(
Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
gSvcMgr.run() # run to some end state
gSvcMgr = self._svcMgr = None
@ -3259,7 +3281,6 @@ class MainExec:
return parser
def init(self): # TODO: refactor
global gContainer
gContainer = Container() # micky-mouse DI

View File

@ -38,8 +38,9 @@ python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
# /root/TDengine/source/libs/function/src/builtinsimpl.c:856:29: runtime error: signed integer overflow: 9223372036854775806 + 9223372036854775805 cannot be represented in type 'long int'
# /root/TDengine/source/libs/scalar/src/sclvector.c:1075:66: runtime error: signed integer overflow: 9223372034707292160 + 1668838476672 cannot be represented in type 'long int'
# /root/TDengine/source/common/src/tdataformat.c:1876:7: runtime error: signed integer overflow: 8252423483843671206 + 2406154664059062870 cannot be represented in type 'long int'
# /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |wc -l`
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |grep -v "filter.c:3149:14"|wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"

View File

@ -12,12 +12,13 @@
# -*- coding: utf-8 -*-
import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from taos.tmq import *
from util.cases import *
from util.common import *
from util.log import *
from util.sql import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
@ -67,14 +68,17 @@ class TDTestCase:
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
]
self.tbnum = 1
def prepare_data(self):
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
def create_user(self):
for user_name in ['jiacy1_all','jiacy1_read','jiacy1_write','jiacy1_none','jiacy0_all','jiacy0_read','jiacy0_write','jiacy0_none']:
for user_name in ['jiacy1_all', 'jiacy1_read', 'jiacy1_write', 'jiacy1_none', 'jiacy0_all', 'jiacy0_read',
'jiacy0_write', 'jiacy0_none']:
if 'jiacy1' in user_name.lower():
tdSql.execute(f'create user {user_name} pass "123" sysinfo 1')
elif 'jiacy0' in user_name.lower():
@ -101,6 +105,7 @@ class TDTestCase:
self.queryResult = None
tdLog.info(f"sql:{sql}, expect error occured")
pass
def drop_topic(self):
jiacy1_all_conn = taos.connect(user='jiacy1_all', password='123')
jiacy1_read_conn = taos.connect(user='jiacy1_read', password='123')
@ -114,7 +119,8 @@ class TDTestCase:
for user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
user.execute(f'create topic db_jiacy as select * from db.stb')
user.execute('drop topic db_jiacy')
for user in [jiacy1_write_conn,jiacy1_none_conn,jiacy0_write_conn,jiacy0_none_conn,jiacy1_all_conn,jiacy1_read_conn,jiacy0_all_conn,jiacy0_read_conn]:
for user in [jiacy1_write_conn, jiacy1_none_conn, jiacy0_write_conn, jiacy0_none_conn, jiacy1_all_conn,
jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
sql_list = []
if user in [jiacy1_all_conn, jiacy1_read_conn, jiacy0_all_conn, jiacy0_read_conn]:
sql_list = ['drop topic root_db']
@ -134,27 +140,20 @@ class TDTestCase:
self.queryCols = 0
self.queryResult = None
tdLog.info(f"sql:{sql}, expect error occured")
def tmq_commit_cb_print(tmq, resp, param=None):
print(f"commit: {resp}, tmq: {tmq}, param: {param}")
def subscribe_topic(self):
print("create topic")
tdSql.execute('create topic db_topic as select * from db.stb')
tdSql.execute('grant subscribe on db_topic to jiacy1_all')
print("build consumer")
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "jiacy1_all")
conf.set("td.connect.pass", "123")
conf.set("enable.auto.commit", "true")
conf.set_auto_commit_cb(self.tmq_commit_cb_print, None)
tmq = conf.new_consumer()
tmq = Consumer({"group.id": "tg2", "td.connect.user": "jiacy1_all", "td.connect.pass": "123",
"enable.auto.commit": "true"})
print("build topic list")
topic_list = TaosTmqList()
topic_list.append("db_topic")
tmq.subscribe(["db_topic"])
print("basic consume loop")
tmq.subscribe(topic_list)
sub_list = tmq.subscription()
print("subscribed topics: ", sub_list)
c = 0
l = 0
for i in range(10):
@ -163,19 +162,22 @@ class TDTestCase:
res = tmq.poll(10)
print(f"loop {l}")
l += 1
if res:
if not res:
print(f"received empty message at loop {l} (committed {c})")
continue
if res.error():
print(f"consumer error at loop {l} (committed {c}) {res.error()}")
continue
c += 1
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
topic = res.topic()
db = res.database()
print(f"topic: {topic}\ndb: {db}")
for row in res:
print(row)
print(row.fetchall())
print("* committed")
tmq.commit(res)
else:
print(f"received empty message at loop {l} (committed {c})")
pass
def run(self):
tdSql.prepare()
@ -184,9 +186,11 @@ class TDTestCase:
self.drop_topic()
self.user_privilege_check()
self.subscribe_topic()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -400,7 +400,7 @@ TAOS* createNewTaosConnect() {
int32_t retryCnt = 10;
while (retryCnt--) {
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (NULL != taos) {
return taos;
}
@ -780,7 +780,8 @@ void loop_consume(SThreadInfo* pInfo) {
if (pInfo->ifCheckData) {
char filename[256] = {0};
char tmpString[128];
memset(tmpString, 0, tListLen(tmpString));
// sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
// getCurrentTimeString(tmpString));
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
@ -834,12 +835,12 @@ void loop_consume(SThreadInfo* pInfo) {
}
if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
char tmpString[128];
memset(tmpString, 0, tListLen(tmpString));
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
break;
}
} else {
char tmpString[128];
memset(tmpString, 0, tListLen(tmpString));
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
break;
}
@ -1113,7 +1114,7 @@ void omb_loop_consume(SThreadInfo* pInfo) {
lastTotalLenOfMsg = totalLenOfMsg;
}
} else {
char tmpString[128];
memset(tmpString, 0, tListLen(tmpString));
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
int64_t currentPrintTime = taosGetTimestampMs();
@ -1381,7 +1382,7 @@ void startOmbConsume() {
printf("SQL: %s\n", sql);
queryDbExec(taos, sql, NO_INSERT_TYPE);
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers);
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {