Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/ly_res

This commit is contained in:
54liuyao 2024-08-22 19:32:50 +08:00
commit 2f96aa8ebb
35 changed files with 293 additions and 1682 deletions

View File

@ -16,7 +16,7 @@ toc_max_heading_level: 4
### 配置方式
多级存储支持 3 级,每级最多可配置 16 个挂载点。
多级存储支持 3 级,每级最多可配置 128 个挂载点。
**Tips** 典型的配置方案有0 级配置多个挂载点,每个挂载点对应单块 SAS 硬盘1 级配置多个挂载点,每个挂载点对应单块或多块 SATA 硬盘2 级可配置 S3 存储或其他廉价网络存储。
@ -110,4 +110,4 @@ s3migrate database <db_name>;
| :--- | :----------- | :----- | :----- | :------ | :----------------------------------------------------------- |
| 1 | s3_keeplocal | 3650 | 1 | 365000 | 数据在本地保留的天数,即 data 文件在本地磁盘保留多长时间后可以上传到 S3。默认单位支持 m分钟、h小时和 d三个单位 |
| 2 | s3_chunksize | 262144 | 131072 | 1048576 | 上传对象的大小阈值,与 TSDB_PAGESIZE 参数一样,不可修改,单位为 TSDB 页 |
| 3 | s3_compact | 0 | 0 | 1 | TSDB 文件组首次上传 S3 时,是否自动进行 compact 操作。 |
| 3 | s3_compact | 0 | 0 | 1 | TSDB 文件组首次上传 S3 时,是否自动进行 compact 操作。 |

View File

@ -53,7 +53,7 @@ taosd 命令行参数如下
| :--------------------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: |
| queryPolicy | 查询策略1: 只使用 vnode不使用 qnode; 2: 没有扫描算子的子任务在 qnode 执行,带扫描算子的子任务在 vnode 执行; 3: vnode 只运行扫描算子,其余算子均在 qnode 执行 缺省值1 |
| maxNumOfDistinctRes | 允许返回的 distinct 结果最大行数,默认值 10 万,最大允许值 1 亿 |
| countAlwaysReturnValue | ount/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值0: 返回空行1: 返回;该参数设置为 1 时,如果查询中含有 INTERVAL 子句或者该查询使用了TSMA时, 且相应的组或窗口内数据为空或者NULL 对应的组或窗口将不返回查询结果. 注意此参数客户端和服务端值应保持一致. |
| countAlwaysReturnValue | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值0: 返回空行1: 返回;该参数设置为 1 时,如果查询中含有 INTERVAL 子句或者该查询使用了TSMA时, 且相应的组或窗口内数据为空或者NULL 对应的组或窗口将不返回查询结果. 注意此参数客户端和服务端值应保持一致. |
### 区域相关

View File

@ -1,129 +0,0 @@
/*
* Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_EXCEPTION_H_
#define _TD_UTIL_EXCEPTION_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
* cleanup actions
*/
typedef struct SCleanupAction {
bool failOnly;
uint8_t wrapper;
uint16_t reserved;
void* func;
union {
void* Ptr;
bool Bool;
char Char;
int8_t Int8;
uint8_t Uint8;
int16_t Int16;
uint16_t Uint16;
int32_t Int;
uint32_t Uint;
int32_t Int32;
uint32_t Uint32;
int64_t Int64;
uint64_t Uint64;
float Float;
double Double;
} arg1, arg2;
} SCleanupAction;
/*
* exception hander registration
*/
typedef struct SExceptionNode {
struct SExceptionNode* prev;
jmp_buf jb;
int32_t code;
int32_t maxCleanupAction;
int32_t numCleanupAction;
SCleanupAction* cleanupActions;
} SExceptionNode;
// functions & macros for auto-cleanup
void cleanupPush_void_ptr_ptr(bool failOnly, void* func, void* arg1, void* arg2);
void cleanupPush_void_ptr_bool(bool failOnly, void* func, void* arg1, bool arg2);
void cleanupPush_void_ptr(bool failOnly, void* func, void* arg);
void cleanupPush_int_int(bool failOnly, void* func, int32_t arg);
void cleanupPush_void(bool failOnly, void* func);
void cleanupPush_int_ptr(bool failOnly, void* func, void* arg);
int32_t cleanupGetActionCount();
void cleanupExecuteTo(int32_t anchor, bool failed);
void cleanupExecute(SExceptionNode* node, bool failed);
bool cleanupExceedLimit();
#define CLEANUP_PUSH_VOID_PTR_PTR(failOnly, func, arg1, arg2) \
cleanupPush_void_ptr_ptr((failOnly), (void*)(func), (void*)(arg1), (void*)(arg2))
#define CLEANUP_PUSH_VOID_PTR_BOOL(failOnly, func, arg1, arg2) \
cleanupPush_void_ptr_bool((failOnly), (void*)(func), (void*)(arg1), (bool)(arg2))
#define CLEANUP_PUSH_VOID_PTR(failOnly, func, arg) cleanupPush_void_ptr((failOnly), (void*)(func), (void*)(arg))
#define CLEANUP_PUSH_INT_INT(failOnly, func, arg) cleanupPush_void_ptr((failOnly), (void*)(func), (int32_t)(arg))
#define CLEANUP_PUSH_VOID(failOnly, func) cleanupPush_void((failOnly), (void*)(func))
#define CLEANUP_PUSH_INT_PTR(failOnly, func, arg) cleanupPush_int_ptr((failOnly), (void*)(func), (void*)(arg))
#define CLEANUP_PUSH_FREE(failOnly, arg) cleanupPush_void_ptr((failOnly), free, (void*)(arg))
#define CLEANUP_PUSH_CLOSE(failOnly, arg) cleanupPush_int_int((failOnly), close, (int32_t)(arg))
#define CLEANUP_PUSH_FCLOSE(failOnly, arg) cleanupPush_int_ptr((failOnly), fclose, (void*)(arg))
#define CLEANUP_GET_ANCHOR() cleanupGetActionCount()
#define CLEANUP_EXECUTE_TO(anchor, failed) cleanupExecuteTo((anchor), (failed))
#define CLEANUP_EXCEED_LIMIT() cleanupExceedLimit()
// functions & macros for exception handling
void exceptionPushNode(SExceptionNode* node);
int32_t exceptionPopNode();
void exceptionThrow(int32_t code);
#define TRY(maxCleanupActions) \
do { \
SExceptionNode exceptionNode = {0}; \
SCleanupAction cleanupActions[(maxCleanupActions) > 0 ? (maxCleanupActions) : 1]; \
exceptionNode.maxCleanupAction = (maxCleanupActions) > 0 ? (maxCleanupActions) : 1; \
exceptionNode.cleanupActions = cleanupActions; \
exceptionPushNode(&exceptionNode); \
int32_t caughtException = setjmp(exceptionNode.jb); \
if (caughtException == 0)
#define CATCH(code) \
int32_t code = exceptionPopNode(); \
if (caughtException == 1)
#define FINALLY(code) int32_t code = exceptionPopNode();
#define END_TRY \
} \
while (0) \
;
#define THROW(x) exceptionThrow((x))
#define CAUGHT_EXCEPTION() ((bool)(caughtException == 1))
#define CLEANUP_EXECUTE() cleanupExecute(&exceptionNode, CAUGHT_EXCEPTION())
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_EXCEPTION_H_*/

View File

@ -229,6 +229,9 @@ function install_bin() {
if [ -d ${script_dir}/${xname}/bin ]; then
${csudo}cp -r ${script_dir}/${xname}/bin/* ${install_main_dir}/bin
fi
if [ -e ${script_dir}/${xname}/uninstall_${xname}.sh ]; then
${csudo}cp -r ${script_dir}/${xname}/uninstall_${xname}.sh ${install_main_dir}/uninstall_${xname}.sh
fi
fi
if [ -f ${script_dir}/bin/quick_deploy.sh ]; then
@ -250,6 +253,8 @@ function install_bin() {
for service in "${services[@]}"; do
[ -x ${install_main_dir}/bin/${service} ] && ${csudo}ln -sf ${install_main_dir}/bin/${service} ${bin_link_dir}/${service} || :
done
[ -x ${install_main_dir}/uninstall_${xname}.sh ] && ${csudo}ln -sf ${install_main_dir}/uninstall_${xname}.sh ${bin_link_dir}/uninstall_${xname}.sh || :
}
function install_lib() {

View File

@ -363,8 +363,8 @@ if [ "$verMode" == "cluster" ]; then
# copy taosx
if [ -d ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ]; then
cp -r ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ${install_dir}
cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx
sed -i 's/target=\"\"/target=\"taosx\"/g' ${install_dir}/taosx/uninstall.sh
cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx/uninstall_taosx.sh
sed -i "s/uninstall.sh/uninstall_taosx.sh/g" ${install_dir}/taosx/uninstall_taosx.sh
fi
fi
fi

View File

@ -56,7 +56,11 @@ local_bin_link_dir="/usr/local/bin"
service_config_dir="/etc/systemd/system"
config_dir="/etc/${PREFIX}"
services=(${PREFIX}"d" ${PREFIX}"adapter" ${PREFIX}"x" ${PREFIX}"-explorer" ${PREFIX}"keeper")
if [ "${verMode}" == "cluster" ]; then
services=(${PREFIX}"d" ${PREFIX}"adapter" ${PREFIX}"keeper")
else
services=(${PREFIX}"d" ${PREFIX}"adapter" ${PREFIX}"keeper" ${PREFIX}"-explorer")
fi
tools=(${PREFIX} ${PREFIX}"Benchmark" ${PREFIX}"dump" ${PREFIX}"demo" udfd set_core.sh TDinsight.sh $uninstallScript start-all.sh stop-all.sh)
csudo=""
@ -222,6 +226,30 @@ function remove_data_and_config() {
[ -d "${log_dir}" ] && ${csudo}rm -rf ${log_dir}
}
echo
echo "Do you want to remove all the data, log and configuration files? [y/n]"
read answer
remove_flag=false
if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then
confirmMsg="I confirm that I would like to delete all data, log and configuration files"
echo "Please enter '${confirmMsg}' to continue"
read answer
if [ X"$answer" == X"${confirmMsg}" ]; then
remove_flag=true
else
echo "answer doesn't match, skip this step"
fi
fi
echo
if [ -e ${install_main_dir}/uninstall_${PREFIX}x.sh ]; then
if [ X$remove_flag == X"true" ]; then
bash ${install_main_dir}/uninstall_${PREFIX}x.sh --clean-all true
else
bash ${install_main_dir}/uninstall_${PREFIX}x.sh --clean-all false
fi
fi
remove_bin
clean_header
# Remove lib file
@ -232,6 +260,11 @@ clean_log
clean_config
# Remove data link directory
${csudo}rm -rf ${data_link_dir} || :
if [ X$remove_flag == X"true" ]; then
remove_data_and_config
fi
${csudo}rm -rf ${install_main_dir} || :
if [[ -e /etc/os-release ]]; then
osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release)
@ -254,20 +287,6 @@ if [ "$osType" = "Darwin" ]; then
${csudo}rm -rf /Applications/TDengine.app
fi
echo
echo "Do you want to remove all the data, log and configuration files? [y/n]"
read answer
if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then
confirmMsg="I confirm that I would like to delete all data, log and configuration files"
echo "Please enter '${confirmMsg}' to continue"
read answer
if [ X"$answer" == X"${confirmMsg}" ]; then
remove_data_and_config
else
echo "answer doesn't match, skip this step"
fi
fi
command -v systemctl >/dev/null 2>&1 && ${csudo}systemctl daemon-reload >/dev/null 2>&1 || true
echo
echo "${productName} is removed successfully!"

View File

@ -31,6 +31,8 @@ bin_link_dir="/usr/bin"
lib_link_dir="/usr/lib"
lib64_link_dir="/usr/lib64"
inc_link_dir="/usr/include"
log_dir="/var/log/${clientName2}"
cfg_dir="/etc/${clientName2}"
csudo=""
if command -v sudo > /dev/null; then
@ -92,6 +94,24 @@ function clean_log() {
${csudo}rm -rf ${log_link_dir} || :
}
function clean_config_and_log_dir() {
# Remove link
echo "Do you want to remove all the log and configuration files? [y/n]"
read answer
if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then
confirmMsg="I confirm that I would like to delete all log and configuration files"
echo "Please enter '${confirmMsg}' to continue"
read answer
if [ X"$answer" == X"${confirmMsg}" ]; then
# Remove dir
rm -rf ${cfg_dir} || :
rm -rf ${log_dir} || :
else
echo "answer doesn't match, skip this step"
fi
fi
}
# Stop client.
kill_client
# Remove binary file and links
@ -104,6 +124,8 @@ clean_lib
clean_log
# Remove link configuration file
clean_config
# Remove dir
clean_config_and_log_dir
${csudo}rm -rf ${install_main_dir}

View File

@ -1774,7 +1774,7 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
int32_t nCols) {
// if (pColCmpr == NULL || colName == NULL) return -1;
ASSERT(taosArrayGetSize(pField) == nCols);
if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED;
TAOS_FIELD *p = taosArrayGet(pField, 0);
int32_t code = 0;

View File

@ -512,7 +512,6 @@ static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid,
buf[0] = (uint64_t)pHashMap;
buf[1] = suid;
setMD5DigestInKey(buf, key, keyLen);
ASSERT(keyLen == sizeof(uint64_t) * 2);
}
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,

View File

@ -275,10 +275,10 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) {
int32_t metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first, int8_t move) {
int32_t code = 0;
int32_t lino;
int8_t locked = 0;
int8_t locked = 0;
if (pTbCur->paused) {
metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK);
locked = 1;
locked = 1;
code = tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
if (code != 0) {
TSDB_CHECK_CODE(code, lino, _exit);
@ -673,7 +673,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
}
}
if (ASSERTS(sver > 0, "failed to get table schema version: %d", sver)) {
if (!(sver > 0)) {
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
@ -1608,8 +1608,6 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t
metaULock(pVnodeObj->pMeta);
if (numOfTables) *numOfTables = state.ctbNum;
if (numOfCols) *numOfCols = state.colNum;
ASSERTS(state.colNum > 0, "vgId:%d, suid:%" PRIi64 " nCols:%d <= 0 in metaCache", TD_VID(pVnodeObj), uid,
state.colNum);
goto _exit;
}

View File

@ -57,14 +57,12 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
if (pMsg->term > raftStoreGetTerm(ths)) {
if (pMsg->term != raftStoreGetTerm(ths)) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return TSDB_CODE_SYN_WRONG_TERM;
}
ASSERT(pMsg->term == raftStoreGetTerm(ths));
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);

View File

@ -57,7 +57,10 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->qu
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
int count = 0;
SSyncIndexMgr* pMatches = pNode->pMatchIndex;
ASSERT(pNode->replicaNum == pMatches->replicaNum);
if (pNode->replicaNum != pMatches->replicaNum) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return false;
};
for (int i = 0; i < pNode->totalReplicaNum; i++) {
if(pNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_VOTER){

View File

@ -119,7 +119,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
}
ret = syncNodeRequestVotePeers(pSyncNode);
ASSERT(ret == 0);
if (ret != 0) return ret;
syncNodeResetElectTimer(pSyncNode);
return ret;

View File

@ -988,9 +988,18 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
int32_t code = 0;
ASSERTS(pNode->pLogStore != NULL, "log store not created");
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
if (pNode->pLogStore == NULL) {
sError("vgId:%d, log store not created", pNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm == NULL) {
sError("vgId:%d, pFsm not registered", pNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
SSnapshot snapshot = {0};
// TODO check return value
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
@ -1384,8 +1393,14 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
int32_t code = 0;
ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");
if (pSyncNode->pLogStore == NULL) {
sError("vgId:%d, log store not created", pSyncNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pSyncNode->pLogBuf == NULL) {
sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
(void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -1400,7 +1415,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
TAOS_RETURN(code);
}
ASSERT(endIndex == lastVer + 1);
if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
@ -1743,7 +1758,10 @@ inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
}
}
ASSERT(b1 == b2);
if (b1 != b2) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return false;
}
return b1;
}
@ -2194,7 +2212,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
}
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
// ASSERT(lastIndex >= 0);
sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
@ -2222,7 +2240,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
}
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
sNTrace(pSyncNode, "assigned leader to leader");

View File

@ -71,16 +71,32 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _err;
}
ASSERT(index == pBuf->endIndex);
if (index != pBuf->endIndex) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
};
SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
ASSERT(pExist == NULL);
if (pExist != NULL) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
// initial log buffer with at least one item, e.g. commitIndex
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
ASSERTS(pMatch != NULL, "no matched log entry");
ASSERT(pMatch->index + 1 == index);
ASSERT(pMatch->term <= pEntry->term);
if (pMatch == NULL) {
sError("vgId:%d, no matched log entry", pNode->vgId);
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
if (pMatch->index + 1 != index) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
if (!(pMatch->term <= pEntry->term)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
pBuf->entries[index % pBuf->size] = tmp;
@ -114,7 +130,7 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
}
ASSERT(index - 1 == prevIndex);
if (index - 1 != prevIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (prevIndex >= pBuf->startIndex) {
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
@ -178,9 +194,18 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
}
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERTS(pNode->pLogStore != NULL, "log store not created");
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
if (pNode->pLogStore == NULL) {
sError("log store not created");
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm == NULL) {
sError("pFsm not registered");
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
sError("FpGetSnapshotInfo not registered");
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
int32_t code = 0, lino = 0;
SSnapshot snapshot = {0};
@ -191,7 +216,8 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer >= commitIndex);
if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
;
SyncIndex toIndex = lastVer;
// update match index
pBuf->commitIndex = commitIndex;
@ -239,7 +265,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// put a dummy record at commitIndex if present in log buffer
if (takeDummy) {
ASSERT(index == pBuf->commitIndex);
if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
if (pDummy == NULL) {
@ -392,9 +418,18 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
}
// update
ASSERT(pBuf->startIndex < index);
ASSERT(index - pBuf->startIndex < pBuf->size);
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
if (!(pBuf->startIndex < index)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
};
if (!(index - pBuf->startIndex < pBuf->size)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pBuf->entries[index % pBuf->size].pItem != NULL) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
pEntry = NULL;
pBuf->entries[index % pBuf->size] = tmp;
@ -422,14 +457,14 @@ static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replica
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
int32_t code = 0;
ASSERT(pEntry->index >= 0);
if (pEntry->index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
if (lastVer >= pEntry->index && (code = pLogStore->syncLogTruncate(pLogStore, pEntry->index)) < 0) {
sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index);
TAOS_RETURN(code);
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer + 1);
if (pEntry->index != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) {
@ -439,7 +474,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer);
if (pEntry->index != lastVer) return TSDB_CODE_SYN_INTERNAL_ERROR;
return 0;
}
@ -718,7 +753,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
syncEntryDestroy(pEntry);
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
pBuf->startIndex = index + 1;
@ -786,7 +821,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
int64_t pos = index % pMgr->size;
ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));
if (!(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex))) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
break;
@ -809,7 +847,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
tstrerror(code), index, pDestId->addr);
goto _out;
}
ASSERT(barrier == pMgr->states[pos].barrier);
if (barrier != pMgr->states[pos].barrier) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].term = term;
pMgr->states[pos].acked = false;
@ -839,11 +880,11 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
int32_t code = 0;
ASSERT(pMgr->restored == false);
if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->endIndex == 0) {
ASSERT(pMgr->startIndex == 0);
ASSERT(pMgr->matchIndex == 0);
if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
@ -909,7 +950,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
TAOS_RETURN(code);
@ -918,13 +959,13 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
return 0;
}
ASSERT(index + 1 >= firstVer);
if (!(index + 1 >= firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (term == pMsg->lastMatchTerm) {
index = index + 1;
ASSERT(index <= pNode->pLogBuf->matchIndex);
if (!(index <= pNode->pLogBuf->matchIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR;
} else {
ASSERT(index > firstVer);
if (!(index > firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR;
}
}
@ -975,8 +1016,8 @@ int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
ASSERT(!pMgr->restored);
ASSERT(pMgr->startIndex >= 0);
if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (!(pMgr->startIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
int64_t nowMs = taosGetMonoTimestampMs();
int32_t code = 0;
@ -996,7 +1037,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
TAOS_RETURN(code);
}
ASSERT(index >= 0);
if (!(index >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
pMgr->states[index % pMgr->size].barrier = barrier;
pMgr->states[index % pMgr->size].timeMs = nowMs;
pMgr->states[index % pMgr->size].term = term;
@ -1014,7 +1055,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
}
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored);
if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
@ -1070,7 +1111,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true);
if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
int64_t firstMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
@ -1100,7 +1141,10 @@ SSyncLogReplMgr* syncLogReplCreate() {
pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return NULL;
}
return pMgr;
}
@ -1115,7 +1159,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
ASSERT(pNode->logReplMgrs[i] == NULL);
if (pNode->logReplMgrs[i] != NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
pNode->logReplMgrs[i] = syncLogReplCreate();
if (pNode->logReplMgrs[i] == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
@ -1141,7 +1185,10 @@ int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);
ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _exit;
}
if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
@ -1194,7 +1241,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
int32_t code = 0;
ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);
if (!(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (toIndex == pBuf->endIndex) {
return 0;
@ -1217,7 +1264,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
}
pBuf->endIndex = toIndex;
pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
ASSERT(index + 1 == toIndex);
if (index + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
// trunc wal
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
@ -1227,7 +1274,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
TAOS_RETURN(code);
}
lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(toIndex == lastVer + 1);
if (toIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
// refill buffer on need
if (toIndex <= pBuf->startIndex) {
@ -1237,7 +1284,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
}
}
ASSERT(pBuf->endIndex == toIndex);
if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
(void)syncLogBufferValidate(pBuf);
return 0;
}
@ -1246,7 +1293,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
(void)taosThreadMutexLock(&pBuf->mutex);
(void)syncLogBufferValidate(pBuf);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer == pBuf->matchIndex);
if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
SyncIndex index = pBuf->endIndex - 1;
(void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);

View File

@ -70,7 +70,10 @@ SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
return NULL;
}
memcpy(pEntry, pMsg->data, pMsg->dataLen);
ASSERT(pEntry->bytes == pMsg->dataLen);
if (pEntry->bytes != pMsg->dataLen) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return NULL;
}
return pEntry;
}

View File

@ -61,7 +61,10 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
if (pData->pWal == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return NULL;
}
(void)taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
@ -115,7 +118,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
// log[m .. n]
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
ASSERT(snapshotIndex >= 0);
if (!(snapshotIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
@ -303,14 +306,14 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
}
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(*ppEntry != NULL);
if (*ppEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
(*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
(*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
(*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
(*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
(*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
(*ppEntry)->index = index;
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
if ((*ppEntry)->dataLen != pWalHandle->pHead->head.bodyLen) return TSDB_CODE_SYN_INTERNAL_ERROR;
(void)memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
/*
@ -362,14 +365,14 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
ASSERT(ppLastEntry != NULL);
if (ppLastEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
*ppLastEntry = NULL;
if (walIsEmpty(pWal)) {
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
} else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
if (!(lastIndex >= SYNC_INDEX_BEGIN)) return TSDB_CODE_SYN_INTERNAL_ERROR;
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
TAOS_RETURN(code);

View File

@ -104,7 +104,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->term);
}
SyncTerm currentTerm = raftStoreGetTerm(ths);
ASSERT(pMsg->term <= currentTerm);
if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR;
bool grant = (pMsg->term == currentTerm) && logOK &&
((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
@ -130,7 +130,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pReply->destId = pMsg->srcId;
pReply->term = currentTerm;
pReply->voteGranted = grant;
ASSERT(!grant || pMsg->term == pReply->term);
if (!(!grant || pMsg->term == pReply->term)) return TSDB_CODE_SYN_INTERNAL_ERROR;
// trace log
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");

View File

@ -64,7 +64,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == currentTerm);
if (pMsg->term != currentTerm) return TSDB_CODE_SYN_INTERNAL_ERROR;
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.

View File

@ -57,7 +57,7 @@ static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE);
if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) return TSDB_CODE_SYN_INTERNAL_ERROR;
(void)taosThreadMutexInit(&pBuf->mutex, NULL);
*ppBuf = pBuf;
TAOS_RETURN(0);
@ -311,7 +311,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
}
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _OUT;
}
// send msg
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
@ -323,7 +326,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// put in buffer
int64_t nowMs = taosGetTimestampMs();
if (pBlk) {
ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END);
if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _OUT;
}
pBlk->sendTimeMs = nowMs;
pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
pBlk = NULL;
@ -351,7 +357,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
ASSERT(pBlk);
if (!pBlk) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
int64_t nowMs = taosGetTimestampMs();
if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
@ -682,7 +691,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT);
if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
int32_t code = 0, lino = 0;
// copy snap info from leader
@ -878,7 +887,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
goto _out;
}
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMsg->seq > pRcvBuf->cursor) {
if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
@ -922,7 +931,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
// condition 4
// transfering
SyncSnapshotSend *pMsg = ppMsg[0];
ASSERT(pMsg);
if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
@ -1071,7 +1080,7 @@ _out:;
}
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncTLV *datHead = (void *)pMsg->data;
if (datHead->typ != pMsg->payloadType) {
@ -1168,11 +1177,17 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
goto _out;
}
ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end);
if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
ASSERT(pBlk);
if (!pBlk) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
pBlk->acked = 1;
}

View File

@ -357,7 +357,10 @@ static int32_t walLogEntriesComplete(const SWal* pWal) {
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
ASSERT(pFileInfo != NULL);
if (!pFileInfo) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);

View File

@ -371,8 +371,11 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
int32_t code = 0;
if (logRetention < 0) {
TAOS_RETURN(TSDB_CODE_FAILED);
}
TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex));
ASSERT(logRetention >= 0);
pWal->vers.verInSnapshotting = ver;
pWal->vers.logRetention = logRetention;
@ -438,7 +441,10 @@ int32_t walEndSnapshot(SWal *pWal) {
if (pInfo) {
wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver,
pInfo->firstVer, pInfo->lastVer);
ASSERT(ver <= pInfo->lastVer);
if (ver > pInfo->lastVer) {
TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
}
if (ver == pInfo->lastVer) {
pInfo++;
}

View File

@ -266,8 +266,9 @@ void* taosArrayInsert(SArray* pArray, size_t index, const void* pData) {
}
void taosArraySet(SArray* pArray, size_t index, void* pData) {
ASSERT(index < pArray->size);
memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
if (index < pArray->size) {
memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
}
}
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
@ -291,7 +292,9 @@ void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {
}
void taosArrayRemove(SArray* pArray, size_t index) {
ASSERT(index < pArray->size);
if (!(index < pArray->size)) {
return;
}
if (index == pArray->size - 1) {
(void)taosArrayPop(pArray);
@ -305,17 +308,17 @@ void taosArrayRemove(SArray* pArray, size_t index) {
}
void taosArrayRemoveBatch(SArray* pArray, size_t index, size_t num, FDelete fp) {
ASSERT(index + num <= pArray->size);
if (fp) {
for (int32_t i = 0; i < num; i++) {
fp(taosArrayGet(pArray, index + i));
if (index + num <= pArray->size) {
if (fp) {
for (int32_t i = 0; i < num; i++) {
fp(taosArrayGet(pArray, index + i));
}
}
}
memmove((char*)pArray->pData + index * pArray->elemSize, (char*)pArray->pData + (index + num) * pArray->elemSize,
(pArray->size - index - num) * pArray->elemSize);
pArray->size -= num;
memmove((char*)pArray->pData + index * pArray->elemSize, (char*)pArray->pData + (index + num) * pArray->elemSize,
(pArray->size - index - num) * pArray->elemSize);
pArray->size -= num;
}
}
SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
@ -349,8 +352,6 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
if (fn == NULL) {
memcpy(dst->pData, pSrc->pData, pSrc->elemSize * pSrc->size);
} else {
ASSERT(pSrc->elemSize == sizeof(void*));
for (int32_t i = 0; i < pSrc->size; ++i) {
void* p = fn(taosArrayGetP(pSrc, i));
memcpy(((char*)dst->pData) + i * dst->elemSize, &p, dst->elemSize);

View File

@ -78,7 +78,9 @@ _error:
}
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) {
ASSERT(!tBloomFilterIsFull(pBF));
if (tBloomFilterIsFull(pBF)) {
return TSDB_CODE_FAILED;
}
bool hasChange = false;
const register uint64_t size = pBF->numBits;
uint64_t cbHash = hash1;

File diff suppressed because it is too large Load Diff

View File

@ -121,7 +121,9 @@ int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); }
static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) {
cfgItemFreeVal(pItem);
ASSERT(pItem->str == NULL);
if (!(pItem->str == NULL)) {
return TSDB_CODE_INVALID_PARA;
}
pItem->str = taosStrdup(conf);
if (pItem->str == NULL) {

View File

@ -143,24 +143,20 @@ int32_t tdigestCompress(TDigest *t) {
if (a->mean <= b->mean) {
mergeCentroid(&args, a);
ASSERTS(args.idx < t->size, "idx over size");
i++;
} else {
mergeCentroid(&args, b);
ASSERTS(args.idx < t->size, "idx over size");
j++;
}
}
while (i < num_unmerged) {
mergeCentroid(&args, &unmerged_centroids[i++]);
ASSERTS(args.idx < t->size, "idx over size");
}
taosMemoryFree((void *)unmerged_centroids);
while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]);
ASSERTS(args.idx < t->size, "idx over size");
}
if (t->total_weight > 0) {

View File

@ -104,7 +104,6 @@ void tEndEncode(SEncoder* pCoder) {
if (pCoder->data) {
pNode = pCoder->eStack;
ASSERT(pNode);
pCoder->eStack = pNode->pNext;
len = pCoder->pos;
@ -148,7 +147,6 @@ void tEndDecode(SDecoder* pCoder) {
SDecoderNode* pNode;
pNode = pCoder->dStack;
ASSERT(pNode);
pCoder->dStack = pNode->pNext;
pCoder->data = pNode->data;

View File

@ -1,150 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "texception.h"
#include "tlog.h"
static threadlocal SExceptionNode* expList;
void exceptionPushNode(SExceptionNode* node) {
node->prev = expList;
expList = node;
}
int32_t exceptionPopNode() {
SExceptionNode* node = expList;
expList = node->prev;
return node->code;
}
void exceptionThrow(int32_t code) {
expList->code = code;
longjmp(expList->jb, 1);
}
static void cleanupWrapper_void_ptr_ptr(SCleanupAction* ca) {
void (*func)(void*, void*) = ca->func;
func(ca->arg1.Ptr, ca->arg2.Ptr);
}
static void cleanupWrapper_void_ptr_bool(SCleanupAction* ca) {
void (*func)(void*, bool) = ca->func;
func(ca->arg1.Ptr, ca->arg2.Bool);
}
static void cleanupWrapper_void_ptr(SCleanupAction* ca) {
void (*func)(void*) = ca->func;
func(ca->arg1.Ptr);
}
static void cleanupWrapper_int_int(SCleanupAction* ca) {
int32_t (*func)(int32_t) = ca->func;
(void)func(ca->arg1.Int);
}
static void cleanupWrapper_void(SCleanupAction* ca) {
void (*func)() = ca->func;
func();
}
static void cleanupWrapper_int_ptr(SCleanupAction* ca) {
int32_t (*func)(void*) = ca->func;
(void)func(ca->arg1.Ptr);
}
typedef void (*wrapper)(SCleanupAction*);
static wrapper wrappers[] = {
cleanupWrapper_void_ptr_ptr, cleanupWrapper_void_ptr_bool, cleanupWrapper_void_ptr,
cleanupWrapper_int_int, cleanupWrapper_void, cleanupWrapper_int_ptr,
};
void cleanupPush_void_ptr_ptr(bool failOnly, void* func, void* arg1, void* arg2) {
ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction");
SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 0;
ca->failOnly = failOnly;
ca->func = func;
ca->arg1.Ptr = arg1;
ca->arg2.Ptr = arg2;
}
void cleanupPush_void_ptr_bool(bool failOnly, void* func, void* arg1, bool arg2) {
ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction");
SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 1;
ca->failOnly = failOnly;
ca->func = func;
ca->arg1.Ptr = arg1;
ca->arg2.Bool = arg2;
}
void cleanupPush_void_ptr(bool failOnly, void* func, void* arg) {
ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction");
SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 2;
ca->failOnly = failOnly;
ca->func = func;
ca->arg1.Ptr = arg;
}
void cleanupPush_int_int(bool failOnly, void* func, int32_t arg) {
ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction");
SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 3;
ca->failOnly = failOnly;
ca->func = func;
ca->arg1.Int = arg;
}
void cleanupPush_void(bool failOnly, void* func) {
ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction");
SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 4;
ca->failOnly = failOnly;
ca->func = func;
}
void cleanupPush_int_ptr(bool failOnly, void* func, void* arg) {
ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction");
SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 5;
ca->failOnly = failOnly;
ca->func = func;
ca->arg1.Ptr = arg;
}
int32_t cleanupGetActionCount() { return expList->numCleanupAction; }
static void doExecuteCleanup(SExceptionNode* node, int32_t anchor, bool failed) {
while (node->numCleanupAction > anchor) {
--node->numCleanupAction;
SCleanupAction* ca = node->cleanupActions + node->numCleanupAction;
if (failed || !(ca->failOnly)) {
wrappers[ca->wrapper](ca);
}
}
}
void cleanupExecuteTo(int32_t anchor, bool failed) { doExecuteCleanup(expList, anchor, failed); }
void cleanupExecute(SExceptionNode* node, bool failed) { doExecuteCleanup(node, 0, failed); }
bool cleanupExceedLimit() { return expList->numCleanupAction >= expList->maxCleanupAction; }

View File

@ -191,14 +191,12 @@ static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SH
(void)atomic_sub_fetch_16(&pNode->refCount, 1);
if (prev != NULL) {
prev->next = pNewNode;
ASSERT(prev->next != prev);
} else {
pe->next = pNewNode;
}
if (pNode->refCount <= 0) {
pNewNode->next = pNode->next;
ASSERT(pNewNode->next != pNewNode);
FREE_HASH_NODE(pHashObj->freeFp, pNode);
} else {
@ -508,7 +506,6 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
pe->next = pNode->next;
} else {
prevNode->next = pNode->next;
ASSERT(prevNode->next != prevNode);
}
pe->num--;
@ -759,12 +756,10 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
if (pOld->refCount <= 0) {
if (prevNode) {
prevNode->next = pOld->next;
ASSERT(prevNode->next != prevNode);
} else {
pe->next = pOld->next;
SHashNode *x = pe->next;
if (x != NULL) {
ASSERT(x->next != x);
}
}

View File

@ -2,8 +2,8 @@
#include "tpagedbuf.h"
#include "taoserror.h"
#include "tcompression.h"
#include "tsimplehash.h"
#include "tlog.h"
#include "tsimplehash.h"
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
@ -27,24 +27,24 @@ struct SPageInfo {
};
struct SDiskbasedBuf {
int32_t numOfPages;
int64_t totalBufSize;
uint64_t fileSize; // disk file size
TdFilePtr pFile;
int32_t allocateId; // allocated page id
char* path; // file path
char* prefix; // file name prefix
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SList* freePgList; // free page list
SArray* pIdList; // page id list
SSHashObj*all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
uint64_t nextPos; // next page flush position
int32_t numOfPages;
int64_t totalBufSize;
uint64_t fileSize; // disk file size
TdFilePtr pFile;
int32_t allocateId; // allocated page id
char* path; // file path
char* prefix; // file name prefix
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SList* freePgList; // free page list
SArray* pIdList; // page id list
SSHashObj* all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
uint64_t nextPos; // next page flush position
char* id; // for debug purpose
bool printStatis; // Print statistics info when closing this buffer.
@ -95,7 +95,8 @@ static int32_t doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDisk
} else if (*dst < 0) {
return terrno;
}
return code;;
return code;
;
}
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
@ -300,7 +301,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SPageInfo* pageInfo = *(SPageInfo**)pn->data;
SPageInfo* p = *(SPageInfo**)(pageInfo->pData);
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo);
if (!pageInfo->used) {
break;
@ -435,14 +435,14 @@ static char* doExtractPage(SDiskbasedBuf* pBuf, bool* newPage) {
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
pBuf->statis.getPages += 1;
bool newPage = false;
bool newPage = false;
char* availablePage = doExtractPage(pBuf, &newPage);
if (availablePage == NULL) {
return NULL;
}
SPageInfo* pi = NULL;
int32_t code = 0;
int32_t code = 0;
if (listNEles(pBuf->freePgList) != 0) {
SListNode* pItem = tdListPopHead(pBuf->freePgList);
pi = *(SPageInfo**)pItem->data;
@ -538,8 +538,6 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
#endif
return (void*)(GET_PAYLOAD_DATA(*pi));
} else { // not in memory
ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
bool newPage = false;
(*pi)->pData = doExtractPage(pBuf, &newPage);
@ -700,7 +698,7 @@ void setBufPageDirty(void* pPage, bool dirty) {
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
pBuf->comp = comp;
if (comp && (pBuf->assistBuf == NULL)) {
if (comp && (pBuf->assistBuf == NULL)) {
pBuf->assistBuf = taosMemoryMalloc(pBuf->pageSize + 2); // EXTRA BYTES
}
}

View File

@ -263,7 +263,6 @@ static void rbtree_delete_fixup(rbtree_t *rbtree, rbnode_t *child, rbnode_t *chi
child_parent->color = BLACK;
return;
}
ASSERTS(sibling != RBTREE_NULL, "sibling is NULL");
/* get a new sibling, by rotating at sibling. See which child
of sibling is red */
@ -293,11 +292,9 @@ static void rbtree_delete_fixup(rbtree_t *rbtree, rbnode_t *child, rbnode_t *chi
sibling->color = child_parent->color;
child_parent->color = BLACK;
if (child_parent->right == child) {
ASSERTS(sibling->left->color == RED, "slibing->left->color=%d not equal RED", sibling->left->color);
sibling->left->color = BLACK;
rbtree_rotate_right(rbtree, child_parent);
} else {
ASSERTS(sibling->right->color == RED, "slibing->right->color=%d not equal RED", sibling->right->color);
sibling->right->color = BLACK;
rbtree_rotate_left(rbtree, child_parent);
}
@ -320,18 +317,15 @@ static void swap_np(rbnode_t **x, rbnode_t **y) {
/** Update parent pointers of child trees of 'parent' */
static void change_parent_ptr(rbtree_t *rbtree, rbnode_t *parent, rbnode_t *old, rbnode_t *new) {
if (parent == RBTREE_NULL) {
ASSERTS(rbtree->root == old, "root not equal old");
if (rbtree->root == old) rbtree->root = new;
return;
}
ASSERT(parent->left == old || parent->right == old || parent->left == new || parent->right == new);
if (parent->left == old) parent->left = new;
if (parent->right == old) parent->right = new;
}
/** Update parent pointer of a node 'child' */
static void change_child_ptr(rbtree_t *rbtree, rbnode_t *child, rbnode_t *old, rbnode_t *new) {
if (child == RBTREE_NULL) return;
ASSERT(child->parent == old || child->parent == new);
if (child->parent == old) child->parent = new;
}
@ -376,7 +370,6 @@ rbnode_t *rbtree_delete(rbtree_t *rbtree, void *key) {
/* now delete to_delete (which is at the location where the smright previously was) */
}
ASSERT(to_delete->left == RBTREE_NULL || to_delete->right == RBTREE_NULL);
if (to_delete->left != RBTREE_NULL)
child = to_delete->left;

View File

@ -261,8 +261,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) {
SHNode *pNode = pHashObj->hashList[index];
while (pNode) {
const char* p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
ASSERT(keyLen > 0);
const char *p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
if (pNode->keyLen == keyLen && ((*(pHashObj->equalFp))(p, key, keyLen) == 0)) {
break;

View File

@ -366,51 +366,6 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) {
return NULL;
}
#ifdef BUILD_NO_CALL
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) {
return;
}
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1);
int32_t id = 1;
char *prev = NULL;
while (p != pSkipList->pTail) {
char *key = SL_GET_NODE_KEY(pSkipList, p);
if (prev != NULL) {
ASSERT(pSkipList->comparFn(prev, key) < 0);
}
switch (pSkipList->type) {
case TSDB_DATA_TYPE_INT:
fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key);
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BIGINT:
fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_GEOMETRY:
fprintf(stdout, "%d: %s \n", id++, key);
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(stdout, "%d: %lf \n", id++, *(double *)key);
break;
default:
fprintf(stdout, "\n");
}
prev = SL_GET_NODE_KEY(pSkipList, p);
p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1);
}
}
#endif
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
for (int32_t i = 0; i < pNode->level; ++i) {
SSkipListNode *x = direction[i];
@ -538,33 +493,6 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
return hasDupKey;
}
#ifdef BUILD_NO_CALL
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) {
int32_t level = pNode->level;
uint8_t dupMode = SL_DUP_MODE(pSkipList);
ASSERT(dupMode != SL_DISCARD_DUP_KEY && dupMode != SL_UPDATE_DUP_KEY);
for (int32_t j = level - 1; j >= 0; --j) {
SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(pNode, j);
SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(pNode, j);
SL_NODE_GET_FORWARD_POINTER(prev, j) = next;
SL_NODE_GET_BACKWARD_POINTER(next, j) = prev;
}
tSkipListFreeNode(pNode);
pSkipList->size--;
}
// Function must be called after calling tSkipListRemoveNodeImpl() function
static void tSkipListCorrectLevel(SSkipList *pSkipList) {
while (pSkipList->level > 0 &&
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) {
pSkipList->level -= 1;
}
}
#endif
UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList,
int32_t level) { // record link count in each level
#if SKIP_LIST_RECORD_PERFORMANCE

View File

@ -124,7 +124,6 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
if (split == NULL) {
return NULL;
}
ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t)POINTER_BYTES * size);
}
}