diff --git a/cmake/cmake.version b/cmake/cmake.version
index a30618157b..d0d455c73d 100644
--- a/cmake/cmake.version
+++ b/cmake/cmake.version
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
- SET(TD_VER_NUMBER "3.0.2.5")
+ SET(TD_VER_NUMBER "3.0.2.6")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index ae3b626f88..db2ae92f6e 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG 61cbfd2
+ GIT_TAG 1e15545
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/examples/c/tmq.c b/examples/c/tmq.c
index 8a2112fbcc..525f3e106f 100644
--- a/examples/c/tmq.c
+++ b/examples/c/tmq.c
@@ -61,7 +61,7 @@ static int32_t init_env() {
printf("create database\n");
pRes = taos_query(pConn, "drop topic topicname");
if (taos_errno(pRes) != 0) {
- printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
+ printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
diff --git a/include/common/systable.h b/include/common/systable.h
index 6f65c1e8b8..cfc0af0172 100644
--- a/include/common/systable.h
+++ b/include/common/systable.h
@@ -12,6 +12,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+
+#ifndef TDENGINE_SYSTABLE_H
+#define TDENGINE_SYSTABLE_H
#ifdef __cplusplus
extern "C" {
@@ -19,9 +22,6 @@ extern "C" {
#include "os.h"
-#ifndef TDENGINE_SYSTABLE_H
-#define TDENGINE_SYSTABLE_H
-
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INS_TABLE_DNODES "ins_dnodes"
#define TSDB_INS_TABLE_MNODES "ins_mnodes"
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index 5b640dce92..f2f7ac5699 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -26,6 +26,7 @@ extern "C" {
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
+#include "systable.h"
typedef enum {
JOB_TASK_STATUS_NULL = 0,
@@ -284,9 +285,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define REQUEST_TOTAL_EXEC_TIMES 2
-#define IS_SYS_DBNAME(_dbname) \
- (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || \
- ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
+#define IS_INFORMATION_SCHEMA_DB(_name) ((*(_name) == 'i') && (0 == strcmp(_name, TSDB_INFORMATION_SCHEMA_DB)))
+#define IS_PERFORMANCE_SCHEMA_DB(_name) ((*(_name) == 'p') && (0 == strcmp(_name, TSDB_PERFORMANCE_SCHEMA_DB)))
+
+#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
#define qFatal(...) \
do { \
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index 1eed342f8c..014ed518a3 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -66,6 +66,7 @@ typedef struct {
int64_t commitVer;
int64_t appliedVer;
int64_t lastVer;
+ int64_t logRetention;
} SWalVer;
#pragma pack(push, 1)
@@ -126,7 +127,7 @@ typedef struct SWal {
typedef struct {
int64_t refId;
int64_t refVer;
- int64_t refFile;
+// int64_t refFile;
SWal *pWal;
} SWalRef;
@@ -180,7 +181,7 @@ void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
-int32_t walBeginSnapshot(SWal *, int64_t ver);
+int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
int32_t walEndSnapshot(SWal *);
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// for tq
diff --git a/packaging/docker/DockerfileCloud b/packaging/docker/DockerfileCloud
deleted file mode 100644
index fa8fcabf34..0000000000
--- a/packaging/docker/DockerfileCloud
+++ /dev/null
@@ -1,30 +0,0 @@
-FROM ubuntu:18.04
-
-WORKDIR /root
-
-ARG pkgFile
-ARG dirName
-ARG cpuType
-RUN echo ${pkgFile} && echo ${dirName}
-
-RUN apt update
-RUN apt install -y curl
-
-COPY ${pkgFile} /root/
-ENV TINI_VERSION v0.19.0
-ENV TAOS_DISABLE_ADAPTER 1
-ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
-ENV DEBIAN_FRONTEND=noninteractive
-WORKDIR /root/
-RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
-
-ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
- LC_CTYPE=en_US.UTF-8 \
- LANG=en_US.UTF-8 \
- LC_ALL=en_US.UTF-8
-COPY ./run.sh /usr/bin/
-COPY ./bin/* /usr/bin/
-
-ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
-CMD ["bash", "-c", "/usr/bin/run.sh"]
-VOLUME [ "/var/lib/taos", "/var/log/taos" ]
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index b01a871702..5c4bcf7946 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -49,6 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
return TSDB_CODE_SUCCESS;
}
+static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
+ int32_t code = 0;
+ SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
+ if (NULL == vgInfo) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ return code;
+ }
+
+ vgInfo->vgVersion = rsp->vgVersion;
+ vgInfo->stateTs = rsp->stateTs;
+ vgInfo->hashMethod = rsp->hashMethod;
+ vgInfo->hashPrefix = rsp->hashPrefix;
+ vgInfo->hashSuffix = rsp->hashSuffix;
+ vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
+ if (NULL == vgInfo->vgHash) {
+ taosMemoryFree(vgInfo);
+ tscError("hash init[%d] failed", rsp->vgNum);
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _return;
+ }
+
+ for (int32_t j = 0; j < rsp->vgNum; ++j) {
+ SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
+ if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
+ tscError("hash push failed, errno:%d", errno);
+ taosHashCleanup(vgInfo->vgHash);
+ taosMemoryFree(vgInfo);
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _return;
+ }
+ }
+
+_return:
+ if (code) {
+ taosHashCleanup(vgInfo->vgHash);
+ taosMemoryFreeClear(vgInfo);
+ }
+
+ *pInfo = vgInfo;
+ return code;
+}
+
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
@@ -67,37 +109,22 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
- SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
- if (NULL == vgInfo) {
- code = TSDB_CODE_OUT_OF_MEMORY;
+ SDBVgInfo *vgInfo = NULL;
+ code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
+ if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
- vgInfo->vgVersion = rsp->vgVersion;
- vgInfo->stateTs = rsp->stateTs;
- vgInfo->hashMethod = rsp->hashMethod;
- vgInfo->hashPrefix = rsp->hashPrefix;
- vgInfo->hashSuffix = rsp->hashSuffix;
- vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
- if (NULL == vgInfo->vgHash) {
- taosMemoryFree(vgInfo);
- tscError("hash init[%d] failed", rsp->vgNum);
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _return;
- }
-
- for (int32_t j = 0; j < rsp->vgNum; ++j) {
- SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
- if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
- tscError("hash push failed, errno:%d", errno);
- taosHashCleanup(vgInfo->vgHash);
- taosMemoryFree(vgInfo);
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _return;
- }
- }
-
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
+
+ if (IS_SYS_DBNAME(rsp->db)) {
+ code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
+ if (TSDB_CODE_SUCCESS != code) {
+ goto _return;
+ }
+
+ catalogUpdateDBVgInfo(pCatalog, (rsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->uid, vgInfo);
+ }
}
if (code) {
@@ -492,6 +519,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
for (int32_t i = 0; i < dbNum; ++i) {
SDbVgVersion *db = &dbs[i];
+ tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64,
+ i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs);
+
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
db->numOfTable = htonl(db->numOfTable);
diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c
index 554f4ee4c3..92d03103d7 100644
--- a/source/client/src/clientMsgHandler.c
+++ b/source/client/src/clientMsgHandler.c
@@ -163,6 +163,22 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pEpSet);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
+ } else {
+ struct SCatalog* pCatalog = NULL;
+ int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
+ if (TSDB_CODE_SUCCESS == code) {
+ STscObj* pTscObj = pRequest->pTscObj;
+
+ SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ char dbFName[TSDB_DB_FNAME_LEN];
+ snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
+ catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
+ snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
+ catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
+ }
}
if (pRequest->body.queryFp) {
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 85dccbca83..01a1aa073b 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -98,6 +98,9 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
while (newSize < pAttr->length + dataLen) {
newSize = newSize * 1.5;
+ if (newSize > UINT32_MAX) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
}
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index e3f08e912a..c8f1efa1ab 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -61,7 +61,7 @@ int32_t tsHeartbeatInterval = 1000;
int32_t tsHeartbeatTimeout = 20 * 1000;
// vnode
-int64_t tsVndCommitMaxIntervalMs = 60 * 1000;
+int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
// monitor
bool tsEnableMonitor = true;
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index e7d75312e4..ff131b50a6 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -672,7 +672,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} else {
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
char *newTopic = taosArrayGetP(pTopicList, j);
- int comp = compareLenPrefixedStr(oldTopic, newTopic);
+ int comp = strcmp(oldTopic, newTopic);
if (comp == 0) {
i++;
j++;
diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c
index c2d7a9757a..2d4b7a1e56 100644
--- a/source/dnode/mnode/sdb/src/sdbFile.c
+++ b/source/dnode/mnode/sdb/src/sdbFile.c
@@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) {
- // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
+ // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0);
if (pSdb->sync == 0) {
code = 0;
} else {
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index f31cf97cf9..4f7f9d14eb 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1067,6 +1067,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
if (code < 0) return code;
#endif
+ if (tsDisableStream) {
+ return 0;
+ }
// 1.deserialize msg and build task
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c
index 841dc29731..631652f7fa 100644
--- a/source/dnode/vnode/src/tq/tqRead.c
+++ b/source/dnode/vnode/src/tq/tqRead.c
@@ -297,11 +297,8 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
- ASSERT(pReader->pWalReader->curInvalid);
- ASSERT(pReader->pWalReader->curVersion == ver);
return -1;
}
- ASSERT(pReader->pWalReader->curVersion == ver);
return 0;
}
@@ -362,11 +359,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg;
- if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
- while (true) {
- if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
- if (pReader->pBlock == NULL) break;
- }
+// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
+// while (true) {
+// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
+// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
+// pReader->msgIter.len, pReader->msgIter.uid);
+// if (pReader->pBlock == NULL) break;
+// }
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
pReader->ver = ver;
diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c
index 83a414dae0..88abc1b3f0 100644
--- a/source/dnode/vnode/src/vnd/vnodeBufPool.c
+++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c
@@ -94,6 +94,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
int vnodeCloseBufPool(SVnode *pVnode) {
SVBufPool *pPool;
+ taosThreadMutexLock(&pVnode->mutex);
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) {
pVnode->pPool = pPool->next;
vnodeBufPoolDestroy(pPool);
@@ -103,8 +104,9 @@ int vnodeCloseBufPool(SVnode *pVnode) {
vnodeBufPoolDestroy(pVnode->inUse);
pVnode->inUse = NULL;
}
- vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
+ taosThreadMutexUnlock(&pVnode->mutex);
+ vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
return 0;
}
@@ -244,6 +246,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
pVnode->pPool = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
+ if (pVnode->inUse == pPool) {
+ pVnode->inUse = NULL;
+ }
taosThreadMutexUnlock(&pVnode->mutex);
}
}
diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c
index cefa9e6755..e8280ea751 100644
--- a/source/dnode/vnode/src/vnd/vnodeCommit.c
+++ b/source/dnode/vnode/src/vnd/vnodeCommit.c
@@ -87,22 +87,21 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
}
int vnodeShouldCommit(SVnode *pVnode) {
- if (!pVnode->inUse || !osDataSpaceAvailable()) {
- return false;
- }
-
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
+ bool diskAvail = osDataSpaceAvailable();
+ bool needCommit = false;
- return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
- (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
-}
-
-int vnodeShouldCommitOld(SVnode *pVnode) {
- if (pVnode->inUse) {
- return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
+ taosThreadMutexLock(&pVnode->mutex);
+ if (!pVnode->inUse || !diskAvail) {
+ goto _out;
}
- return false;
+ needCommit =
+ (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
+ (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
+_out:
+ taosThreadMutexUnlock(&pVnode->mutex);
+ return needCommit;
}
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
@@ -259,7 +258,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
metaPrepareAsyncCommit(pVnode->pMeta);
vnodeBufPoolUnRef(pVnode->inUse);
- pVnode->inUse = NULL;
_exit:
if (code) {
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 172696571b..a04da867e2 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -869,13 +869,14 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
colDataSetVal(pColInfo, i, p, false);
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
- char* tmp = alloca(tagVal.nData + VARSTR_HEADER_SIZE + 1);
+ char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
varDataSetLen(tmp, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
colDataSetVal(pColInfo, i, tmp, false);
#if TAG_FILTER_DEBUG
qDebug("tagfilter varch:%s", tmp + 2);
#endif
+ taosMemoryFree(tmp);
} else {
colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
#if TAG_FILTER_DEBUG
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 19c9c6f97d..82f079e2fb 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -107,7 +107,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info;
-
+ qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id);
ASSERT(pInfo->validBlockIndex == 0);
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
@@ -1047,18 +1047,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
-#if 0
- if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
- pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
- qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
- pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
- ASSERT(0);
- }
-#endif
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1;
}
- ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c
index 091b10a63e..a1426e2a96 100644
--- a/source/libs/executor/src/filloperator.c
+++ b/source/libs/executor/src/filloperator.c
@@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
SWinKey key = {.groupId = groupId, .ts = pRow->key};
int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
+ qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
ASSERT(code == TSDB_CODE_SUCCESS);
}
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 161fe52367..aa61d24b92 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -1064,11 +1064,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
- pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
+ projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
- if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
- longjmp(pTaskInfo->env, pTaskInfo->code);
- }
}
taosHashClear(pInfo->pPartitions);
doStreamHashPartitionImpl(pInfo, pBlock);
diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c
index 3c13e10ee7..31ff11eec5 100644
--- a/source/libs/executor/src/joinoperator.c
+++ b/source/libs/executor/src/joinoperator.c
@@ -24,6 +24,17 @@
#include "tmsg.h"
#include "ttypes.h"
+typedef struct SJoinRowCtx {
+ bool rowRemains;
+ int64_t ts;
+ SArray* leftRowLocations;
+ SArray* rightRowLocations;
+ SArray* leftCreatedBlocks;
+ SArray* rightCreatedBlocks;
+ int32_t leftRowIdx;
+ int32_t rightRowIdx;
+} SJoinRowCtx;
+
typedef struct SJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
@@ -37,6 +48,8 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
+
+ SJoinRowCtx rowCtx;
} SJoinOperatorInfo;
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
@@ -287,49 +300,107 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
- SJoinOperatorInfo* pJoinInfo = pOperator->info;
- SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
- SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
-
- SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
- SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
- mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
- pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
- mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
- pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
+ SJoinOperatorInfo* pJoinInfo = pOperator->info;
+ SArray* leftRowLocations = NULL;
+ SArray* leftCreatedBlocks = NULL;
+ SArray* rightRowLocations = NULL;
+ SArray* rightCreatedBlocks = NULL;
+ int32_t leftRowIdx = 0;
+ int32_t rightRowIdx = 0;
+ int32_t i, j;
+
+ if (pJoinInfo->rowCtx.rowRemains) {
+ leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
+ leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
+ rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
+ rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
+ leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
+ rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
+ } else {
+ leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
+ leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
+ rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
+ rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
+
+ mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
+ pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
+ mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
+ pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
+ }
+
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
- code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin);
+ uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
+ uint32_t limitRowNum = maxRowNum;
+ if (maxRowNum > pOperator->resultInfo.threshold) {
+ limitRowNum = pOperator->resultInfo.threshold;
+ if (!pJoinInfo->rowCtx.rowRemains) {
+ pJoinInfo->rowCtx.rowRemains = true;
+ pJoinInfo->rowCtx.ts = timestamp;
+ pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
+ pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
+ pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
+ pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
+ }
+ }
+
+ code = blockDataEnsureCapacity(pRes, limitRowNum);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin, rightNumJoin);
}
+
+
if (code == TSDB_CODE_SUCCESS) {
- for (int32_t i = 0; i < leftNumJoin; ++i) {
- for (int32_t j = 0; j < rightNumJoin; ++j) {
+ bool done = false;
+ for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
+ for (j = rightRowIdx; j < rightNumJoin; ++j) {
+ if (*nRows >= limitRowNum) {
+ done = true;
+ break;
+ }
+
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
+ if (done) {
+ break;
+ }
+ }
+
+ if (maxRowNum > pOperator->resultInfo.threshold) {
+ pJoinInfo->rowCtx.leftRowIdx = i;
+ pJoinInfo->rowCtx.rightRowIdx = j;
}
}
- for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
- SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
- blockDataDestroy(pBlock);
+ if (maxRowNum <= pOperator->resultInfo.threshold) {
+ for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
+ SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
+ blockDataDestroy(pBlock);
+ }
+ taosArrayDestroy(rightCreatedBlocks);
+ taosArrayDestroy(rightRowLocations);
+ for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
+ SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
+ blockDataDestroy(pBlock);
+ }
+ taosArrayDestroy(leftCreatedBlocks);
+ taosArrayDestroy(leftRowLocations);
+
+ if (pJoinInfo->rowCtx.rowRemains) {
+ pJoinInfo->rowCtx.rowRemains = false;
+ pJoinInfo->rowCtx.leftRowLocations = NULL;
+ pJoinInfo->rowCtx.rightRowLocations = NULL;
+ pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
+ pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
+ }
}
- taosArrayDestroy(rightCreatedBlocks);
- taosArrayDestroy(rightRowLocations);
- for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
- SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
- blockDataDestroy(pBlock);
- }
- taosArrayDestroy(leftCreatedBlocks);
- taosArrayDestroy(leftRowLocations);
return TSDB_CODE_SUCCESS;
}
@@ -379,9 +450,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
while (1) {
int64_t leftTs = 0;
int64_t rightTs = 0;
- bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
- if (!hasNextTs) {
- break;
+ if (pJoinInfo->rowCtx.rowRemains) {
+ leftTs = pJoinInfo->rowCtx.ts;
+ rightTs = pJoinInfo->rowCtx.ts;
+ } else {
+ bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
+ if (!hasNextTs) {
+ break;
+ }
}
if (leftTs == rightTs) {
@@ -389,12 +465,12 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1;
- if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
+ if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1;
- if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
+ if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
}
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 99281e6c59..71e1068fb3 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1618,7 +1618,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL;
}
- ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
} else {
return NULL;
}
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 54be30028e..62d68d5ca2 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -1538,16 +1538,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS) {
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
- qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
+ qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey,
tw.ekey, tmpKey.groupId, mark);
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
- qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
+ qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
}
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
- qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
+ qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
}
streamStateFreeCur(pCur);
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 0d052846f7..8b3cfd105b 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -5012,14 +5012,24 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
}
- if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType &&
- pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
- return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
+ if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) {
+ if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
+ }
+
+ if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
+ }
}
- if (TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType &&
- tagsLen + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_TAGS_LEN) {
- return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_LENGTH, TSDB_MAX_TAGS_LEN);
+ if (TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType) {
+ if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
+ }
+
+ if (tagsLen + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_TAGS_LEN) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_LENGTH, TSDB_MAX_TAGS_LEN);
+ }
}
}
diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c
index 25e65d2588..d91b2ebd6b 100644
--- a/source/libs/scalar/src/filter.c
+++ b/source/libs/scalar/src/filter.c
@@ -3120,9 +3120,8 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
- void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
- p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
+ p[i] = colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
if (p[i] == 0) {
all = false;
} else {
@@ -3146,9 +3145,8 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
- void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
- p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
+ p[i] = !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
if (p[i] == 0) {
all = false;
} else {
@@ -3178,13 +3176,13 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData *pRe
for (int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData *pData = info->cunits[0].colData;
- void *colData = colDataGetData(pData, i);
- if (colData == NULL || colDataIsNull_s(pData, i)) {
+ if (colDataIsNull_s(pData, i)) {
all = false;
p[i] = 0;
continue;
}
+ void *colData = colDataGetData(pData, i);
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
if (p[i] == 0) {
@@ -3210,13 +3208,14 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
- void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
- if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
+
+ if (colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
p[i] = 0;
all = false;
continue;
}
+ void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
// match/nmatch for nchar type need convert from ucs4 to mbs
if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR &&
(info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)) {
@@ -3274,7 +3273,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SC
if (!isNull) {
colData = colDataGetData((SColumnInfoData *)(cunit->colData), i);
}
-
+
if (colData == NULL || isNull) {
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
} else {
diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h
index 504a9f0bd7..a823cfda0b 100644
--- a/source/libs/sync/inc/syncPipeline.h
+++ b/source/libs/sync/inc/syncPipeline.h
@@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
+bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 3f0432d998..fc0d235df9 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -270,88 +270,40 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
return -1;
}
+ SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
+ SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
+ bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
+
+ if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
+ sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
+ syncNodeRelease(pSyncNode);
+ return 0;
+ }
+
int32_t code = 0;
+ int64_t logRetention = 0;
if (syncNodeIsMnode(pSyncNode)) {
// mnode
- int64_t logRetention = SYNC_MNODE_LOG_RETENTION;
-
- SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
- SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
- int64_t logNum = endIndex - beginIndex;
- bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
-
- if (isEmpty || (!isEmpty && logNum < logRetention)) {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
- lastApplyIndex, logNum, isEmpty);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
- goto _DEL_WAL;
-
+ logRetention = SYNC_MNODE_LOG_RETENTION;
} else {
- SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
- SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
- bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
-
- if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
// vnode
if (pSyncNode->replicaNum > 1) {
// multi replicas
-
- lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
-
- if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
- pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
-
- for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
- int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
- if (lastApplyIndex > matchIndex) {
- sNTrace(pSyncNode,
- "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
- " of dnode:%d, do not delete wal",
- lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
-
- syncNodeRelease(pSyncNode);
- return 0;
- }
- }
-
- } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
- if (lastApplyIndex > pSyncNode->minMatchIndex) {
- sNTrace(pSyncNode,
- "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
- lastApplyIndex, pSyncNode->minMatchIndex);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
- } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
- syncNodeRelease(pSyncNode);
- return 0;
-
- } else {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
- goto _DEL_WAL;
-
- } else {
- // one replica
-
- goto _DEL_WAL;
+ logRetention = SYNC_VNODE_LOG_RETENTION;
}
}
+ if (pSyncNode->replicaNum > 1) {
+ if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
+ sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
+ lastApplyIndex);
+ syncNodeRelease(pSyncNode);
+ return 0;
+ }
+ logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex);
+ }
+
_DEL_WAL:
do {
@@ -366,7 +318,7 @@ _DEL_WAL:
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
pSyncNode->snapshottingTime = taosGetTimestampMs();
- code = walBeginSnapshot(pData->pWal, lastApplyIndex);
+ code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
if (code == 0) {
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
pSyncNode->snapshottingIndex, lastApplyIndex);
@@ -2142,24 +2094,19 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
if (timerLogicClock == msgLogicClock) {
if (tsNow > pData->execTime) {
-#if 0
- sTrace(
- "vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
- "---------",
- pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
-#endif
-
pData->execTime += pSyncTimer->timerMS;
SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
+ pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
+
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId;
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
pSyncMsg->commitIndex = pSyncNode->commitIndex;
- pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
+ pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = tsNow;
@@ -2171,11 +2118,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} else {
-#if 0
- sTrace(
- "vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
- pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
-#endif
}
if (syncIsInit()) {
@@ -2468,6 +2410,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
+ if (syncLogBufferIsEmpty(ths->pLogBuf)) {
+ sError("vgId:%d, sync log buffer is empty.", ths->vgId);
+ return 0;
+ }
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c
index c9ff2d2dcc..e2b039a2e4 100644
--- a/source/libs/sync/src/syncPipeline.c
+++ b/source/libs/sync/src/syncPipeline.c
@@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
+ syncLogBufferValidate(pBuf);
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
if (pEntry == NULL) continue;
@@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
if (ret < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
}
+ syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
}
@@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
return term;
}
+bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
+ taosThreadMutexLock(&pBuf->mutex);
+ bool empty = (pBuf->endIndex <= pBuf->startIndex);
+ taosThreadMutexUnlock(&pBuf->mutex);
+ return empty;
+}
+
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
@@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
+ syncLogBufferValidate(pBuf);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer == pBuf->matchIndex);
SyncIndex index = pBuf->endIndex - 1;
@@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
syncLogReplMgrReset(pMgr);
}
+ syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
}
diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c
index 4f0682a617..bf8c5c53dc 100644
--- a/source/libs/tdb/src/db/tdbBtree.c
+++ b/source/libs/tdb/src/db/tdbBtree.c
@@ -253,7 +253,7 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
}
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
- SBTC btc;
+ SBTC btc = {0};
int c;
int ret;
@@ -264,11 +264,18 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
// move the cursor
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
if (ret < 0) {
- ASSERT(0);
+ tdbError("tdb/btree-upsert: btc move to failed with ret: %d.", ret);
+ if (TDB_CELLDECODER_FREE_KEY(&btc.coder)) {
+ tdbFree(btc.coder.pKey);
+ }
tdbBtcClose(&btc);
return -1;
}
+ if (TDB_CELLDECODER_FREE_KEY(&btc.coder)) {
+ tdbFree(btc.coder.pKey);
+ }
+
if (btc.idx == -1) {
btc.idx = 0;
c = 1;
@@ -280,8 +287,8 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
ret = tdbBtcUpsert(&btc, pKey, nKey, pData, nData, c);
if (ret < 0) {
- ASSERT(0);
tdbBtcClose(&btc);
+ tdbError("tdb/btree-upsert: btc upsert failed with ret: %d.", ret);
return -1;
}
@@ -1428,15 +1435,19 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
// Clear the state of decoder
if (TDB_CELLDECODER_FREE_VAL(pDecoder)) {
tdbFree(pDecoder->pVal);
+ TDB_CELLDECODER_CLZ_FREE_VAL(pDecoder);
+ // tdbTrace("tdb btc decoder val set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
+ }
+ if (TDB_CELLDECODER_FREE_KEY(pDecoder)) {
+ tdbFree(pDecoder->pKey);
+ TDB_CELLDECODER_CLZ_FREE_KEY(pDecoder);
+ // tdbTrace("tdb btc decoder key set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
}
pDecoder->kLen = -1;
pDecoder->pKey = NULL;
pDecoder->vLen = -1;
pDecoder->pVal = NULL;
pDecoder->pgno = 0;
- TDB_CELLDECODER_SET_FREE_NIL(pDecoder);
-
- // tdbTrace("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
// 1. Decode header part
if (!leaf) {
@@ -2188,7 +2199,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} else {
lidx = lidx + 1;
}
-
// compare last cell
if (lidx <= ridx) {
pBtc->idx = ridx;
diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h
index 62466e9c47..7a0bcc00a4 100644
--- a/source/libs/tdb/src/inc/tdbInt.h
+++ b/source/libs/tdb/src/inc/tdbInt.h
@@ -122,6 +122,8 @@ typedef struct SBtInfo {
#define TDB_CELLD_F_VAL 0x2
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
+#define TDB_CELLDECODER_CLZ_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_KEY)
+#define TDB_CELLDECODER_CLZ_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_VAL)
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index a41cc0068c..5ff67c87ca 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -22,6 +22,7 @@ extern "C" {
#include "os.h"
#include "taoserror.h"
#include "theap.h"
+#include "tmisce.h"
#include "transLog.h"
#include "transportInt.h"
#include "trpc.h"
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index 5d6751a260..38189f90db 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -11,7 +11,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-
#include "transComm.h"
typedef struct SConnList {
@@ -224,9 +223,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} while (0);
// snprintf may cause performance problem
-#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
- do { \
- snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
+#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
+ do { \
+ char* p = key; \
+ int32_t len = strlen(ip); \
+ if (p != NULL) memcpy(p, ip, len); \
+ p[len] = ':'; \
+ titoa(port, 10, &p[len + 1]); \
} while (0)
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
@@ -664,7 +667,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
- tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
+ tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
transAllocBuffer(pBuf, buf);
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
@@ -677,7 +680,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) {
pBuf->len += nread;
while (transReadComplete(pBuf)) {
- tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
+ tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
cliHandleExcept(conn);
break;
@@ -1949,11 +1952,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
- STraceId* trace = &pMsg->msg.info.traceId;
- char tbuf[256] = {0};
- EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
- tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
- pCtx->retryStep, pCtx->retryNextInterval);
+ if (rpcDebugFlag & DEBUG_DEBUG) {
+ STraceId* trace = &pMsg->msg.info.traceId;
+ char tbuf[256] = {0};
+ EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
+ tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
+ pCtx->retryStep, pCtx->retryNextInterval);
+ }
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
@@ -1990,7 +1995,7 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
pResp->pCont = buf;
pResp->contLen = len;
- *dst = epset;
+ epsetAssign(dst, &epset);
return true;
}
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
@@ -2015,7 +2020,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
- pCtx->epSet = epSet;
+ epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
@@ -2040,7 +2045,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
- pCtx->epSet = epSet;
+ epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
@@ -2130,10 +2135,6 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
pCtx->retryNextInterval = pCtx->retryMaxInterval;
}
-
- // if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
- // return false;
- // }
} else {
pCtx->retryNextInterval = 0;
pCtx->epsetRetryCnt++;
@@ -2181,9 +2182,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {
- char tbuf[256] = {0};
- EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
- tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
+ if (rpcDebugFlag & DEBUG_TRACE) {
+ char tbuf[256] = {0};
+ EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
+ tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
+ }
}
if (pCtx->pSem != NULL) {
@@ -2310,8 +2313,9 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
- pCtx->epSet = *pEpSet;
- pCtx->origEpSet = *pEpSet;
+ epsetAssign(&pCtx->epSet, pEpSet);
+ epsetAssign(&pCtx->origEpSet, pEpSet);
+
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
@@ -2356,8 +2360,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
- pCtx->epSet = *pEpSet;
- pCtx->origEpSet = *pEpSet;
+ epsetAssign(&pCtx->epSet, pEpSet);
+ epsetAssign(&pCtx->origEpSet, pEpSet);
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->pSem = sem;
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index 5e09af5b2e..526dba0bb5 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -96,8 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (walSkipFetchBodyNew(pReader) < 0) {
return -1;
}
- fetchVer++;
- ASSERT(fetchVer == pReader->curVersion);
+ fetchVer = pReader->curVersion;
}
}
pReader->curStopped = 1;
@@ -144,7 +143,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
}
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
- char fnameStr[WAL_FILE_LEN];
+ char fnameStr[WAL_FILE_LEN] = {0};
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
@@ -300,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
return -1;
}
- if (pReadHead->version != ver) {
- wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
- pRead->pHead->head.version, ver);
- pRead->curInvalid = 1;
- terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
- return -1;
- }
-
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1;
diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c
index 43470f4c82..768256cefa 100644
--- a/source/libs/wal/src/walRef.c
+++ b/source/libs/wal/src/walRef.c
@@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) {
}
pRef->refId = tGenIdPI64();
pRef->refVer = -1;
- pRef->refFile = -1;
+// pRef->refFile = -1;
pRef->pWal = pWal;
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
return pRef;
@@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
pRef->refVer = ver;
// bsearch in fileSet
- SWalFileInfo tmpInfo;
- tmpInfo.firstVer = ver;
- SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
- ASSERT(pRet != NULL);
- pRef->refFile = pRet->firstVer;
+// SWalFileInfo tmpInfo;
+// tmpInfo.firstVer = ver;
+// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
+// ASSERT(pRet != NULL);
+// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
}
@@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
#if 1
void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1;
- pRef->refFile = -1;
+// pRef->refFile = -1;
}
#endif
@@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
}
}
taosThreadMutexLock(&pWal->mutex);
-
int64_t ver = walGetFirstVer(pWal);
-
- wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
-
pRef->refVer = ver;
// bsearch in fileSet
- SWalFileInfo tmpInfo;
- tmpInfo.firstVer = ver;
- SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
- ASSERT(pRet != NULL);
- pRef->refFile = pRet->firstVer;
+// SWalFileInfo tmpInfo;
+// tmpInfo.firstVer = ver;
+// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
+// ASSERT(pRet != NULL);
+// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
+ wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
+
return pRef;
}
@@ -119,7 +117,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
- pRef->refFile = pRet->firstVer;
+// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
return pRef;
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 96c77d0971..b38961709e 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
return 0;
}
-int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
+int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
taosThreadMutexLock(&pWal->mutex);
-
+ ASSERT(logRetention >= 0);
pWal->vers.verInSnapshotting = ver;
- wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
- pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
+ pWal->vers.logRetention = logRetention;
+
+ wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
+ ", last ver %" PRId64,
+ pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling
- if (pWal->cfg.retentionPeriod == 0) {
- if (walGetLastFileSize(pWal) != 0) {
- if (walRollImpl(pWal) < 0) {
- wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
- goto _err;
- }
+ if (walGetLastFileSize(pWal) != 0) {
+ if (walRollImpl(pWal) < 0) {
+ wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
+ goto _err;
}
}
+
taosThreadMutexUnlock(&pWal->mutex);
return 0;
@@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting;
- wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
- ver, pWal->vers.firstVer, pWal->vers.lastVer);
+ wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
+ ", last ver %" PRId64,
+ pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
if (ver == -1) {
code = -1;
@@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
+ ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index 891e7dcdae..0784db917a 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
if (i == 5) {
- walBeginSnapshot(pWal, i);
+ walBeginSnapshot(pWal, i, 0);
walEndSnapshot(pWal);
}
}
@@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- walBeginSnapshot(pWal, i - 1);
+ walBeginSnapshot(pWal, i - 1, 0);
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
walEndSnapshot(pWal);
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
@@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- code = walBeginSnapshot(pWal, i - 1);
+ code = walBeginSnapshot(pWal, i - 1, 0);
ASSERT_EQ(code, 0);
code = walEndSnapshot(pWal);
ASSERT_EQ(code, 0);
diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c
index 2f947d3252..1f2df09ce1 100644
--- a/source/os/src/osSemaphore.c
+++ b/source/os/src/osSemaphore.c
@@ -132,7 +132,8 @@ int tsem_wait(tsem_t *psem) {
int tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
- dispatch_semaphore_wait(*psem, milis * 1000 * 1000);
+ dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
+ dispatch_semaphore_wait(*psem, time);
return 0;
}
diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c
index e37ef5b395..07b3084222 100644
--- a/source/util/src/tarray.c
+++ b/source/util/src/tarray.c
@@ -139,7 +139,8 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
taosArraySet(pArray, pos + 1, p2);
- pos += 1;
+ memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
+ pos += 1;
} else {
pos += 1;
}
@@ -171,13 +172,14 @@ void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp
// do nothing
} else {
if (pos + 1 != i) {
- void* p = taosArrayGet(pArray, pos + 1);
+ void* p = taosArrayGetP(pArray, pos + 1);
if (fp != NULL) {
fp(p);
}
taosArraySet(pArray, pos + 1, p2);
- pos += 1;
+ memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
+ pos += 1;
} else {
pos += 1;
}
diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c
index 7a52991e81..46a9051436 100644
--- a/source/util/src/tcompare.c
+++ b/source/util/src/tcompare.c
@@ -1232,7 +1232,7 @@ int32_t taosArrayCompareString(const void *a, const void *b) {
const char *x = *(const char **)a;
const char *y = *(const char **)b;
- return compareLenPrefixedStr(x, y);
+ return strcmp(x, y);
}
int32_t comparestrPatternMatch(const void *pLeft, const void *pRight) {
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index c07fa88af5..b85035ffcf 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -514,7 +514,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ROW_LENGTH, "Row length exceeds
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FIRST_COLUMN, "First column must be timestamp")
-TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column length")
+TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column/tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TAGS_NUM, "Invalid number of tag columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 16751423b1..188ab86944 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -179,6 +179,7 @@
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/forceFill.sim
+,,n,script,./test.sh -f tsim/query/join.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
@@ -646,6 +647,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py
+,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tagFilter.py
,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5
diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh
index 5c7976a9fc..2bd4eaa548 100755
--- a/tests/script/sh/checkAsan.sh
+++ b/tests/script/sh/checkAsan.sh
@@ -40,7 +40,7 @@ python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
# /root/TDengine/source/common/src/tdataformat.c:1876:7: runtime error: signed integer overflow: 8252423483843671206 + 2406154664059062870 cannot be represented in type 'long int'
# /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer
-runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |grep -v "filter.c:3149:14"|wc -l`
+runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
diff --git a/tests/script/tsim/catalog/alterInCurrent.sim b/tests/script/tsim/catalog/alterInCurrent.sim
index 3cb337bbe1..521858c368 100644
--- a/tests/script/tsim/catalog/alterInCurrent.sim
+++ b/tests/script/tsim/catalog/alterInCurrent.sim
@@ -67,4 +67,19 @@ sql insert into t1 values (1591060628000, 1);
sql alter table st1 drop tag t2;
sql create table t2 using st1 tags(2);
+print ======== drop tag in super table
+sql create database if not exists aaa;
+sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'aaa';
+if $rows != 0 then
+ return -1
+endi
+sql drop database if exists foo;
+sql create database if not exists foo;
+sql create table foo.t(ts timestamp,name varchar(20));
+sql create table foo.xt(ts timestamp,name varchar(20));
+sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'foo';
+if $rows != 2 then
+ return -1
+endi
+
system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/script/tsim/query/join.sim b/tests/script/tsim/query/join.sim
new file mode 100644
index 0000000000..adb0338ef7
--- /dev/null
+++ b/tests/script/tsim/query/join.sim
@@ -0,0 +1,72 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+sql connect
+
+$dbPrefix = db
+$tbPrefix1 = tba
+$tbPrefix2 = tbb
+$mtPrefix = stb
+$tbNum = 10000
+$rowNum = 2
+
+print =============== step1
+$i = 0
+$db = $dbPrefix . $i
+$mt1 = $mtPrefix . $i
+$i = 1
+$mt2 = $mtPrefix . $i
+
+sql drop database $db -x step1
+step1:
+sql create database $db
+sql use $db
+sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
+sql create table $mt2 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
+
+print ====== start create child tables and insert data
+$i = 0
+while $i < $tbNum
+ $tb = $tbPrefix1 . $i
+ sql create table $tb using $mt1 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
+
+ $x = 0
+ while $x < $rowNum
+ $cc = $x * 60000
+ $ms = 1601481600000 + $cc
+
+ sql insert into $tb values ($ms , $x )
+ $x = $x + 1
+ endw
+
+ $i = $i + 1
+endw
+
+print =============== step2
+$i = 0
+while $i < $tbNum
+ $tb = $tbPrefix2 . $i
+ sql create table $tb using $mt2 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
+
+ $x = 0
+ while $x < $rowNum
+ $cc = $x * 60000
+ $ms = 1601481600000 + $cc
+
+ sql insert into $tb values ($ms , $x )
+ $x = $x + 1
+ endw
+
+ $i = $i + 1
+endw
+
+sql select * from tba0 t1, tbb0 t2 where t1.ts=t2.ts;
+if $rows != 2 then
+ return -1
+endi
+sql select * from stb0 t1, stb1 t2 where t1.ts=t2.ts and t1.tag2=t2.tag2;
+if $rows != 200000000 then
+ return -1
+endi
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/system-test/2-query/tagFilter.py b/tests/system-test/2-query/tagFilter.py
new file mode 100644
index 0000000000..b03776c31b
--- /dev/null
+++ b/tests/system-test/2-query/tagFilter.py
@@ -0,0 +1,67 @@
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+
+
+INT_COL = "c1"
+BINT_COL = "c2"
+SINT_COL = "c3"
+TINT_COL = "c4"
+FLOAT_COL = "c5"
+DOUBLE_COL = "c6"
+BOOL_COL = "c7"
+
+BINARY_COL = "c8"
+NCHAR_COL = "c9"
+TS_COL = "c10"
+
+NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
+UN_NUM_COL = [BOOL_COL, BINARY_COL, NCHAR_COL, ]
+TS_TYPE_COL = [TS_COL]
+
+DBNAME = "db"
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug(f"start to excute {__file__}")
+ tdSql.init(conn.cursor())
+ tdSql.execute(f'drop database if exists db')
+ tdSql.execute(f'create database if not exists db vgroups 1')
+
+ def __create_tb(self, dbname="db"):
+ create_stb_sql = f'''create table {dbname}.stb1(
+ ts timestamp, f1 int
+ ) tags (tag1 binary(16300))
+ '''
+ tdSql.execute(create_stb_sql)
+
+ tag_value = 'a'
+ for i in range(1200):
+ tag_value = tag_value + 'a'
+
+ for i in range(8000):
+ tdSql.execute(f"create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( '{tag_value}' )")
+
+ def __query_data(self, rows, dbname="db"):
+ tdSql.execute(
+ f'''select count(*) from {dbname}.stb1 where tag1 like '%a'
+ '''
+ )
+ tdSql.checkRows(0)
+
+ def run(self):
+ tdLog.printNoPrefix("==========step1:create table")
+ self.__create_tb()
+
+ tdLog.printNoPrefix("==========step2:query data")
+ self.__query_data(10)
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py
index 0596241ce1..54bfab1ebc 100644
--- a/tests/system-test/7-tmq/tmq_taosx.py
+++ b/tests/system-test/7-tmq/tmq_taosx.py
@@ -195,6 +195,7 @@ class TDTestCase:
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
+ time.sleep(10)
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
uid1 = tdSql.getData(0, 5)
uid2 = tdSql.getData(1, 5)