other: merge main
This commit is contained in:
commit
514369c611
|
@ -2,7 +2,7 @@
|
||||||
IF (DEFINED VERNUMBER)
|
IF (DEFINED VERNUMBER)
|
||||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||||
ELSE ()
|
ELSE ()
|
||||||
SET(TD_VER_NUMBER "3.0.2.5")
|
SET(TD_VER_NUMBER "3.0.2.6")
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
IF (DEFINED VERCOMPATIBLE)
|
IF (DEFINED VERCOMPATIBLE)
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG 61cbfd2
|
GIT_TAG 1e15545
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -61,7 +61,7 @@ static int32_t init_env() {
|
||||||
printf("create database\n");
|
printf("create database\n");
|
||||||
pRes = taos_query(pConn, "drop topic topicname");
|
pRes = taos_query(pConn, "drop topic topicname");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,9 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_SYSTABLE_H
|
||||||
|
#define TDENGINE_SYSTABLE_H
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -19,9 +22,6 @@ extern "C" {
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#ifndef TDENGINE_SYSTABLE_H
|
|
||||||
#define TDENGINE_SYSTABLE_H
|
|
||||||
|
|
||||||
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
|
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
|
||||||
#define TSDB_INS_TABLE_DNODES "ins_dnodes"
|
#define TSDB_INS_TABLE_DNODES "ins_dnodes"
|
||||||
#define TSDB_INS_TABLE_MNODES "ins_mnodes"
|
#define TSDB_INS_TABLE_MNODES "ins_mnodes"
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
#include "systable.h"
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
JOB_TASK_STATUS_NULL = 0,
|
JOB_TASK_STATUS_NULL = 0,
|
||||||
|
@ -284,9 +285,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
||||||
|
|
||||||
#define REQUEST_TOTAL_EXEC_TIMES 2
|
#define REQUEST_TOTAL_EXEC_TIMES 2
|
||||||
|
|
||||||
#define IS_SYS_DBNAME(_dbname) \
|
#define IS_INFORMATION_SCHEMA_DB(_name) ((*(_name) == 'i') && (0 == strcmp(_name, TSDB_INFORMATION_SCHEMA_DB)))
|
||||||
(((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || \
|
#define IS_PERFORMANCE_SCHEMA_DB(_name) ((*(_name) == 'p') && (0 == strcmp(_name, TSDB_PERFORMANCE_SCHEMA_DB)))
|
||||||
((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
|
|
||||||
|
#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
|
||||||
|
|
||||||
#define qFatal(...) \
|
#define qFatal(...) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -66,6 +66,7 @@ typedef struct {
|
||||||
int64_t commitVer;
|
int64_t commitVer;
|
||||||
int64_t appliedVer;
|
int64_t appliedVer;
|
||||||
int64_t lastVer;
|
int64_t lastVer;
|
||||||
|
int64_t logRetention;
|
||||||
} SWalVer;
|
} SWalVer;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
@ -126,7 +127,7 @@ typedef struct SWal {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
int64_t refVer;
|
int64_t refVer;
|
||||||
int64_t refFile;
|
// int64_t refFile;
|
||||||
SWal *pWal;
|
SWal *pWal;
|
||||||
} SWalRef;
|
} SWalRef;
|
||||||
|
|
||||||
|
@ -180,7 +181,7 @@ void walFsync(SWal *, bool force);
|
||||||
int32_t walCommit(SWal *, int64_t ver);
|
int32_t walCommit(SWal *, int64_t ver);
|
||||||
int32_t walRollback(SWal *, int64_t ver);
|
int32_t walRollback(SWal *, int64_t ver);
|
||||||
// notify that previous logs can be pruned safely
|
// notify that previous logs can be pruned safely
|
||||||
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
|
||||||
int32_t walEndSnapshot(SWal *);
|
int32_t walEndSnapshot(SWal *);
|
||||||
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
// for tq
|
// for tq
|
||||||
|
|
|
@ -1,30 +0,0 @@
|
||||||
FROM ubuntu:18.04
|
|
||||||
|
|
||||||
WORKDIR /root
|
|
||||||
|
|
||||||
ARG pkgFile
|
|
||||||
ARG dirName
|
|
||||||
ARG cpuType
|
|
||||||
RUN echo ${pkgFile} && echo ${dirName}
|
|
||||||
|
|
||||||
RUN apt update
|
|
||||||
RUN apt install -y curl
|
|
||||||
|
|
||||||
COPY ${pkgFile} /root/
|
|
||||||
ENV TINI_VERSION v0.19.0
|
|
||||||
ENV TAOS_DISABLE_ADAPTER 1
|
|
||||||
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive
|
|
||||||
WORKDIR /root/
|
|
||||||
RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
|
|
||||||
|
|
||||||
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
|
|
||||||
LC_CTYPE=en_US.UTF-8 \
|
|
||||||
LANG=en_US.UTF-8 \
|
|
||||||
LC_ALL=en_US.UTF-8
|
|
||||||
COPY ./run.sh /usr/bin/
|
|
||||||
COPY ./bin/* /usr/bin/
|
|
||||||
|
|
||||||
ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
|
|
||||||
CMD ["bash", "-c", "/usr/bin/run.sh"]
|
|
||||||
VOLUME [ "/var/lib/taos", "/var/log/taos" ]
|
|
|
@ -49,6 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
|
||||||
|
if (NULL == vgInfo) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
vgInfo->vgVersion = rsp->vgVersion;
|
||||||
|
vgInfo->stateTs = rsp->stateTs;
|
||||||
|
vgInfo->hashMethod = rsp->hashMethod;
|
||||||
|
vgInfo->hashPrefix = rsp->hashPrefix;
|
||||||
|
vgInfo->hashSuffix = rsp->hashSuffix;
|
||||||
|
vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
|
if (NULL == vgInfo->vgHash) {
|
||||||
|
taosMemoryFree(vgInfo);
|
||||||
|
tscError("hash init[%d] failed", rsp->vgNum);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < rsp->vgNum; ++j) {
|
||||||
|
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
|
||||||
|
if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
|
||||||
|
tscError("hash push failed, errno:%d", errno);
|
||||||
|
taosHashCleanup(vgInfo->vgHash);
|
||||||
|
taosMemoryFree(vgInfo);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
if (code) {
|
||||||
|
taosHashCleanup(vgInfo->vgHash);
|
||||||
|
taosMemoryFreeClear(vgInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
*pInfo = vgInfo;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -67,37 +109,22 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
||||||
if (rsp->vgVersion < 0) {
|
if (rsp->vgVersion < 0) {
|
||||||
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
|
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
|
||||||
} else {
|
} else {
|
||||||
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
|
SDBVgInfo *vgInfo = NULL;
|
||||||
if (NULL == vgInfo) {
|
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
vgInfo->vgVersion = rsp->vgVersion;
|
|
||||||
vgInfo->stateTs = rsp->stateTs;
|
|
||||||
vgInfo->hashMethod = rsp->hashMethod;
|
|
||||||
vgInfo->hashPrefix = rsp->hashPrefix;
|
|
||||||
vgInfo->hashSuffix = rsp->hashSuffix;
|
|
||||||
vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
|
||||||
if (NULL == vgInfo->vgHash) {
|
|
||||||
taosMemoryFree(vgInfo);
|
|
||||||
tscError("hash init[%d] failed", rsp->vgNum);
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < rsp->vgNum; ++j) {
|
|
||||||
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
|
|
||||||
if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
|
|
||||||
tscError("hash push failed, errno:%d", errno);
|
|
||||||
taosHashCleanup(vgInfo->vgHash);
|
|
||||||
taosMemoryFree(vgInfo);
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
|
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
|
||||||
|
|
||||||
|
if (IS_SYS_DBNAME(rsp->db)) {
|
||||||
|
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
catalogUpdateDBVgInfo(pCatalog, (rsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->uid, vgInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -492,6 +519,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
||||||
|
|
||||||
for (int32_t i = 0; i < dbNum; ++i) {
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
SDbVgVersion *db = &dbs[i];
|
SDbVgVersion *db = &dbs[i];
|
||||||
|
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64,
|
||||||
|
i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs);
|
||||||
|
|
||||||
db->dbId = htobe64(db->dbId);
|
db->dbId = htobe64(db->dbId);
|
||||||
db->vgVersion = htonl(db->vgVersion);
|
db->vgVersion = htonl(db->vgVersion);
|
||||||
db->numOfTable = htonl(db->numOfTable);
|
db->numOfTable = htonl(db->numOfTable);
|
||||||
|
|
|
@ -163,6 +163,22 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
setErrno(pRequest, code);
|
setErrno(pRequest, code);
|
||||||
|
} else {
|
||||||
|
struct SCatalog* pCatalog = NULL;
|
||||||
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
|
||||||
|
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
|
||||||
|
.requestId = pRequest->requestId,
|
||||||
|
.requestObjRefId = pRequest->self,
|
||||||
|
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
|
||||||
|
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
|
||||||
|
snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
|
||||||
|
catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRequest->body.queryFp) {
|
if (pRequest->body.queryFp) {
|
||||||
|
|
|
@ -98,6 +98,9 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
|
||||||
|
|
||||||
while (newSize < pAttr->length + dataLen) {
|
while (newSize < pAttr->length + dataLen) {
|
||||||
newSize = newSize * 1.5;
|
newSize = newSize * 1.5;
|
||||||
|
if (newSize > UINT32_MAX) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
|
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
|
||||||
|
|
|
@ -61,7 +61,7 @@ int32_t tsHeartbeatInterval = 1000;
|
||||||
int32_t tsHeartbeatTimeout = 20 * 1000;
|
int32_t tsHeartbeatTimeout = 20 * 1000;
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
int64_t tsVndCommitMaxIntervalMs = 60 * 1000;
|
int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
|
||||||
|
|
||||||
// monitor
|
// monitor
|
||||||
bool tsEnableMonitor = true;
|
bool tsEnableMonitor = true;
|
||||||
|
|
|
@ -672,7 +672,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
} else {
|
} else {
|
||||||
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
|
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
|
||||||
char *newTopic = taosArrayGetP(pTopicList, j);
|
char *newTopic = taosArrayGetP(pTopicList, j);
|
||||||
int comp = compareLenPrefixedStr(oldTopic, newTopic);
|
int comp = strcmp(oldTopic, newTopic);
|
||||||
if (comp == 0) {
|
if (comp == 0) {
|
||||||
i++;
|
i++;
|
||||||
j++;
|
j++;
|
||||||
|
|
|
@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
taosThreadMutexLock(&pSdb->filelock);
|
||||||
if (pSdb->pWal != NULL) {
|
if (pSdb->pWal != NULL) {
|
||||||
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
|
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0);
|
||||||
if (pSdb->sync == 0) {
|
if (pSdb->sync == 0) {
|
||||||
code = 0;
|
code = 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1067,6 +1067,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
||||||
if (code < 0) return code;
|
if (code < 0) return code;
|
||||||
#endif
|
#endif
|
||||||
|
if (tsDisableStream) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// 1.deserialize msg and build task
|
// 1.deserialize msg and build task
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
|
|
|
@ -297,11 +297,8 @@ void tqCloseReader(STqReader* pReader) {
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
||||||
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
||||||
ASSERT(pReader->pWalReader->curInvalid);
|
|
||||||
ASSERT(pReader->pWalReader->curVersion == ver);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(pReader->pWalReader->curVersion == ver);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,11 +359,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
|
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
|
||||||
pReader->pMsg = pMsg;
|
pReader->pMsg = pMsg;
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
||||||
while (true) {
|
// while (true) {
|
||||||
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
|
// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
|
||||||
if (pReader->pBlock == NULL) break;
|
// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
|
||||||
}
|
// pReader->msgIter.len, pReader->msgIter.uid);
|
||||||
|
// if (pReader->pBlock == NULL) break;
|
||||||
|
// }
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
||||||
pReader->ver = ver;
|
pReader->ver = ver;
|
||||||
|
|
|
@ -94,6 +94,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
|
||||||
int vnodeCloseBufPool(SVnode *pVnode) {
|
int vnodeCloseBufPool(SVnode *pVnode) {
|
||||||
SVBufPool *pPool;
|
SVBufPool *pPool;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pVnode->mutex);
|
||||||
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) {
|
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) {
|
||||||
pVnode->pPool = pPool->next;
|
pVnode->pPool = pPool->next;
|
||||||
vnodeBufPoolDestroy(pPool);
|
vnodeBufPoolDestroy(pPool);
|
||||||
|
@ -103,8 +104,9 @@ int vnodeCloseBufPool(SVnode *pVnode) {
|
||||||
vnodeBufPoolDestroy(pVnode->inUse);
|
vnodeBufPoolDestroy(pVnode->inUse);
|
||||||
pVnode->inUse = NULL;
|
pVnode->inUse = NULL;
|
||||||
}
|
}
|
||||||
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
|
taosThreadMutexUnlock(&pVnode->mutex);
|
||||||
|
|
||||||
|
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,6 +246,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
||||||
pVnode->pPool = pPool;
|
pVnode->pPool = pPool;
|
||||||
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
taosThreadCondSignal(&pVnode->poolNotEmpty);
|
||||||
|
|
||||||
|
if (pVnode->inUse == pPool) {
|
||||||
|
pVnode->inUse = NULL;
|
||||||
|
}
|
||||||
taosThreadMutexUnlock(&pVnode->mutex);
|
taosThreadMutexUnlock(&pVnode->mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,22 +87,21 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeShouldCommit(SVnode *pVnode) {
|
int vnodeShouldCommit(SVnode *pVnode) {
|
||||||
if (!pVnode->inUse || !osDataSpaceAvailable()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
SVCommitSched *pSched = &pVnode->commitSched;
|
SVCommitSched *pSched = &pVnode->commitSched;
|
||||||
int64_t nowMs = taosGetMonoTimestampMs();
|
int64_t nowMs = taosGetMonoTimestampMs();
|
||||||
|
bool diskAvail = osDataSpaceAvailable();
|
||||||
|
bool needCommit = false;
|
||||||
|
|
||||||
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
|
taosThreadMutexLock(&pVnode->mutex);
|
||||||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
|
if (!pVnode->inUse || !diskAvail) {
|
||||||
}
|
goto _out;
|
||||||
|
|
||||||
int vnodeShouldCommitOld(SVnode *pVnode) {
|
|
||||||
if (pVnode->inUse) {
|
|
||||||
return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
|
|
||||||
}
|
}
|
||||||
return false;
|
needCommit =
|
||||||
|
(((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
|
||||||
|
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
|
||||||
|
_out:
|
||||||
|
taosThreadMutexUnlock(&pVnode->mutex);
|
||||||
|
return needCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
||||||
|
@ -259,7 +258,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||||
metaPrepareAsyncCommit(pVnode->pMeta);
|
metaPrepareAsyncCommit(pVnode->pMeta);
|
||||||
|
|
||||||
vnodeBufPoolUnRef(pVnode->inUse);
|
vnodeBufPoolUnRef(pVnode->inUse);
|
||||||
pVnode->inUse = NULL;
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -869,13 +869,14 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa
|
||||||
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
colDataSetVal(pColInfo, i, p, false);
|
colDataSetVal(pColInfo, i, p, false);
|
||||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||||
char* tmp = alloca(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
||||||
varDataSetLen(tmp, tagVal.nData);
|
varDataSetLen(tmp, tagVal.nData);
|
||||||
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||||
colDataSetVal(pColInfo, i, tmp, false);
|
colDataSetVal(pColInfo, i, tmp, false);
|
||||||
#if TAG_FILTER_DEBUG
|
#if TAG_FILTER_DEBUG
|
||||||
qDebug("tagfilter varch:%s", tmp + 2);
|
qDebug("tagfilter varch:%s", tmp + 2);
|
||||||
#endif
|
#endif
|
||||||
|
taosMemoryFree(tmp);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
|
colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
|
||||||
#if TAG_FILTER_DEBUG
|
#if TAG_FILTER_DEBUG
|
||||||
|
|
|
@ -107,7 +107,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id);
|
||||||
ASSERT(pInfo->validBlockIndex == 0);
|
ASSERT(pInfo->validBlockIndex == 0);
|
||||||
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
||||||
|
|
||||||
|
@ -1047,18 +1047,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
#if 0
|
|
||||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
|
|
||||||
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
|
|
||||||
qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
|
|
||||||
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
|
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||||
int64_t uid = pOffset->uid;
|
int64_t uid = pOffset->uid;
|
||||||
|
|
|
@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
||||||
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
|
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
|
||||||
SWinKey key = {.groupId = groupId, .ts = pRow->key};
|
SWinKey key = {.groupId = groupId, .ts = pRow->key};
|
||||||
int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
|
int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
|
||||||
|
qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1064,11 +1064,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
||||||
pInfo->scalarSup.numOfExprs, NULL);
|
pInfo->scalarSup.numOfExprs, NULL);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
taosHashClear(pInfo->pPartitions);
|
taosHashClear(pInfo->pPartitions);
|
||||||
doStreamHashPartitionImpl(pInfo, pBlock);
|
doStreamHashPartitionImpl(pInfo, pBlock);
|
||||||
|
|
|
@ -24,6 +24,17 @@
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
typedef struct SJoinRowCtx {
|
||||||
|
bool rowRemains;
|
||||||
|
int64_t ts;
|
||||||
|
SArray* leftRowLocations;
|
||||||
|
SArray* rightRowLocations;
|
||||||
|
SArray* leftCreatedBlocks;
|
||||||
|
SArray* rightCreatedBlocks;
|
||||||
|
int32_t leftRowIdx;
|
||||||
|
int32_t rightRowIdx;
|
||||||
|
} SJoinRowCtx;
|
||||||
|
|
||||||
typedef struct SJoinOperatorInfo {
|
typedef struct SJoinOperatorInfo {
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
int32_t joinType;
|
int32_t joinType;
|
||||||
|
@ -37,6 +48,8 @@ typedef struct SJoinOperatorInfo {
|
||||||
int32_t rightPos;
|
int32_t rightPos;
|
||||||
SColumnInfo rightCol;
|
SColumnInfo rightCol;
|
||||||
SNode* pCondAfterMerge;
|
SNode* pCondAfterMerge;
|
||||||
|
|
||||||
|
SJoinRowCtx rowCtx;
|
||||||
} SJoinOperatorInfo;
|
} SJoinOperatorInfo;
|
||||||
|
|
||||||
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
||||||
|
@ -287,49 +300,107 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
|
||||||
|
|
||||||
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
|
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
|
||||||
int32_t* nRows) {
|
int32_t* nRows) {
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
|
||||||
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
|
||||||
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
|
||||||
|
|
||||||
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
|
||||||
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
|
SArray* leftRowLocations = NULL;
|
||||||
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
|
SArray* leftCreatedBlocks = NULL;
|
||||||
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
|
SArray* rightRowLocations = NULL;
|
||||||
|
SArray* rightCreatedBlocks = NULL;
|
||||||
|
int32_t leftRowIdx = 0;
|
||||||
|
int32_t rightRowIdx = 0;
|
||||||
|
int32_t i, j;
|
||||||
|
|
||||||
|
if (pJoinInfo->rowCtx.rowRemains) {
|
||||||
|
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
|
||||||
|
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
|
||||||
|
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
|
||||||
|
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
|
||||||
|
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
|
||||||
|
rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
|
||||||
|
} else {
|
||||||
|
leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
||||||
|
leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
||||||
|
rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
|
||||||
|
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
|
||||||
|
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
|
||||||
|
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
|
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
|
||||||
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
|
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
|
||||||
code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin);
|
uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
|
||||||
|
uint32_t limitRowNum = maxRowNum;
|
||||||
|
if (maxRowNum > pOperator->resultInfo.threshold) {
|
||||||
|
limitRowNum = pOperator->resultInfo.threshold;
|
||||||
|
if (!pJoinInfo->rowCtx.rowRemains) {
|
||||||
|
pJoinInfo->rowCtx.rowRemains = true;
|
||||||
|
pJoinInfo->rowCtx.ts = timestamp;
|
||||||
|
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
|
||||||
|
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
|
||||||
|
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
|
||||||
|
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = blockDataEnsureCapacity(pRes, limitRowNum);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
|
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
|
||||||
leftNumJoin, rightNumJoin);
|
leftNumJoin, rightNumJoin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
for (int32_t i = 0; i < leftNumJoin; ++i) {
|
bool done = false;
|
||||||
for (int32_t j = 0; j < rightNumJoin; ++j) {
|
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
|
||||||
|
for (j = rightRowIdx; j < rightNumJoin; ++j) {
|
||||||
|
if (*nRows >= limitRowNum) {
|
||||||
|
done = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
|
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
|
||||||
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
|
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
|
||||||
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
|
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
|
||||||
rightRow->pos);
|
rightRow->pos);
|
||||||
++*nRows;
|
++*nRows;
|
||||||
}
|
}
|
||||||
|
if (done) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxRowNum > pOperator->resultInfo.threshold) {
|
||||||
|
pJoinInfo->rowCtx.leftRowIdx = i;
|
||||||
|
pJoinInfo->rowCtx.rightRowIdx = j;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
|
if (maxRowNum <= pOperator->resultInfo.threshold) {
|
||||||
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
|
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
|
||||||
blockDataDestroy(pBlock);
|
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(rightCreatedBlocks);
|
||||||
|
taosArrayDestroy(rightRowLocations);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
|
||||||
|
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(leftCreatedBlocks);
|
||||||
|
taosArrayDestroy(leftRowLocations);
|
||||||
|
|
||||||
|
if (pJoinInfo->rowCtx.rowRemains) {
|
||||||
|
pJoinInfo->rowCtx.rowRemains = false;
|
||||||
|
pJoinInfo->rowCtx.leftRowLocations = NULL;
|
||||||
|
pJoinInfo->rowCtx.rightRowLocations = NULL;
|
||||||
|
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
|
||||||
|
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosArrayDestroy(rightCreatedBlocks);
|
|
||||||
taosArrayDestroy(rightRowLocations);
|
|
||||||
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
|
|
||||||
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
|
|
||||||
blockDataDestroy(pBlock);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(leftCreatedBlocks);
|
|
||||||
taosArrayDestroy(leftRowLocations);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,9 +450,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
while (1) {
|
while (1) {
|
||||||
int64_t leftTs = 0;
|
int64_t leftTs = 0;
|
||||||
int64_t rightTs = 0;
|
int64_t rightTs = 0;
|
||||||
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
|
if (pJoinInfo->rowCtx.rowRemains) {
|
||||||
if (!hasNextTs) {
|
leftTs = pJoinInfo->rowCtx.ts;
|
||||||
break;
|
rightTs = pJoinInfo->rowCtx.ts;
|
||||||
|
} else {
|
||||||
|
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
|
||||||
|
if (!hasNextTs) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (leftTs == rightTs) {
|
if (leftTs == rightTs) {
|
||||||
|
@ -389,12 +465,12 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
|
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
|
|
||||||
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
|
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1618,7 +1618,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
|
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1538,16 +1538,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
|
||||||
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
|
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
|
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
|
||||||
qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
|
qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey,
|
||||||
tw.ekey, tmpKey.groupId, mark);
|
tw.ekey, tmpKey.groupId, mark);
|
||||||
} else {
|
} else {
|
||||||
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
||||||
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
|
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
|
||||||
key->groupId, mark);
|
key->groupId, mark);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
|
||||||
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
|
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
|
||||||
key->groupId, mark);
|
key->groupId, mark);
|
||||||
}
|
}
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
|
|
|
@ -5012,14 +5012,24 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType &&
|
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) {
|
||||||
pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType &&
|
if (TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType) {
|
||||||
tagsLen + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_TAGS_LEN) {
|
if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_LENGTH, TSDB_MAX_TAGS_LEN);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tagsLen + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_TAGS_LEN) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_LENGTH, TSDB_MAX_TAGS_LEN);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3120,9 +3120,8 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
|
||||||
p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
|
||||||
|
|
||||||
|
p[i] = colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
|
||||||
if (p[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3146,9 +3145,8 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
|
||||||
|
|
||||||
p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
p[i] = !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
|
||||||
if (p[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3178,13 +3176,13 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData *pRe
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
SColumnInfoData *pData = info->cunits[0].colData;
|
SColumnInfoData *pData = info->cunits[0].colData;
|
||||||
|
|
||||||
void *colData = colDataGetData(pData, i);
|
if (colDataIsNull_s(pData, i)) {
|
||||||
if (colData == NULL || colDataIsNull_s(pData, i)) {
|
|
||||||
all = false;
|
all = false;
|
||||||
p[i] = 0;
|
p[i] = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *colData = colDataGetData(pData, i);
|
||||||
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
|
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
|
||||||
|
|
||||||
if (p[i] == 0) {
|
if (p[i] == 0) {
|
||||||
|
@ -3210,13 +3208,14 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
|
||||||
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
|
if (colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
|
||||||
p[i] = 0;
|
p[i] = 0;
|
||||||
all = false;
|
all = false;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||||
// match/nmatch for nchar type need convert from ucs4 to mbs
|
// match/nmatch for nchar type need convert from ucs4 to mbs
|
||||||
if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR &&
|
if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR &&
|
||||||
(info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)) {
|
(info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)) {
|
||||||
|
@ -3274,7 +3273,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SC
|
||||||
if (!isNull) {
|
if (!isNull) {
|
||||||
colData = colDataGetData((SColumnInfoData *)(cunit->colData), i);
|
colData = colDataGetData((SColumnInfoData *)(cunit->colData), i);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (colData == NULL || isNull) {
|
if (colData == NULL || isNull) {
|
||||||
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
|
||||||
// access
|
// access
|
||||||
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
||||||
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
|
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
|
||||||
|
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
|
||||||
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
||||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
||||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
||||||
|
|
|
@ -270,88 +270,40 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
||||||
|
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
||||||
|
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
|
||||||
|
|
||||||
|
if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
|
||||||
|
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int64_t logRetention = 0;
|
||||||
|
|
||||||
if (syncNodeIsMnode(pSyncNode)) {
|
if (syncNodeIsMnode(pSyncNode)) {
|
||||||
// mnode
|
// mnode
|
||||||
int64_t logRetention = SYNC_MNODE_LOG_RETENTION;
|
logRetention = SYNC_MNODE_LOG_RETENTION;
|
||||||
|
|
||||||
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
|
||||||
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
|
||||||
int64_t logNum = endIndex - beginIndex;
|
|
||||||
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
|
|
||||||
|
|
||||||
if (isEmpty || (!isEmpty && logNum < logRetention)) {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
|
|
||||||
lastApplyIndex, logNum, isEmpty);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
goto _DEL_WAL;
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
|
||||||
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
|
||||||
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
|
|
||||||
|
|
||||||
if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
if (pSyncNode->replicaNum > 1) {
|
if (pSyncNode->replicaNum > 1) {
|
||||||
// multi replicas
|
// multi replicas
|
||||||
|
logRetention = SYNC_VNODE_LOG_RETENTION;
|
||||||
lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
|
|
||||||
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
|
||||||
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
|
||||||
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
|
||||||
if (lastApplyIndex > matchIndex) {
|
|
||||||
sNTrace(pSyncNode,
|
|
||||||
"new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
|
|
||||||
" of dnode:%d, do not delete wal",
|
|
||||||
lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
|
||||||
if (lastApplyIndex > pSyncNode->minMatchIndex) {
|
|
||||||
sNTrace(pSyncNode,
|
|
||||||
"new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
|
|
||||||
lastApplyIndex, pSyncNode->minMatchIndex);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
goto _DEL_WAL;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// one replica
|
|
||||||
|
|
||||||
goto _DEL_WAL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSyncNode->replicaNum > 1) {
|
||||||
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
|
||||||
|
lastApplyIndex);
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex);
|
||||||
|
}
|
||||||
|
|
||||||
_DEL_WAL:
|
_DEL_WAL:
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -366,7 +318,7 @@ _DEL_WAL:
|
||||||
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
|
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
|
||||||
pSyncNode->snapshottingTime = taosGetTimestampMs();
|
pSyncNode->snapshottingTime = taosGetTimestampMs();
|
||||||
|
|
||||||
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
|
code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
|
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
|
||||||
pSyncNode->snapshottingIndex, lastApplyIndex);
|
pSyncNode->snapshottingIndex, lastApplyIndex);
|
||||||
|
@ -2142,24 +2094,19 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
if (timerLogicClock == msgLogicClock) {
|
if (timerLogicClock == msgLogicClock) {
|
||||||
if (tsNow > pData->execTime) {
|
if (tsNow > pData->execTime) {
|
||||||
#if 0
|
|
||||||
sTrace(
|
|
||||||
"vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
|
|
||||||
"---------",
|
|
||||||
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pData->execTime += pSyncTimer->timerMS;
|
pData->execTime += pSyncTimer->timerMS;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||||
|
|
||||||
|
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||||
|
|
||||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||||
pSyncMsg->destId = pData->destId;
|
pSyncMsg->destId = pData->destId;
|
||||||
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
|
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
|
||||||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
|
||||||
pSyncMsg->privateTerm = 0;
|
pSyncMsg->privateTerm = 0;
|
||||||
pSyncMsg->timeStamp = tsNow;
|
pSyncMsg->timeStamp = tsNow;
|
||||||
|
|
||||||
|
@ -2171,11 +2118,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
|
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
#if 0
|
|
||||||
sTrace(
|
|
||||||
"vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
|
|
||||||
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
|
@ -2468,6 +2410,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
syncNodeStepDown(ths, pMsg->currentTerm);
|
syncNodeStepDown(ths, pMsg->currentTerm);
|
||||||
|
|
||||||
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
|
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
|
||||||
|
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
|
||||||
|
sError("vgId:%d, sync log buffer is empty.", ths->vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
|
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
|
||||||
if (pMsg->currentTerm == matchTerm) {
|
if (pMsg->currentTerm == matchTerm) {
|
||||||
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
|
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
|
||||||
|
|
|
@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
|
|
||||||
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
|
syncLogBufferValidate(pBuf);
|
||||||
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
|
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
|
||||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||||
if (pEntry == NULL) continue;
|
if (pEntry == NULL) continue;
|
||||||
|
@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
|
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
|
||||||
}
|
}
|
||||||
|
syncLogBufferValidate(pBuf);
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
|
||||||
return term;
|
return term;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
|
||||||
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
|
bool empty = (pBuf->endIndex <= pBuf->startIndex);
|
||||||
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
return empty;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
|
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
syncLogBufferValidate(pBuf);
|
syncLogBufferValidate(pBuf);
|
||||||
|
@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
|
||||||
|
|
||||||
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
|
syncLogBufferValidate(pBuf);
|
||||||
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||||
ASSERT(lastVer == pBuf->matchIndex);
|
ASSERT(lastVer == pBuf->matchIndex);
|
||||||
SyncIndex index = pBuf->endIndex - 1;
|
SyncIndex index = pBuf->endIndex - 1;
|
||||||
|
@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
||||||
syncLogReplMgrReset(pMgr);
|
syncLogReplMgrReset(pMgr);
|
||||||
}
|
}
|
||||||
|
syncLogBufferValidate(pBuf);
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,7 +253,7 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
|
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
|
||||||
SBTC btc;
|
SBTC btc = {0};
|
||||||
int c;
|
int c;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -264,11 +264,18 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
|
||||||
// move the cursor
|
// move the cursor
|
||||||
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
|
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
ASSERT(0);
|
tdbError("tdb/btree-upsert: btc move to failed with ret: %d.", ret);
|
||||||
|
if (TDB_CELLDECODER_FREE_KEY(&btc.coder)) {
|
||||||
|
tdbFree(btc.coder.pKey);
|
||||||
|
}
|
||||||
tdbBtcClose(&btc);
|
tdbBtcClose(&btc);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TDB_CELLDECODER_FREE_KEY(&btc.coder)) {
|
||||||
|
tdbFree(btc.coder.pKey);
|
||||||
|
}
|
||||||
|
|
||||||
if (btc.idx == -1) {
|
if (btc.idx == -1) {
|
||||||
btc.idx = 0;
|
btc.idx = 0;
|
||||||
c = 1;
|
c = 1;
|
||||||
|
@ -280,8 +287,8 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
|
||||||
|
|
||||||
ret = tdbBtcUpsert(&btc, pKey, nKey, pData, nData, c);
|
ret = tdbBtcUpsert(&btc, pKey, nKey, pData, nData, c);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
ASSERT(0);
|
|
||||||
tdbBtcClose(&btc);
|
tdbBtcClose(&btc);
|
||||||
|
tdbError("tdb/btree-upsert: btc upsert failed with ret: %d.", ret);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1428,15 +1435,19 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
|
||||||
// Clear the state of decoder
|
// Clear the state of decoder
|
||||||
if (TDB_CELLDECODER_FREE_VAL(pDecoder)) {
|
if (TDB_CELLDECODER_FREE_VAL(pDecoder)) {
|
||||||
tdbFree(pDecoder->pVal);
|
tdbFree(pDecoder->pVal);
|
||||||
|
TDB_CELLDECODER_CLZ_FREE_VAL(pDecoder);
|
||||||
|
// tdbTrace("tdb btc decoder val set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
|
||||||
|
}
|
||||||
|
if (TDB_CELLDECODER_FREE_KEY(pDecoder)) {
|
||||||
|
tdbFree(pDecoder->pKey);
|
||||||
|
TDB_CELLDECODER_CLZ_FREE_KEY(pDecoder);
|
||||||
|
// tdbTrace("tdb btc decoder key set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
|
||||||
}
|
}
|
||||||
pDecoder->kLen = -1;
|
pDecoder->kLen = -1;
|
||||||
pDecoder->pKey = NULL;
|
pDecoder->pKey = NULL;
|
||||||
pDecoder->vLen = -1;
|
pDecoder->vLen = -1;
|
||||||
pDecoder->pVal = NULL;
|
pDecoder->pVal = NULL;
|
||||||
pDecoder->pgno = 0;
|
pDecoder->pgno = 0;
|
||||||
TDB_CELLDECODER_SET_FREE_NIL(pDecoder);
|
|
||||||
|
|
||||||
// tdbTrace("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
|
|
||||||
|
|
||||||
// 1. Decode header part
|
// 1. Decode header part
|
||||||
if (!leaf) {
|
if (!leaf) {
|
||||||
|
@ -2188,7 +2199,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
|
||||||
} else {
|
} else {
|
||||||
lidx = lidx + 1;
|
lidx = lidx + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// compare last cell
|
// compare last cell
|
||||||
if (lidx <= ridx) {
|
if (lidx <= ridx) {
|
||||||
pBtc->idx = ridx;
|
pBtc->idx = ridx;
|
||||||
|
|
|
@ -122,6 +122,8 @@ typedef struct SBtInfo {
|
||||||
#define TDB_CELLD_F_VAL 0x2
|
#define TDB_CELLD_F_VAL 0x2
|
||||||
|
|
||||||
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
|
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
|
||||||
|
#define TDB_CELLDECODER_CLZ_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_KEY)
|
||||||
|
#define TDB_CELLDECODER_CLZ_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_VAL)
|
||||||
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
|
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
|
||||||
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
|
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "theap.h"
|
#include "theap.h"
|
||||||
|
#include "tmisce.h"
|
||||||
#include "transLog.h"
|
#include "transLog.h"
|
||||||
#include "transportInt.h"
|
#include "transportInt.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
|
@ -11,7 +11,6 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
typedef struct SConnList {
|
typedef struct SConnList {
|
||||||
|
@ -224,9 +223,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// snprintf may cause performance problem
|
// snprintf may cause performance problem
|
||||||
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
|
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
|
||||||
do { \
|
do { \
|
||||||
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
char* p = key; \
|
||||||
|
int32_t len = strlen(ip); \
|
||||||
|
if (p != NULL) memcpy(p, ip, len); \
|
||||||
|
p[len] = ':'; \
|
||||||
|
titoa(port, 10, &p[len + 1]); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
||||||
|
@ -664,7 +667,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
|
||||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
|
||||||
transAllocBuffer(pBuf, buf);
|
transAllocBuffer(pBuf, buf);
|
||||||
}
|
}
|
||||||
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
@ -677,7 +680,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||||
if (pBuf->invalid) {
|
if (pBuf->invalid) {
|
||||||
cliHandleExcept(conn);
|
cliHandleExcept(conn);
|
||||||
break;
|
break;
|
||||||
|
@ -1949,11 +1952,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
if (rpcDebugFlag & DEBUG_DEBUG) {
|
||||||
char tbuf[256] = {0};
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
char tbuf[256] = {0};
|
||||||
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
pCtx->retryStep, pCtx->retryNextInterval);
|
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
|
||||||
|
pCtx->retryStep, pCtx->retryNextInterval);
|
||||||
|
}
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
|
@ -1990,7 +1995,7 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
|
||||||
pResp->pCont = buf;
|
pResp->pCont = buf;
|
||||||
pResp->contLen = len;
|
pResp->contLen = len;
|
||||||
|
|
||||||
*dst = epset;
|
epsetAssign(dst, &epset);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
|
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
|
||||||
|
@ -2015,7 +2020,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
|
||||||
} else {
|
} else {
|
||||||
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
||||||
tDebug("epset not equal, retry new epset");
|
tDebug("epset not equal, retry new epset");
|
||||||
pCtx->epSet = epSet;
|
epsetAssign(&pCtx->epSet, &epSet);
|
||||||
noDelay = false;
|
noDelay = false;
|
||||||
} else {
|
} else {
|
||||||
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
|
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
|
||||||
|
@ -2040,7 +2045,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
|
||||||
} else {
|
} else {
|
||||||
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
||||||
tDebug("epset not equal, retry new epset");
|
tDebug("epset not equal, retry new epset");
|
||||||
pCtx->epSet = epSet;
|
epsetAssign(&pCtx->epSet, &epSet);
|
||||||
noDelay = false;
|
noDelay = false;
|
||||||
} else {
|
} else {
|
||||||
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
|
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
|
||||||
|
@ -2130,10 +2135,6 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
|
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
|
||||||
pCtx->retryNextInterval = pCtx->retryMaxInterval;
|
pCtx->retryNextInterval = pCtx->retryMaxInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
} else {
|
} else {
|
||||||
pCtx->retryNextInterval = 0;
|
pCtx->retryNextInterval = 0;
|
||||||
pCtx->epsetRetryCnt++;
|
pCtx->epsetRetryCnt++;
|
||||||
|
@ -2181,9 +2182,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
STraceId* trace = &pResp->info.traceId;
|
STraceId* trace = &pResp->info.traceId;
|
||||||
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
|
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
|
||||||
if (hasEpSet) {
|
if (hasEpSet) {
|
||||||
char tbuf[256] = {0};
|
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
char tbuf[256] = {0};
|
||||||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
|
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx->pSem != NULL) {
|
if (pCtx->pSem != NULL) {
|
||||||
|
@ -2310,8 +2313,9 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
||||||
|
|
||||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->epSet = *pEpSet;
|
epsetAssign(&pCtx->epSet, pEpSet);
|
||||||
pCtx->origEpSet = *pEpSet;
|
epsetAssign(&pCtx->origEpSet, pEpSet);
|
||||||
|
|
||||||
pCtx->ahandle = pReq->info.ahandle;
|
pCtx->ahandle = pReq->info.ahandle;
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
|
|
||||||
|
@ -2356,8 +2360,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
||||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||||
|
|
||||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->epSet = *pEpSet;
|
epsetAssign(&pCtx->epSet, pEpSet);
|
||||||
pCtx->origEpSet = *pEpSet;
|
epsetAssign(&pCtx->origEpSet, pEpSet);
|
||||||
pCtx->ahandle = pReq->info.ahandle;
|
pCtx->ahandle = pReq->info.ahandle;
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->pSem = sem;
|
pCtx->pSem = sem;
|
||||||
|
|
|
@ -96,8 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
if (walSkipFetchBodyNew(pReader) < 0) {
|
if (walSkipFetchBodyNew(pReader) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
fetchVer++;
|
fetchVer = pReader->curVersion;
|
||||||
ASSERT(fetchVer == pReader->curVersion);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pReader->curStopped = 1;
|
pReader->curStopped = 1;
|
||||||
|
@ -144,7 +143,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN] = {0};
|
||||||
|
|
||||||
taosCloseFile(&pReader->pIdxFile);
|
taosCloseFile(&pReader->pIdxFile);
|
||||||
taosCloseFile(&pReader->pLogFile);
|
taosCloseFile(&pReader->pLogFile);
|
||||||
|
@ -300,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReadHead->version != ver) {
|
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
|
||||||
pRead->pHead->head.version, ver);
|
|
||||||
pRead->curInvalid = 1;
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
|
|
|
@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) {
|
||||||
}
|
}
|
||||||
pRef->refId = tGenIdPI64();
|
pRef->refId = tGenIdPI64();
|
||||||
pRef->refVer = -1;
|
pRef->refVer = -1;
|
||||||
pRef->refFile = -1;
|
// pRef->refFile = -1;
|
||||||
pRef->pWal = pWal;
|
pRef->pWal = pWal;
|
||||||
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
|
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
|
||||||
return pRef;
|
return pRef;
|
||||||
|
@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
|
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
// bsearch in fileSet
|
||||||
SWalFileInfo tmpInfo;
|
// SWalFileInfo tmpInfo;
|
||||||
tmpInfo.firstVer = ver;
|
// tmpInfo.firstVer = ver;
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
ASSERT(pRet != NULL);
|
// ASSERT(pRet != NULL);
|
||||||
pRef->refFile = pRet->firstVer;
|
// pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||||
#if 1
|
#if 1
|
||||||
void walUnrefVer(SWalRef *pRef) {
|
void walUnrefVer(SWalRef *pRef) {
|
||||||
pRef->refId = -1;
|
pRef->refId = -1;
|
||||||
pRef->refFile = -1;
|
// pRef->refFile = -1;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
int64_t ver = walGetFirstVer(pWal);
|
int64_t ver = walGetFirstVer(pWal);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
|
|
||||||
|
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
// bsearch in fileSet
|
||||||
SWalFileInfo tmpInfo;
|
// SWalFileInfo tmpInfo;
|
||||||
tmpInfo.firstVer = ver;
|
// tmpInfo.firstVer = ver;
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
ASSERT(pRet != NULL);
|
// ASSERT(pRet != NULL);
|
||||||
pRef->refFile = pRet->firstVer;
|
// pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
return pRef;
|
return pRef;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,7 +117,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
|
||||||
tmpInfo.firstVer = ver;
|
tmpInfo.firstVer = ver;
|
||||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
ASSERT(pRet != NULL);
|
ASSERT(pRet != NULL);
|
||||||
pRef->refFile = pRet->firstVer;
|
// pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return pRef;
|
return pRef;
|
||||||
|
|
|
@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
ASSERT(logRetention >= 0);
|
||||||
pWal->vers.verInSnapshotting = ver;
|
pWal->vers.verInSnapshotting = ver;
|
||||||
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
|
pWal->vers.logRetention = logRetention;
|
||||||
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
|
||||||
|
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
|
||||||
|
", last ver %" PRId64,
|
||||||
|
pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
// check file rolling
|
// check file rolling
|
||||||
if (pWal->cfg.retentionPeriod == 0) {
|
if (walGetLastFileSize(pWal) != 0) {
|
||||||
if (walGetLastFileSize(pWal) != 0) {
|
if (walRollImpl(pWal) < 0) {
|
||||||
if (walRollImpl(pWal) < 0) {
|
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
|
||||||
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
|
goto _err;
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
int64_t ver = pWal->vers.verInSnapshotting;
|
int64_t ver = pWal->vers.verInSnapshotting;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
|
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
|
||||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
", last ver %" PRId64,
|
||||||
|
pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
|
|
||||||
if (ver == -1) {
|
if (ver == -1) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
pWal->vers.snapshotVer = ver;
|
pWal->vers.snapshotVer = ver;
|
||||||
int ts = taosGetTimestampSec();
|
int ts = taosGetTimestampSec();
|
||||||
|
|
||||||
|
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||||
|
|
|
@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(pWal->vers.lastVer, i);
|
ASSERT_EQ(pWal->vers.lastVer, i);
|
||||||
if (i == 5) {
|
if (i == 5) {
|
||||||
walBeginSnapshot(pWal, i);
|
walBeginSnapshot(pWal, i, 0);
|
||||||
walEndSnapshot(pWal);
|
walEndSnapshot(pWal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
||||||
ASSERT_EQ(pWal->vers.commitVer, i);
|
ASSERT_EQ(pWal->vers.commitVer, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
walBeginSnapshot(pWal, i - 1);
|
walBeginSnapshot(pWal, i - 1, 0);
|
||||||
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
|
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
|
||||||
walEndSnapshot(pWal);
|
walEndSnapshot(pWal);
|
||||||
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
|
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
|
||||||
|
@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
||||||
ASSERT_EQ(pWal->vers.commitVer, i);
|
ASSERT_EQ(pWal->vers.commitVer, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walBeginSnapshot(pWal, i - 1);
|
code = walBeginSnapshot(pWal, i - 1, 0);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
code = walEndSnapshot(pWal);
|
code = walEndSnapshot(pWal);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
|
@ -132,7 +132,8 @@ int tsem_wait(tsem_t *psem) {
|
||||||
|
|
||||||
int tsem_timewait(tsem_t *psem, int64_t milis) {
|
int tsem_timewait(tsem_t *psem, int64_t milis) {
|
||||||
if (psem == NULL || *psem == NULL) return -1;
|
if (psem == NULL || *psem == NULL) return -1;
|
||||||
dispatch_semaphore_wait(*psem, milis * 1000 * 1000);
|
dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
|
||||||
|
dispatch_semaphore_wait(*psem, time);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,8 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySet(pArray, pos + 1, p2);
|
taosArraySet(pArray, pos + 1, p2);
|
||||||
pos += 1;
|
memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
|
||||||
|
pos += 1;
|
||||||
} else {
|
} else {
|
||||||
pos += 1;
|
pos += 1;
|
||||||
}
|
}
|
||||||
|
@ -171,13 +172,14 @@ void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp
|
||||||
// do nothing
|
// do nothing
|
||||||
} else {
|
} else {
|
||||||
if (pos + 1 != i) {
|
if (pos + 1 != i) {
|
||||||
void* p = taosArrayGet(pArray, pos + 1);
|
void* p = taosArrayGetP(pArray, pos + 1);
|
||||||
if (fp != NULL) {
|
if (fp != NULL) {
|
||||||
fp(p);
|
fp(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySet(pArray, pos + 1, p2);
|
taosArraySet(pArray, pos + 1, p2);
|
||||||
pos += 1;
|
memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
|
||||||
|
pos += 1;
|
||||||
} else {
|
} else {
|
||||||
pos += 1;
|
pos += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1232,7 +1232,7 @@ int32_t taosArrayCompareString(const void *a, const void *b) {
|
||||||
const char *x = *(const char **)a;
|
const char *x = *(const char **)a;
|
||||||
const char *y = *(const char **)b;
|
const char *y = *(const char **)b;
|
||||||
|
|
||||||
return compareLenPrefixedStr(x, y);
|
return strcmp(x, y);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t comparestrPatternMatch(const void *pLeft, const void *pRight) {
|
int32_t comparestrPatternMatch(const void *pLeft, const void *pRight) {
|
||||||
|
|
|
@ -514,7 +514,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ROW_LENGTH, "Row length exceeds
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TOO_MANY_COLUMNS, "Too many columns")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TOO_MANY_COLUMNS, "Too many columns")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FIRST_COLUMN, "First column must be timestamp")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FIRST_COLUMN, "First column must be timestamp")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column length")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column/tag length")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TAGS_NUM, "Invalid number of tag columns")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TAGS_NUM, "Invalid number of tag columns")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
|
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
|
||||||
|
|
|
@ -179,6 +179,7 @@
|
||||||
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
|
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
|
||||||
,,y,script,./test.sh -f tsim/query/groupby.sim
|
,,y,script,./test.sh -f tsim/query/groupby.sim
|
||||||
,,y,script,./test.sh -f tsim/query/forceFill.sim
|
,,y,script,./test.sh -f tsim/query/forceFill.sim
|
||||||
|
,,n,script,./test.sh -f tsim/query/join.sim
|
||||||
,,y,script,./test.sh -f tsim/qnode/basic1.sim
|
,,y,script,./test.sh -f tsim/qnode/basic1.sim
|
||||||
,,y,script,./test.sh -f tsim/snode/basic1.sim
|
,,y,script,./test.sh -f tsim/snode/basic1.sim
|
||||||
,,y,script,./test.sh -f tsim/mnode/basic1.sim
|
,,y,script,./test.sh -f tsim/mnode/basic1.sim
|
||||||
|
@ -646,6 +647,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tagFilter.py
|
||||||
,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py
|
,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5
|
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5
|
||||||
|
|
|
@ -40,7 +40,7 @@ python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
|
||||||
# /root/TDengine/source/common/src/tdataformat.c:1876:7: runtime error: signed integer overflow: 8252423483843671206 + 2406154664059062870 cannot be represented in type 'long int'
|
# /root/TDengine/source/common/src/tdataformat.c:1876:7: runtime error: signed integer overflow: 8252423483843671206 + 2406154664059062870 cannot be represented in type 'long int'
|
||||||
# /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer
|
# /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer
|
||||||
|
|
||||||
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |grep -v "filter.c:3149:14"|wc -l`
|
runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |wc -l`
|
||||||
|
|
||||||
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
|
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
|
||||||
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
|
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
|
||||||
|
|
|
@ -67,4 +67,19 @@ sql insert into t1 values (1591060628000, 1);
|
||||||
sql alter table st1 drop tag t2;
|
sql alter table st1 drop tag t2;
|
||||||
sql create table t2 using st1 tags(2);
|
sql create table t2 using st1 tags(2);
|
||||||
|
|
||||||
|
print ======== drop tag in super table
|
||||||
|
sql create database if not exists aaa;
|
||||||
|
sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'aaa';
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql drop database if exists foo;
|
||||||
|
sql create database if not exists foo;
|
||||||
|
sql create table foo.t(ts timestamp,name varchar(20));
|
||||||
|
sql create table foo.xt(ts timestamp,name varchar(20));
|
||||||
|
sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'foo';
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
$dbPrefix = db
|
||||||
|
$tbPrefix1 = tba
|
||||||
|
$tbPrefix2 = tbb
|
||||||
|
$mtPrefix = stb
|
||||||
|
$tbNum = 10000
|
||||||
|
$rowNum = 2
|
||||||
|
|
||||||
|
print =============== step1
|
||||||
|
$i = 0
|
||||||
|
$db = $dbPrefix . $i
|
||||||
|
$mt1 = $mtPrefix . $i
|
||||||
|
$i = 1
|
||||||
|
$mt2 = $mtPrefix . $i
|
||||||
|
|
||||||
|
sql drop database $db -x step1
|
||||||
|
step1:
|
||||||
|
sql create database $db
|
||||||
|
sql use $db
|
||||||
|
sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
|
||||||
|
sql create table $mt2 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
|
||||||
|
|
||||||
|
print ====== start create child tables and insert data
|
||||||
|
$i = 0
|
||||||
|
while $i < $tbNum
|
||||||
|
$tb = $tbPrefix1 . $i
|
||||||
|
sql create table $tb using $mt1 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
|
||||||
|
|
||||||
|
$x = 0
|
||||||
|
while $x < $rowNum
|
||||||
|
$cc = $x * 60000
|
||||||
|
$ms = 1601481600000 + $cc
|
||||||
|
|
||||||
|
sql insert into $tb values ($ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
$i = $i + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
print =============== step2
|
||||||
|
$i = 0
|
||||||
|
while $i < $tbNum
|
||||||
|
$tb = $tbPrefix2 . $i
|
||||||
|
sql create table $tb using $mt2 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
|
||||||
|
|
||||||
|
$x = 0
|
||||||
|
while $x < $rowNum
|
||||||
|
$cc = $x * 60000
|
||||||
|
$ms = 1601481600000 + $cc
|
||||||
|
|
||||||
|
sql insert into $tb values ($ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
$i = $i + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
sql select * from tba0 t1, tbb0 t2 where t1.ts=t2.ts;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql select * from stb0 t1, stb1 t2 where t1.ts=t2.ts and t1.tag2=t2.tag2;
|
||||||
|
if $rows != 200000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,67 @@
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
|
||||||
|
|
||||||
|
INT_COL = "c1"
|
||||||
|
BINT_COL = "c2"
|
||||||
|
SINT_COL = "c3"
|
||||||
|
TINT_COL = "c4"
|
||||||
|
FLOAT_COL = "c5"
|
||||||
|
DOUBLE_COL = "c6"
|
||||||
|
BOOL_COL = "c7"
|
||||||
|
|
||||||
|
BINARY_COL = "c8"
|
||||||
|
NCHAR_COL = "c9"
|
||||||
|
TS_COL = "c10"
|
||||||
|
|
||||||
|
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
|
||||||
|
UN_NUM_COL = [BOOL_COL, BINARY_COL, NCHAR_COL, ]
|
||||||
|
TS_TYPE_COL = [TS_COL]
|
||||||
|
|
||||||
|
DBNAME = "db"
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
tdSql.execute(f'drop database if exists db')
|
||||||
|
tdSql.execute(f'create database if not exists db vgroups 1')
|
||||||
|
|
||||||
|
def __create_tb(self, dbname="db"):
|
||||||
|
create_stb_sql = f'''create table {dbname}.stb1(
|
||||||
|
ts timestamp, f1 int
|
||||||
|
) tags (tag1 binary(16300))
|
||||||
|
'''
|
||||||
|
tdSql.execute(create_stb_sql)
|
||||||
|
|
||||||
|
tag_value = 'a'
|
||||||
|
for i in range(1200):
|
||||||
|
tag_value = tag_value + 'a'
|
||||||
|
|
||||||
|
for i in range(8000):
|
||||||
|
tdSql.execute(f"create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( '{tag_value}' )")
|
||||||
|
|
||||||
|
def __query_data(self, rows, dbname="db"):
|
||||||
|
tdSql.execute(
|
||||||
|
f'''select count(*) from {dbname}.stb1 where tag1 like '%a'
|
||||||
|
'''
|
||||||
|
)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdLog.printNoPrefix("==========step1:create table")
|
||||||
|
self.__create_tb()
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("==========step2:query data")
|
||||||
|
self.__query_data(10)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -195,6 +195,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(1, 1, 1)
|
tdSql.checkData(1, 1, 1)
|
||||||
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
|
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
|
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
|
||||||
uid1 = tdSql.getData(0, 5)
|
uid1 = tdSql.getData(0, 5)
|
||||||
uid2 = tdSql.getData(1, 5)
|
uid2 = tdSql.getData(1, 5)
|
||||||
|
|
Loading…
Reference in New Issue