From 83e120ba4885545f850893cbf140ac6d94302632 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Tue, 20 Aug 2024 14:24:00 +0800 Subject: [PATCH 1/7] update rmtaos for client --- packaging/tools/install.sh | 5 ++++ packaging/tools/remove.sh | 46 +++++++++++++++++++++----------- packaging/tools/remove_client.sh | 22 +++++++++++++++ 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index cc3868d2f0..374d17ada6 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -229,6 +229,9 @@ function install_bin() { if [ -d ${script_dir}/${xname}/bin ]; then ${csudo}cp -r ${script_dir}/${xname}/bin/* ${install_main_dir}/bin fi + if [ -e ${script_dir}/${xname}/uninstall.sh ]; then + ${csudo}cp -r ${script_dir}/${xname}/uninstall.sh ${install_main_dir}/uninstall_${xname}.sh + fi fi if [ -f ${script_dir}/bin/quick_deploy.sh ]; then @@ -250,6 +253,8 @@ function install_bin() { for service in "${services[@]}"; do [ -x ${install_main_dir}/bin/${service} ] && ${csudo}ln -sf ${install_main_dir}/bin/${service} ${bin_link_dir}/${service} || : done + + [ ${install_main_dir}/uninstall_${xname}.sh ] && ${csudo}ln -sf ${install_main_dir}/uninstall_${xname}.sh ${bin_link_dir}/uninstall_${xname}.sh || : } function install_lib() { diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 7af64fab1e..88dcbf41f3 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -56,7 +56,11 @@ local_bin_link_dir="/usr/local/bin" service_config_dir="/etc/systemd/system" config_dir="/etc/${PREFIX}" -services=(${PREFIX}"d" ${PREFIX}"adapter" ${PREFIX}"x" ${PREFIX}"-explorer" ${PREFIX}"keeper") +if [ "${verMode}" == "cluster" ]; then + services=(${PREFIX}"d" ${PREFIX}"adapter" ${PREFIX}"keeper") +else + services=(${PREFIX}"d" ${PREFIX}"adapter" ${PREFIX}"keeper" ${PREFIX}"-explorer") +fi tools=(${PREFIX} ${PREFIX}"Benchmark" ${PREFIX}"dump" ${PREFIX}"demo" udfd set_core.sh TDinsight.sh $uninstallScript start-all.sh stop-all.sh) csudo="" @@ -222,6 +226,32 @@ function remove_data_and_config() { [ -d "${log_dir}" ] && ${csudo}rm -rf ${log_dir} } +function remove_taosx() { + if [ -e /usr/local/taos/taosx/uninstall.sh ]; then + bash /usr/local/taos/taosx/uninstall.sh + fi +} + +echo +echo "Do you want to remove all the data, log and configuration files? [y/n]" +read answer +if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then + confirmMsg="I confirm that I would like to delete all data, log and configuration files" + echo "Please enter '${confirmMsg}' to continue" + read answer + if [ X"$answer" == X"${confirmMsg}" ]; then + remove_data_and_config + if [ -e /usr/bin/uninstall_${PREFIX}x.sh ]; then + bash /usr/bin/uninstall_${PREFIX}x.sh --clean-all true + fi + else + echo "answer doesn't match, skip this step" + if [ -e /usr/bin/uninstall_${PREFIX}x.sh ]; then + bash /usr/bin/uninstall_${PREFIX}x.sh --clean-all false + fi + fi +fi + remove_bin clean_header # Remove lib file @@ -254,20 +284,6 @@ if [ "$osType" = "Darwin" ]; then ${csudo}rm -rf /Applications/TDengine.app fi -echo -echo "Do you want to remove all the data, log and configuration files? [y/n]" -read answer -if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then - confirmMsg="I confirm that I would like to delete all data, log and configuration files" - echo "Please enter '${confirmMsg}' to continue" - read answer - if [ X"$answer" == X"${confirmMsg}" ]; then - remove_data_and_config - else - echo "answer doesn't match, skip this step" - fi -fi - command -v systemctl >/dev/null 2>&1 && ${csudo}systemctl daemon-reload >/dev/null 2>&1 || true echo echo "${productName} is removed successfully!" diff --git a/packaging/tools/remove_client.sh b/packaging/tools/remove_client.sh index de14cb38d9..31b1053a42 100755 --- a/packaging/tools/remove_client.sh +++ b/packaging/tools/remove_client.sh @@ -31,6 +31,8 @@ bin_link_dir="/usr/bin" lib_link_dir="/usr/lib" lib64_link_dir="/usr/lib64" inc_link_dir="/usr/include" +log_dir="/var/log/${clientName2}" +cfg_dir="/etc/${clientName2}" csudo="" if command -v sudo > /dev/null; then @@ -92,6 +94,24 @@ function clean_log() { ${csudo}rm -rf ${log_link_dir} || : } +function clean_config_and_log_dir() { + # Remove link + echo "Do you want to remove all the log and configuration files? [y/n]" + read answer + if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then + confirmMsg="I confirm that I would like to delete all log and configuration files" + echo "Please enter '${confirmMsg}' to continue" + read answer + if [ X"$answer" == X"${confirmMsg}" ]; then + # Remove dir + rm -rf ${cfg_dir} || : + rm -rf ${log_dir} || : + else + echo "answer doesn't match, skip this step" + fi + fi +} + # Stop client. kill_client # Remove binary file and links @@ -104,6 +124,8 @@ clean_lib clean_log # Remove link configuration file clean_config +# Remove dir +clean_config_and_log_dir ${csudo}rm -rf ${install_main_dir} From 84bb96cf6501e629693aa547a6c957995dacfc51 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 22 Aug 2024 03:54:01 +0000 Subject: [PATCH 2/7] fix/TD-31542 --- source/dnode/mnode/impl/src/mndStb.c | 2 +- source/libs/sync/src/syncAppendEntriesReply.c | 4 +- source/libs/sync/src/syncCommit.c | 5 +- source/libs/sync/src/syncElection.c | 2 +- source/libs/sync/src/syncMain.c | 36 +++-- source/libs/sync/src/syncPipeline.c | 127 ++++++++++++------ source/libs/sync/src/syncRaftEntry.c | 5 +- source/libs/sync/src/syncRaftLog.c | 15 ++- source/libs/sync/src/syncRequestVote.c | 4 +- source/libs/sync/src/syncRequestVoteReply.c | 2 +- source/libs/sync/src/syncSnapshot.c | 35 +++-- 11 files changed, 162 insertions(+), 75 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a76dfa1b51..9a1804ce6a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1774,7 +1774,7 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj * int32_t nCols) { // if (pColCmpr == NULL || colName == NULL) return -1; - ASSERT(taosArrayGetSize(pField) == nCols); + if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED; TAOS_FIELD *p = taosArrayGet(pField, 0); int32_t code = 0; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 6ea5402c28..005cf4337d 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -57,14 +57,12 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { - if (pMsg->term > raftStoreGetTerm(ths)) { + if (pMsg->term != raftStoreGetTerm(ths)) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return TSDB_CODE_SYN_WRONG_TERM; } - ASSERT(pMsg->term == raftStoreGetTerm(ths)); - sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3e2b98c35b..1c129a0ed1 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -57,7 +57,10 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->qu bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) { int count = 0; SSyncIndexMgr* pMatches = pNode->pMatchIndex; - ASSERT(pNode->replicaNum == pMatches->replicaNum); + if (pNode->replicaNum != pMatches->replicaNum) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return false; + }; for (int i = 0; i < pNode->totalReplicaNum; i++) { if(pNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_VOTER){ diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 3b595f464d..fc056c9eba 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -119,7 +119,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { } ret = syncNodeRequestVotePeers(pSyncNode); - ASSERT(ret == 0); + if (ret != 0) return ret; syncNodeResetElectTimer(pSyncNode); return ret; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ffd180ee01..2f5a3488a1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -988,9 +988,18 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { int32_t code = 0; - ASSERTS(pNode->pLogStore != NULL, "log store not created"); - ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); - ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); + if (pNode->pLogStore == NULL) { + sError("vgId:%d, log store not created", pNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm == NULL) { + sError("vgId:%d, pFsm not registered", pNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm->FpGetSnapshotInfo == NULL) { + sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } SSnapshot snapshot = {0}; // TODO check return value (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); @@ -1384,8 +1393,14 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { int32_t syncNodeRestore(SSyncNode* pSyncNode) { int32_t code = 0; - ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); - ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); + if (pSyncNode->pLogStore == NULL) { + sError("vgId:%d, log store not created", pSyncNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pSyncNode->pLogBuf == NULL) { + sError("vgId:%d, ring log buffer not created", pSyncNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex); SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -1400,7 +1415,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { TAOS_RETURN(code); } - ASSERT(endIndex == lastVer + 1); + if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR; pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); @@ -1743,7 +1758,10 @@ inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) { } } - ASSERT(b1 == b2); + if (b1 != b2) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return false; + } return b1; } @@ -2194,7 +2212,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { } SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - // ASSERT(lastIndex >= 0); + sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); } @@ -2222,7 +2240,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { } int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); + if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR; syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); sNTrace(pSyncNode, "assigned leader to leader"); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 782d97f789..22947bdd8e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -71,16 +71,32 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _err; } - ASSERT(index == pBuf->endIndex); + if (index != pBuf->endIndex) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + }; SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; - ASSERT(pExist == NULL); + if (pExist != NULL) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } // initial log buffer with at least one item, e.g. commitIndex SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - ASSERTS(pMatch != NULL, "no matched log entry"); - ASSERT(pMatch->index + 1 == index); - ASSERT(pMatch->term <= pEntry->term); + if (pMatch == NULL) { + sError("vgId:%d, no matched log entry", pNode->vgId); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } + if (pMatch->index + 1 != index) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } + if (!(pMatch->term <= pEntry->term)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; pBuf->entries[index % pBuf->size] = tmp; @@ -114,7 +130,7 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } - ASSERT(index - 1 == prevIndex); + if (index - 1 != prevIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; if (prevIndex >= pBuf->startIndex) { pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; @@ -178,9 +194,18 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex } int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - ASSERTS(pNode->pLogStore != NULL, "log store not created"); - ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); - ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); + if (pNode->pLogStore == NULL) { + sError("log store not created"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm == NULL) { + sError("pFsm not registered"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm->FpGetSnapshotInfo == NULL) { + sError("FpGetSnapshotInfo not registered"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } int32_t code = 0, lino = 0; SSnapshot snapshot = {0}; @@ -191,7 +216,8 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex)); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(lastVer >= commitIndex); + if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; + ; SyncIndex toIndex = lastVer; // update match index pBuf->commitIndex = commitIndex; @@ -239,7 +265,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // put a dummy record at commitIndex if present in log buffer if (takeDummy) { - ASSERT(index == pBuf->commitIndex); + if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId); if (pDummy == NULL) { @@ -392,9 +418,18 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } // update - ASSERT(pBuf->startIndex < index); - ASSERT(index - pBuf->startIndex < pBuf->size); - ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); + if (!(pBuf->startIndex < index)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + }; + if (!(index - pBuf->startIndex < pBuf->size)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + if (pBuf->entries[index % pBuf->size].pItem != NULL) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; pEntry = NULL; pBuf->entries[index % pBuf->size] = tmp; @@ -422,14 +457,14 @@ static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replica int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) { int32_t code = 0; - ASSERT(pEntry->index >= 0); + if (pEntry->index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR; SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); if (lastVer >= pEntry->index && (code = pLogStore->syncLogTruncate(pLogStore, pEntry->index)) < 0) { sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index); TAOS_RETURN(code); } lastVer = pLogStore->syncLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastVer + 1); + if (pEntry->index != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR; bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum); if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) { @@ -439,7 +474,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf } lastVer = pLogStore->syncLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastVer); + if (pEntry->index != lastVer) return TSDB_CODE_SYN_INTERNAL_ERROR; return 0; } @@ -718,7 +753,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; for (SyncIndex index = pBuf->startIndex; index < until; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL); + if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; syncEntryDestroy(pEntry); (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); pBuf->startIndex = index + 1; @@ -786,7 +821,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { int64_t pos = index % pMgr->size; - ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex)); + if (!(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex))) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) { break; @@ -809,7 +847,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { tstrerror(code), index, pDestId->addr); goto _out; } - ASSERT(barrier == pMgr->states[pos].barrier); + if (barrier != pMgr->states[pos].barrier) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } pMgr->states[pos].timeMs = nowMs; pMgr->states[pos].term = term; pMgr->states[pos].acked = false; @@ -839,11 +880,11 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; int32_t code = 0; - ASSERT(pMgr->restored == false); + if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMgr->endIndex == 0) { - ASSERT(pMgr->startIndex == 0); - ASSERT(pMgr->matchIndex == 0); + if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR; + if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMsg->matchIndex < 0) { pMgr->restored = true; sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 @@ -909,7 +950,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { - ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); + if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR; if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); TAOS_RETURN(code); @@ -918,13 +959,13 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn return 0; } - ASSERT(index + 1 >= firstVer); + if (!(index + 1 >= firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR; if (term == pMsg->lastMatchTerm) { index = index + 1; - ASSERT(index <= pNode->pLogBuf->matchIndex); + if (!(index <= pNode->pLogBuf->matchIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR; } else { - ASSERT(index > firstVer); + if (!(index > firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR; } } @@ -975,8 +1016,8 @@ int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { - ASSERT(!pMgr->restored); - ASSERT(pMgr->startIndex >= 0); + if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR; + if (!(pMgr->startIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR; int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); int64_t nowMs = taosGetMonoTimestampMs(); int32_t code = 0; @@ -996,7 +1037,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde TAOS_RETURN(code); } - ASSERT(index >= 0); + if (!(index >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR; pMgr->states[index % pMgr->size].barrier = barrier; pMgr->states[index % pMgr->size].timeMs = nowMs; pMgr->states[index % pMgr->size].term = term; @@ -1014,7 +1055,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde } int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - ASSERT(pMgr->restored); + if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff)); @@ -1070,7 +1111,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { - ASSERT(pMgr->restored == true); + if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { int64_t firstMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs; @@ -1100,7 +1141,10 @@ SSyncLogReplMgr* syncLogReplCreate() { pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]); - ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE); + if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return NULL; + } return pMgr; } @@ -1115,7 +1159,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) { int32_t syncNodeLogReplInit(SSyncNode* pNode) { for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) { - ASSERT(pNode->logReplMgrs[i] == NULL); + if (pNode->logReplMgrs[i] != NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; pNode->logReplMgrs[i] = syncLogReplCreate(); if (pNode->logReplMgrs[i] == NULL) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); @@ -1141,7 +1185,10 @@ int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) { pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); - ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); + if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _exit; + } if (taosThreadMutexAttrInit(&pBuf->attr) < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -1194,7 +1241,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { int32_t code = 0; - ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); + if (!(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR; if (toIndex == pBuf->endIndex) { return 0; @@ -1217,7 +1264,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex } pBuf->endIndex = toIndex; pBuf->matchIndex = TMIN(pBuf->matchIndex, index); - ASSERT(index + 1 == toIndex); + if (index + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; // trunc wal SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); @@ -1227,7 +1274,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex TAOS_RETURN(code); } lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(toIndex == lastVer + 1); + if (toIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR; // refill buffer on need if (toIndex <= pBuf->startIndex) { @@ -1237,7 +1284,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex } } - ASSERT(pBuf->endIndex == toIndex); + if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; (void)syncLogBufferValidate(pBuf); return 0; } @@ -1246,7 +1293,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)taosThreadMutexLock(&pBuf->mutex); (void)syncLogBufferValidate(pBuf); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(lastVer == pBuf->matchIndex); + if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; SyncIndex index = pBuf->endIndex - 1; (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index fd6781f354..56a702a9d5 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -70,7 +70,10 @@ SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) { return NULL; } memcpy(pEntry, pMsg->data, pMsg->dataLen); - ASSERT(pEntry->bytes == pMsg->dataLen); + if (pEntry->bytes != pMsg->dataLen) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return NULL; + } return pEntry; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index ecca777806..e7ecb67e69 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -61,7 +61,10 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; pData->pWal = pSyncNode->pWal; - ASSERT(pData->pWal != NULL); + if (pData->pWal == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return NULL; + } (void)taosThreadMutexInit(&(pData->mutex), NULL); pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); @@ -115,7 +118,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) { // log[m .. n] static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { - ASSERT(snapshotIndex >= 0); + if (!(snapshotIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -303,14 +306,14 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR } *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); - ASSERT(*ppEntry != NULL); + if (*ppEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST; (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType; (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term; (*ppEntry)->index = index; - ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); + if ((*ppEntry)->dataLen != pWalHandle->pHead->head.bodyLen) return TSDB_CODE_SYN_INTERNAL_ERROR; (void)memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); /* @@ -362,14 +365,14 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - ASSERT(ppLastEntry != NULL); + if (ppLastEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; *ppLastEntry = NULL; if (walIsEmpty(pWal)) { TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } else { SyncIndex lastIndex = raftLogLastIndex(pLogStore); - ASSERT(lastIndex >= SYNC_INDEX_BEGIN); + if (!(lastIndex >= SYNC_INDEX_BEGIN)) return TSDB_CODE_SYN_INTERNAL_ERROR; int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); TAOS_RETURN(code); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index a30b9a4930..8b8b2580b1 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -104,7 +104,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeStepDown(ths, pMsg->term); } SyncTerm currentTerm = raftStoreGetTerm(ths); - ASSERT(pMsg->term <= currentTerm); + if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR; bool grant = (pMsg->term == currentTerm) && logOK && ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId))); @@ -130,7 +130,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pReply->destId = pMsg->srcId; pReply->term = currentTerm; pReply->voteGranted = grant; - ASSERT(!grant || pMsg->term == pReply->term); + if (!(!grant || pMsg->term == pReply->term)) return TSDB_CODE_SYN_INTERNAL_ERROR; // trace log syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 10d9a6c96b..eeddb708ab 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -64,7 +64,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } syncLogRecvRequestVoteReply(ths, pMsg, ""); - ASSERT(pMsg->term == currentTerm); + if (pMsg->term != currentTerm) return TSDB_CODE_SYN_INTERNAL_ERROR; // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8a2c53997b..2a3b164f03 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -57,7 +57,7 @@ static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pBuf->size = sizeof(pBuf->entries) / sizeof(void *); - ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE); + if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) return TSDB_CODE_SYN_INTERNAL_ERROR; (void)taosThreadMutexInit(&pBuf->mutex, NULL); *ppBuf = pBuf; TAOS_RETURN(0); @@ -311,7 +311,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { } } - ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END); + if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _OUT; + } // send msg int32_t blockLen = (pBlk) ? pBlk->blockLen : 0; @@ -323,7 +326,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // put in buffer int64_t nowMs = taosGetTimestampMs(); if (pBlk) { - ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END); + if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _OUT; + } pBlk->sendTimeMs = nowMs; pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk; pBlk = NULL; @@ -351,7 +357,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; - ASSERT(pBlk); + if (!pBlk) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } int64_t nowMs = taosGetTimestampMs(); if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { continue; @@ -682,7 +691,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, SSnapshot *pInfo) { - ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT); + if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR; int32_t code = 0, lino = 0; // copy snap info from leader @@ -878,7 +887,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot goto _out; } - ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); + if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMsg->seq > pRcvBuf->cursor) { if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) { @@ -922,7 +931,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend // condition 4 // transfering SyncSnapshotSend *pMsg = ppMsg[0]; - ASSERT(pMsg); + if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; int64_t timeNow = taosGetTimestampMs(); int32_t code = 0; @@ -1071,7 +1080,7 @@ _out:; } static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { - ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY); + if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncTLV *datHead = (void *)pMsg->data; if (datHead->typ != pMsg->payloadType) { @@ -1168,11 +1177,17 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp goto _out; } - ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end); + if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) { SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size]; - ASSERT(pBlk); + if (!pBlk) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } pBlk->acked = 1; } From a092582a0cc63632b5430113fd34bdba3fce622c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 22 Aug 2024 14:14:44 +0800 Subject: [PATCH 3/7] enh: clear useless asserts --- include/util/texception.h | 129 --- source/dnode/vnode/src/meta/metaCache.c | 1 - source/dnode/vnode/src/meta/metaQuery.c | 8 +- source/util/src/tarray.c | 29 +- source/util/src/tbloomfilter.c | 4 +- source/util/src/tcompression.c | 1168 +---------------------- source/util/src/tconfig.c | 4 +- source/util/src/tdigest.c | 4 - source/util/src/tencode.c | 2 - source/util/src/texception.c | 150 --- source/util/src/thash.c | 5 - source/util/src/tpagedbuf.c | 50 +- source/util/src/trbtree.c | 7 - source/util/src/tsimplehash.c | 3 +- source/util/src/tskiplist.c | 72 -- source/util/src/tutil.c | 1 - 16 files changed, 53 insertions(+), 1584 deletions(-) delete mode 100644 include/util/texception.h delete mode 100644 source/util/src/texception.c diff --git a/include/util/texception.h b/include/util/texception.h deleted file mode 100644 index 576545d96c..0000000000 --- a/include/util/texception.h +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2020 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_UTIL_EXCEPTION_H_ -#define _TD_UTIL_EXCEPTION_H_ - -#include "os.h" - -#ifdef __cplusplus -extern "C" { -#endif - -/* - * cleanup actions - */ -typedef struct SCleanupAction { - bool failOnly; - uint8_t wrapper; - uint16_t reserved; - void* func; - union { - void* Ptr; - bool Bool; - char Char; - int8_t Int8; - uint8_t Uint8; - int16_t Int16; - uint16_t Uint16; - int32_t Int; - uint32_t Uint; - int32_t Int32; - uint32_t Uint32; - int64_t Int64; - uint64_t Uint64; - float Float; - double Double; - } arg1, arg2; -} SCleanupAction; - -/* - * exception hander registration - */ -typedef struct SExceptionNode { - struct SExceptionNode* prev; - jmp_buf jb; - int32_t code; - int32_t maxCleanupAction; - int32_t numCleanupAction; - SCleanupAction* cleanupActions; -} SExceptionNode; - -// functions & macros for auto-cleanup - -void cleanupPush_void_ptr_ptr(bool failOnly, void* func, void* arg1, void* arg2); -void cleanupPush_void_ptr_bool(bool failOnly, void* func, void* arg1, bool arg2); -void cleanupPush_void_ptr(bool failOnly, void* func, void* arg); -void cleanupPush_int_int(bool failOnly, void* func, int32_t arg); -void cleanupPush_void(bool failOnly, void* func); -void cleanupPush_int_ptr(bool failOnly, void* func, void* arg); - -int32_t cleanupGetActionCount(); -void cleanupExecuteTo(int32_t anchor, bool failed); -void cleanupExecute(SExceptionNode* node, bool failed); -bool cleanupExceedLimit(); - -#define CLEANUP_PUSH_VOID_PTR_PTR(failOnly, func, arg1, arg2) \ - cleanupPush_void_ptr_ptr((failOnly), (void*)(func), (void*)(arg1), (void*)(arg2)) -#define CLEANUP_PUSH_VOID_PTR_BOOL(failOnly, func, arg1, arg2) \ - cleanupPush_void_ptr_bool((failOnly), (void*)(func), (void*)(arg1), (bool)(arg2)) -#define CLEANUP_PUSH_VOID_PTR(failOnly, func, arg) cleanupPush_void_ptr((failOnly), (void*)(func), (void*)(arg)) -#define CLEANUP_PUSH_INT_INT(failOnly, func, arg) cleanupPush_void_ptr((failOnly), (void*)(func), (int32_t)(arg)) -#define CLEANUP_PUSH_VOID(failOnly, func) cleanupPush_void((failOnly), (void*)(func)) -#define CLEANUP_PUSH_INT_PTR(failOnly, func, arg) cleanupPush_int_ptr((failOnly), (void*)(func), (void*)(arg)) -#define CLEANUP_PUSH_FREE(failOnly, arg) cleanupPush_void_ptr((failOnly), free, (void*)(arg)) -#define CLEANUP_PUSH_CLOSE(failOnly, arg) cleanupPush_int_int((failOnly), close, (int32_t)(arg)) -#define CLEANUP_PUSH_FCLOSE(failOnly, arg) cleanupPush_int_ptr((failOnly), fclose, (void*)(arg)) - -#define CLEANUP_GET_ANCHOR() cleanupGetActionCount() -#define CLEANUP_EXECUTE_TO(anchor, failed) cleanupExecuteTo((anchor), (failed)) -#define CLEANUP_EXCEED_LIMIT() cleanupExceedLimit() - -// functions & macros for exception handling - -void exceptionPushNode(SExceptionNode* node); -int32_t exceptionPopNode(); -void exceptionThrow(int32_t code); - -#define TRY(maxCleanupActions) \ - do { \ - SExceptionNode exceptionNode = {0}; \ - SCleanupAction cleanupActions[(maxCleanupActions) > 0 ? (maxCleanupActions) : 1]; \ - exceptionNode.maxCleanupAction = (maxCleanupActions) > 0 ? (maxCleanupActions) : 1; \ - exceptionNode.cleanupActions = cleanupActions; \ - exceptionPushNode(&exceptionNode); \ - int32_t caughtException = setjmp(exceptionNode.jb); \ - if (caughtException == 0) - -#define CATCH(code) \ - int32_t code = exceptionPopNode(); \ - if (caughtException == 1) - -#define FINALLY(code) int32_t code = exceptionPopNode(); - -#define END_TRY \ - } \ - while (0) \ - ; - -#define THROW(x) exceptionThrow((x)) -#define CAUGHT_EXCEPTION() ((bool)(caughtException == 1)) -#define CLEANUP_EXECUTE() cleanupExecute(&exceptionNode, CAUGHT_EXCEPTION()) - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_UTIL_EXCEPTION_H_*/ diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 7577b9ec0c..5cab16c63c 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -512,7 +512,6 @@ static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, buf[0] = (uint64_t)pHashMap; buf[1] = suid; setMD5DigestInKey(buf, key, keyLen); - ASSERT(keyLen == sizeof(uint64_t) * 2); } int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 45f9baf6fd..1668f699ae 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -275,10 +275,10 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) { int32_t metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first, int8_t move) { int32_t code = 0; int32_t lino; - int8_t locked = 0; + int8_t locked = 0; if (pTbCur->paused) { metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK); - locked = 1; + locked = 1; code = tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); if (code != 0) { TSDB_CHECK_CODE(code, lino, _exit); @@ -673,7 +673,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv } } - if (ASSERTS(sver > 0, "failed to get table schema version: %d", sver)) { + if (!(sver > 0)) { code = TSDB_CODE_NOT_FOUND; goto _exit; } @@ -1608,8 +1608,6 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t metaULock(pVnodeObj->pMeta); if (numOfTables) *numOfTables = state.ctbNum; if (numOfCols) *numOfCols = state.colNum; - ASSERTS(state.colNum > 0, "vgId:%d, suid:%" PRIi64 " nCols:%d <= 0 in metaCache", TD_VID(pVnodeObj), uid, - state.colNum); goto _exit; } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 37667e2975..59505dc0c1 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -266,8 +266,9 @@ void* taosArrayInsert(SArray* pArray, size_t index, const void* pData) { } void taosArraySet(SArray* pArray, size_t index, void* pData) { - ASSERT(index < pArray->size); - memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize); + if (index < pArray->size) { + memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize); + } } void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { @@ -291,7 +292,9 @@ void taosArrayPopTailBatch(SArray* pArray, size_t cnt) { } void taosArrayRemove(SArray* pArray, size_t index) { - ASSERT(index < pArray->size); + if (!(index < pArray->size)) { + return; + } if (index == pArray->size - 1) { (void)taosArrayPop(pArray); @@ -305,17 +308,17 @@ void taosArrayRemove(SArray* pArray, size_t index) { } void taosArrayRemoveBatch(SArray* pArray, size_t index, size_t num, FDelete fp) { - ASSERT(index + num <= pArray->size); - - if (fp) { - for (int32_t i = 0; i < num; i++) { - fp(taosArrayGet(pArray, index + i)); + if (index + num <= pArray->size) { + if (fp) { + for (int32_t i = 0; i < num; i++) { + fp(taosArrayGet(pArray, index + i)); + } } - } - memmove((char*)pArray->pData + index * pArray->elemSize, (char*)pArray->pData + (index + num) * pArray->elemSize, - (pArray->size - index - num) * pArray->elemSize); - pArray->size -= num; + memmove((char*)pArray->pData + index * pArray->elemSize, (char*)pArray->pData + (index + num) * pArray->elemSize, + (pArray->size - index - num) * pArray->elemSize); + pArray->size -= num; + } } SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { @@ -349,8 +352,6 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { if (fn == NULL) { memcpy(dst->pData, pSrc->pData, pSrc->elemSize * pSrc->size); } else { - ASSERT(pSrc->elemSize == sizeof(void*)); - for (int32_t i = 0; i < pSrc->size; ++i) { void* p = fn(taosArrayGetP(pSrc, i)); memcpy(((char*)dst->pData) + i * dst->elemSize, &p, dst->elemSize); diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index b20fb4bf39..2108389aec 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -78,7 +78,9 @@ _error: } int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) { - ASSERT(!tBloomFilterIsFull(pBF)); + if (tBloomFilterIsFull(pBF)) { + return TSDB_CODE_FAILED; + } bool hasChange = false; const register uint64_t size = pBF->numBits; uint64_t cbHash = hash1; diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 8402d2a658..afbd5304ce 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -718,7 +718,9 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, int32_t _pos = 1; int32_t longBytes = LONG_BYTES; - ASSERTS(nelements >= 0, "nelements is negative"); + if (nelements < 0) { + return -1; + } if (nelements == 0) return 0; @@ -815,7 +817,7 @@ _exit_over: int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { int64_t longBytes = LONG_BYTES; - ASSERTS(nelements >= 0, "nelements is negative"); + if (nelements < 0) return -1; if (nelements == 0) return 0; if (input[0] == 0) { @@ -1278,1151 +1280,6 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output); } -#ifdef BUILD_NO_CALL -/************************************************************************* - * STREAM COMPRESSION - *************************************************************************/ -#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (a)) || ((a) < 0 && (b) >= INT64_MIN - (a))) - -static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); - -static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); - -static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); - -static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); - -static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); - -static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData); -static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); - -static FORCE_INLINE int64_t tGetI64OfI8(const void *pData) { return *(int8_t *)pData; } -static FORCE_INLINE int64_t tGetI64OfI16(const void *pData) { return *(int16_t *)pData; } -static FORCE_INLINE int64_t tGetI64OfI32(const void *pData) { return *(int32_t *)pData; } -static FORCE_INLINE int64_t tGetI64OfI64(const void *pData) { return *(int64_t *)pData; } - -static FORCE_INLINE void tPutI64OfI8(int64_t v, void *pData) { *(int8_t *)pData = v; } -static FORCE_INLINE void tPutI64OfI16(int64_t v, void *pData) { *(int16_t *)pData = v; } -static FORCE_INLINE void tPutI64OfI32(int64_t v, void *pData) { *(int32_t *)pData = v; } -static FORCE_INLINE void tPutI64OfI64(int64_t v, void *pData) { *(int64_t *)pData = v; } - -static struct { - int8_t type; - int32_t bytes; - int8_t isVarLen; - int32_t (*startFn)(SCompressor *, int8_t type, int8_t cmprAlg); - int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); - int32_t (*endFn)(SCompressor *, const uint8_t **, int32_t *); - int64_t (*getI64)(const void *pData); - void (*putI64)(int64_t v, void *pData); -} DATA_TYPE_INFO[] = { - {.type = TSDB_DATA_TYPE_NULL, - .bytes = 0, - .isVarLen = 0, - .startFn = NULL, - .cmprFn = NULL, - .endFn = NULL, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_BOOL, - .bytes = 1, - .isVarLen = 0, - .startFn = tCompBoolStart, - .cmprFn = tCompBool, - .endFn = tCompBoolEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_TINYINT, - .bytes = 1, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI8, - .putI64 = tPutI64OfI8}, - {.type = TSDB_DATA_TYPE_SMALLINT, - .bytes = 2, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI16, - .putI64 = tPutI64OfI16}, - {.type = TSDB_DATA_TYPE_INT, - .bytes = 4, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI32, - .putI64 = tPutI64OfI32}, - {.type = TSDB_DATA_TYPE_BIGINT, - .bytes = 8, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI64, - .putI64 = tPutI64OfI64}, - {.type = TSDB_DATA_TYPE_FLOAT, - .bytes = 4, - .isVarLen = 0, - .startFn = tCompFloatStart, - .cmprFn = tCompFloat, - .endFn = tCompFloatEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_DOUBLE, - .bytes = 8, - .isVarLen = 0, - .startFn = tCompDoubleStart, - .cmprFn = tCompDouble, - .endFn = tCompDoubleEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_VARCHAR, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_TIMESTAMP, - .bytes = 8, - .isVarLen = 0, - .startFn = tCompTimestampStart, - .cmprFn = tCompTimestamp, - .endFn = tCompTimestampEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_NCHAR, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_UTINYINT, - .bytes = 1, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI8, - .putI64 = tPutI64OfI8}, - {.type = TSDB_DATA_TYPE_USMALLINT, - .bytes = 2, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI16, - .putI64 = tPutI64OfI16}, - {.type = TSDB_DATA_TYPE_UINT, - .bytes = 4, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI32, - .putI64 = tPutI64OfI32}, - {.type = TSDB_DATA_TYPE_UBIGINT, - .bytes = 8, - .isVarLen = 0, - .startFn = tCompIntStart, - .cmprFn = tCompInt, - .endFn = tCompIntEnd, - .getI64 = tGetI64OfI64, - .putI64 = tPutI64OfI64}, - {.type = TSDB_DATA_TYPE_JSON, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_VARBINARY, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_DECIMAL, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_BLOB, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_MEDIUMBLOB, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, - {.type = TSDB_DATA_TYPE_GEOMETRY, - .bytes = 1, - .isVarLen = 1, - .startFn = tCompBinaryStart, - .cmprFn = tCompBinary, - .endFn = tCompBinaryEnd, - .getI64 = NULL, - .putI64 = NULL}, -}; - -struct SCompressor { - int8_t type; - int8_t cmprAlg; - int8_t autoAlloc; - int32_t nVal; - uint8_t *pBuf; - int32_t nBuf; - uint8_t *aBuf[1]; - union { - // Timestamp ---- - struct { - int64_t ts_prev_val; - int64_t ts_prev_delta; - uint8_t *ts_flag_p; - }; - // Integer ---- - struct { - int64_t i_prev; - int32_t i_selector; - int32_t i_start; - int32_t i_end; - int32_t i_nEle; - uint64_t i_aZigzag[241]; - int8_t i_aBitN[241]; - }; - // Float ---- - struct { - uint32_t f_prev; - uint8_t *f_flag_p; - }; - // Double ---- - struct { - uint64_t d_prev; - uint8_t *d_flag_p; - }; - }; -}; - -static int32_t tTwoStageComp(SCompressor *pCmprsor, int32_t *szComp) { - int32_t code = 0; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) { - return code; - } - - *szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf); - if (*szComp && *szComp < pCmprsor->nBuf) { - pCmprsor->aBuf[0][0] = 1; - *szComp += 1; - } else { - pCmprsor->aBuf[0][0] = 0; - memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf); - *szComp = pCmprsor->nBuf + 1; - } - - return code; -} - -// Timestamp ===================================================== -static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - - pCmprsor->nBuf = 1; - - code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); - if (code) return code; - - pCmprsor->pBuf[0] = 1; - - return code; -} - -static int32_t tCompTSSwitchToCopy(SCompressor *pCmprsor) { - int32_t code = 0; - - if (pCmprsor->nVal == 0) goto _exit; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(int64_t) * pCmprsor->nVal + 1))) { - return code; - } - - int32_t n = 1; - int32_t nBuf = 1; - int64_t value; - int64_t delta; - for (int32_t iVal = 0; iVal < pCmprsor->nVal;) { - uint8_t aN[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)}; - - n++; - - for (int32_t i = 0; i < 2; i++) { - uint64_t vZigzag = 0; - for (uint8_t j = 0; j < aN[i]; j++) { - vZigzag |= (((uint64_t)pCmprsor->pBuf[n]) << (8 * j)); - n++; - } - - int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); - if (iVal) { - delta = delta_of_delta + delta; - value = delta + value; - } else { - delta = 0; - value = delta_of_delta; - } - - memcpy(pCmprsor->aBuf[0] + nBuf, &value, sizeof(value)); - nBuf += sizeof(int64_t); - - iVal++; - if (iVal >= pCmprsor->nVal) break; - } - } - - ASSERT(n == pCmprsor->nBuf && nBuf == sizeof(int64_t) * pCmprsor->nVal + 1); - - uint8_t *pBuf = pCmprsor->pBuf; - pCmprsor->pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pBuf; - pCmprsor->nBuf = nBuf; - -_exit: - pCmprsor->pBuf[0] = 0; - return code; -} -static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) { - int32_t code = 0; - - int64_t ts = *(int64_t *)pData; - ASSERT(nData == 8); - - if (pCmprsor->pBuf[0] == 1) { - if (pCmprsor->nVal == 0) { - pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = -ts; - } - - if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { - code = tCompTSSwitchToCopy(pCmprsor); - if (code) return code; - goto _copy_cmpr; - } - int64_t delta = ts - pCmprsor->ts_prev_val; - - if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { - code = tCompTSSwitchToCopy(pCmprsor); - if (code) return code; - goto _copy_cmpr; - } - int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; - uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta); - - pCmprsor->ts_prev_val = ts; - pCmprsor->ts_prev_delta = delta; - - if ((pCmprsor->nVal & 0x1) == 0) { - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 17))) { - return code; - } - - pCmprsor->ts_flag_p = pCmprsor->pBuf + pCmprsor->nBuf; - pCmprsor->nBuf++; - pCmprsor->ts_flag_p[0] = 0; - while (vZigzag) { - pCmprsor->pBuf[pCmprsor->nBuf] = (vZigzag & 0xff); - pCmprsor->nBuf++; - pCmprsor->ts_flag_p[0]++; - vZigzag >>= 8; - } - } else { - while (vZigzag) { - pCmprsor->pBuf[pCmprsor->nBuf] = (vZigzag & 0xff); - pCmprsor->nBuf++; - pCmprsor->ts_flag_p[0] += 0x10; - vZigzag >>= 8; - } - } - } else { - _copy_cmpr: - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(ts)))) { - return code; - } - - memcpy(pCmprsor->pBuf + pCmprsor->nBuf, &ts, sizeof(ts)); - pCmprsor->nBuf += sizeof(ts); - } - pCmprsor->nVal++; - - return code; -} - -static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { - int32_t code = 0; - - if (pCmprsor->nBuf >= sizeof(int64_t) * pCmprsor->nVal + 1 && pCmprsor->pBuf[0] == 1) { - code = tCompTSSwitchToCopy(pCmprsor); - if (code) return code; - } - - if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { - code = tTwoStageComp(pCmprsor, nData); - if (code) return code; - *ppData = pCmprsor->aBuf[0]; - } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { - *ppData = pCmprsor->pBuf; - *nData = pCmprsor->nBuf; - } else { - ASSERT(0); - } - - return code; -} - -// Integer ===================================================== -#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) -static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; -static const int32_t SELECTOR_TO_ELEMS[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; -static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, - 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, - 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, - 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; -static const int32_t NEXT_IDX[] = { - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, - 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, - 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, - 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, - 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, - 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, - 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, - 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, - 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, - 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, - 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 0}; - -static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - - pCmprsor->i_prev = 0; - pCmprsor->i_selector = 0; - pCmprsor->i_start = 0; - pCmprsor->i_end = 0; - pCmprsor->i_nEle = 0; - pCmprsor->nBuf = 1; - - code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); - if (code) return code; - - pCmprsor->pBuf[0] = 0; - - return code; -} - -static int32_t tCompIntSwitchToCopy(SCompressor *pCmprsor) { - int32_t code = 0; - - if (pCmprsor->nVal == 0) goto _exit; - - int32_t size = DATA_TYPE_INFO[pCmprsor->type].bytes * pCmprsor->nVal + 1; - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], size))) { - return code; - } - - int32_t n = 1; - int32_t nBuf = 1; - int64_t vPrev = 0; - while (n < pCmprsor->nBuf) { - uint64_t b; - memcpy(&b, pCmprsor->pBuf + n, sizeof(b)); - n += sizeof(b); - - int32_t i_selector = (b & 0xf); - int32_t nEle = SELECTOR_TO_ELEMS[i_selector]; - uint8_t bits = BIT_PER_INTEGER[i_selector]; - uint64_t mask = (((uint64_t)1) << bits) - 1; - for (int32_t iEle = 0; iEle < nEle; iEle++) { - uint64_t vZigzag = (b >> (bits * iEle + 4)) & mask; - vPrev = ZIGZAG_DECODE(int64_t, vZigzag) + vPrev; - - DATA_TYPE_INFO[pCmprsor->type].putI64(vPrev, pCmprsor->aBuf[0] + nBuf); - nBuf += DATA_TYPE_INFO[pCmprsor->type].bytes; - } - } - - while (pCmprsor->i_nEle) { - vPrev = ZIGZAG_DECODE(int64_t, pCmprsor->i_aZigzag[pCmprsor->i_start]) + vPrev; - - memcpy(pCmprsor->aBuf[0] + nBuf, &vPrev, DATA_TYPE_INFO[pCmprsor->type].bytes); - nBuf += DATA_TYPE_INFO[pCmprsor->type].bytes; - - pCmprsor->i_start = NEXT_IDX[pCmprsor->i_start]; - pCmprsor->i_nEle--; - } - - ASSERT(n == pCmprsor->nBuf && nBuf == size); - - uint8_t *pBuf = pCmprsor->pBuf; - pCmprsor->pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pBuf; - pCmprsor->nBuf = size; - -_exit: - pCmprsor->pBuf[0] = 1; - return code; -} - -static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) { - int32_t code = 0; - - ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes); - - if (pCmprsor->pBuf[0] == 0) { - int64_t val = DATA_TYPE_INFO[pCmprsor->type].getI64(pData); - - if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) { - code = tCompIntSwitchToCopy(pCmprsor); - if (code) return code; - goto _copy_cmpr; - } - - int64_t diff = val - pCmprsor->i_prev; - uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); - if (vZigzag >= SIMPLE8B_MAX) { - code = tCompIntSwitchToCopy(pCmprsor); - if (code) return code; - goto _copy_cmpr; - } - - int8_t nBit = (vZigzag) ? (64 - BUILDIN_CLZL(vZigzag)) : 0; - pCmprsor->i_prev = val; - - for (;;) { - if (pCmprsor->i_nEle + 1 <= SELECTOR_TO_ELEMS[pCmprsor->i_selector] && - pCmprsor->i_nEle + 1 <= SELECTOR_TO_ELEMS[BIT_TO_SELECTOR[nBit]]) { - if (pCmprsor->i_selector < BIT_TO_SELECTOR[nBit]) { - pCmprsor->i_selector = BIT_TO_SELECTOR[nBit]; - } - pCmprsor->i_aZigzag[pCmprsor->i_end] = vZigzag; - pCmprsor->i_aBitN[pCmprsor->i_end] = nBit; - pCmprsor->i_end = NEXT_IDX[pCmprsor->i_end]; - pCmprsor->i_nEle++; - break; - } else { - if (pCmprsor->i_nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { - int32_t lidx = pCmprsor->i_selector + 1; - int32_t ridx = 15; - while (lidx <= ridx) { - pCmprsor->i_selector = (lidx + ridx) >> 1; - - if (pCmprsor->i_nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { - lidx = pCmprsor->i_selector + 1; - } else if (pCmprsor->i_nEle > SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { - ridx = pCmprsor->i_selector - 1; - } else { - break; - } - } - - if (pCmprsor->i_nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) pCmprsor->i_selector++; - } - int32_t nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector]; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(uint64_t)))) { - return code; - } - - uint64_t *bp = (uint64_t *)(pCmprsor->pBuf + pCmprsor->nBuf); - pCmprsor->nBuf += sizeof(uint64_t); - bp[0] = pCmprsor->i_selector; - uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector]; - for (int32_t iVal = 0; iVal < nEle; iVal++) { - bp[0] |= (pCmprsor->i_aZigzag[pCmprsor->i_start] << (bits * iVal + 4)); - pCmprsor->i_start = NEXT_IDX[pCmprsor->i_start]; - pCmprsor->i_nEle--; - } - - // reset and continue - pCmprsor->i_selector = 0; - for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal = NEXT_IDX[iVal]) { - if (pCmprsor->i_selector < BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]) { - pCmprsor->i_selector = BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]; - } - } - } - } - } else { - _copy_cmpr: - code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + nData); - if (code) return code; - - memcpy(pCmprsor->pBuf + pCmprsor->nBuf, pData, nData); - pCmprsor->nBuf += nData; - } - pCmprsor->nVal++; - - return code; -} - -static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { - int32_t code = 0; - - for (; pCmprsor->i_nEle;) { - if (pCmprsor->i_nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { - int32_t lidx = pCmprsor->i_selector + 1; - int32_t ridx = 15; - while (lidx <= ridx) { - pCmprsor->i_selector = (lidx + ridx) >> 1; - - if (pCmprsor->i_nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { - lidx = pCmprsor->i_selector + 1; - } else if (pCmprsor->i_nEle > SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { - ridx = pCmprsor->i_selector - 1; - } else { - break; - } - } - - if (pCmprsor->i_nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) pCmprsor->i_selector++; - } - int32_t nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector]; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(uint64_t)))) { - return code; - } - - uint64_t *bp = (uint64_t *)(pCmprsor->pBuf + pCmprsor->nBuf); - pCmprsor->nBuf += sizeof(uint64_t); - bp[0] = pCmprsor->i_selector; - uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector]; - for (int32_t iVal = 0; iVal < nEle; iVal++) { - bp[0] |= (pCmprsor->i_aZigzag[pCmprsor->i_start] << (bits * iVal + 4)); - pCmprsor->i_start = NEXT_IDX[pCmprsor->i_start]; - pCmprsor->i_nEle--; - } - - pCmprsor->i_selector = 0; - } - - if (pCmprsor->nBuf >= DATA_TYPE_INFO[pCmprsor->type].bytes * pCmprsor->nVal + 1 && pCmprsor->pBuf[0] == 0) { - code = tCompIntSwitchToCopy(pCmprsor); - if (code) return code; - } - - if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { - code = tTwoStageComp(pCmprsor, nData); - if (code) return code; - *ppData = pCmprsor->aBuf[0]; - } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { - *ppData = pCmprsor->pBuf; - *nData = pCmprsor->nBuf; - } else { - ASSERT(0); - } - - return code; -} - -// Float ===================================================== -static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - - pCmprsor->f_prev = 0; - pCmprsor->f_flag_p = NULL; - - pCmprsor->nBuf = 1; - - code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); - if (code) return code; - - pCmprsor->pBuf[0] = 0; - - return code; -} - -static int32_t tCompFloatSwitchToCopy(SCompressor *pCmprsor) { - int32_t code = 0; - - if (pCmprsor->nVal == 0) goto _exit; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(float) * pCmprsor->nVal + 1))) { - return code; - } - - int32_t n = 1; - int32_t nBuf = 1; - union { - float f; - uint32_t u; - } val = {.u = 0}; - - for (int32_t iVal = 0; iVal < pCmprsor->nVal;) { - uint8_t flags[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)}; - - n++; - - for (int8_t i = 0; i < 2; i++) { - uint8_t flag = flags[i]; - - uint32_t diff = 0; - int8_t nBytes = (flag & 0x7) + 1; - for (int j = 0; j < nBytes; j++) { - diff |= (((uint32_t)pCmprsor->pBuf[n]) << (8 * j)); - n++; - } - - if (flag & 0x8) { - diff <<= (32 - nBytes * 8); - } - - val.u ^= diff; - - memcpy(pCmprsor->aBuf[0] + nBuf, &val.f, sizeof(val)); - nBuf += sizeof(val); - - iVal++; - if (iVal >= pCmprsor->nVal) break; - } - } - uint8_t *pBuf = pCmprsor->pBuf; - pCmprsor->pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pBuf; - pCmprsor->nBuf = nBuf; - -_exit: - pCmprsor->pBuf[0] = 1; - return code; -} -static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { - int32_t code = 0; - - ASSERT(nData == sizeof(float)); - - union { - float f; - uint32_t u; - } val = {.f = *(float *)pData}; - - uint32_t diff = val.u ^ pCmprsor->f_prev; - pCmprsor->f_prev = val.u; - - int32_t clz, ctz; - if (diff) { - clz = BUILDIN_CLZ(diff); - ctz = BUILDIN_CTZ(diff); - } else { - clz = 32; - ctz = 32; - } - - uint8_t nBytes; - if (clz < ctz) { - nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; - if (nBytes) diff >>= (32 - nBytes * BITS_PER_BYTE); - } else { - nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; - } - if (nBytes == 0) nBytes++; - - if ((pCmprsor->nVal & 0x1) == 0) { - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 9))) { - return code; - } - - pCmprsor->f_flag_p = &pCmprsor->pBuf[pCmprsor->nBuf]; - pCmprsor->nBuf++; - - if (clz < ctz) { - pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); - } else { - pCmprsor->f_flag_p[0] = nBytes - 1; - } - } else { - if (clz < ctz) { - pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); - } else { - pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4); - } - } - for (; nBytes; nBytes--) { - pCmprsor->pBuf[pCmprsor->nBuf] = (diff & 0xff); - pCmprsor->nBuf++; - diff >>= BITS_PER_BYTE; - } - pCmprsor->nVal++; - - return code; -} - -static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { - int32_t code = 0; - - if (pCmprsor->nBuf >= sizeof(float) * pCmprsor->nVal + 1) { - code = tCompFloatSwitchToCopy(pCmprsor); - if (code) return code; - } - - if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { - code = tTwoStageComp(pCmprsor, nData); - if (code) return code; - *ppData = pCmprsor->aBuf[0]; - } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { - *ppData = pCmprsor->pBuf; - *nData = pCmprsor->nBuf; - } else { - ASSERT(0); - } - - return code; -} - -// Double ===================================================== -static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - - pCmprsor->d_prev = 0; - pCmprsor->d_flag_p = NULL; - - pCmprsor->nBuf = 1; - - code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); - if (code) return code; - - pCmprsor->pBuf[0] = 0; - - return code; -} - -static int32_t tCompDoubleSwitchToCopy(SCompressor *pCmprsor) { - int32_t code = 0; - - if (pCmprsor->nVal == 0) goto _exit; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(double) * pCmprsor->nVal + 1))) { - return code; - } - - int32_t n = 1; - int32_t nBuf = 1; - union { - double f; - uint64_t u; - } val = {.u = 0}; - - for (int32_t iVal = 0; iVal < pCmprsor->nVal;) { - uint8_t flags[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)}; - - n++; - - for (int8_t i = 0; i < 2; i++) { - uint8_t flag = flags[i]; - - uint64_t diff = 0; - int8_t nBytes = (flag & 0x7) + 1; - for (int j = 0; j < nBytes; j++) { - diff |= (((uint64_t)pCmprsor->pBuf[n]) << (8 * j)); - n++; - } - - if (flag & 0x8) { - diff <<= (64 - nBytes * 8); - } - - val.u ^= diff; - - memcpy(pCmprsor->aBuf[0] + nBuf, &val.f, sizeof(val)); - nBuf += sizeof(val); - - iVal++; - if (iVal >= pCmprsor->nVal) break; - } - } - uint8_t *pBuf = pCmprsor->pBuf; - pCmprsor->pBuf = pCmprsor->aBuf[0]; - pCmprsor->aBuf[0] = pBuf; - pCmprsor->nBuf = nBuf; - -_exit: - pCmprsor->pBuf[0] = 1; - return code; -} -static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { - int32_t code = 0; - - ASSERT(nData == sizeof(double)); - - union { - double d; - uint64_t u; - } val = {.d = *(double *)pData}; - - uint64_t diff = val.u ^ pCmprsor->d_prev; - pCmprsor->d_prev = val.u; - - int32_t clz, ctz; - if (diff) { - clz = BUILDIN_CLZL(diff); - ctz = BUILDIN_CTZL(diff); - } else { - clz = 64; - ctz = 64; - } - - uint8_t nBytes; - if (clz < ctz) { - nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; - if (nBytes) diff >>= (64 - nBytes * BITS_PER_BYTE); - } else { - nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; - } - if (nBytes == 0) nBytes++; - - if ((pCmprsor->nVal & 0x1) == 0) { - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 17))) { - return code; - } - - pCmprsor->d_flag_p = &pCmprsor->pBuf[pCmprsor->nBuf]; - pCmprsor->nBuf++; - - if (clz < ctz) { - pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); - } else { - pCmprsor->d_flag_p[0] = nBytes - 1; - } - } else { - if (clz < ctz) { - pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); - } else { - pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4); - } - } - for (; nBytes; nBytes--) { - pCmprsor->pBuf[pCmprsor->nBuf] = (diff & 0xff); - pCmprsor->nBuf++; - diff >>= BITS_PER_BYTE; - } - pCmprsor->nVal++; - - return code; -} - -static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { - int32_t code = 0; - - if (pCmprsor->nBuf >= sizeof(double) * pCmprsor->nVal + 1) { - code = tCompDoubleSwitchToCopy(pCmprsor); - if (code) return code; - } - - if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { - code = tTwoStageComp(pCmprsor, nData); - if (code) return code; - *ppData = pCmprsor->aBuf[0]; - } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { - *ppData = pCmprsor->pBuf; - *nData = pCmprsor->nBuf; - } else { - ASSERT(0); - } - - return code; -} - -// Binary ===================================================== -static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - pCmprsor->nBuf = 1; - return 0; -} - -static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { - int32_t code = 0; - - if (nData) { - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + nData))) { - return code; - } - - memcpy(pCmprsor->pBuf + pCmprsor->nBuf, pData, nData); - pCmprsor->nBuf += nData; - } - pCmprsor->nVal++; - - return code; -} - -static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { - int32_t code = 0; - - if (pCmprsor->nBuf == 1) return code; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf))) { - return code; - } - - int32_t szComp = - LZ4_compress_default(pCmprsor->pBuf + 1, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf - 1, pCmprsor->nBuf - 1); - if (szComp && szComp < pCmprsor->nBuf - 1) { - pCmprsor->aBuf[0][0] = 1; - *ppData = pCmprsor->aBuf[0]; - *nData = szComp + 1; - } else { - pCmprsor->pBuf[0] = 0; - *ppData = pCmprsor->pBuf; - *nData = pCmprsor->nBuf; - } - - return code; -} - -// Bool ===================================================== -static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; - -static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - pCmprsor->nBuf = 0; - return 0; -} - -static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) { - int32_t code = 0; - - bool vBool = *(int8_t *)pData; - - int32_t mod4 = (pCmprsor->nVal & 3); - if (mod4 == 0) { - pCmprsor->nBuf++; - - if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf))) { - return code; - } - - pCmprsor->pBuf[pCmprsor->nBuf - 1] = 0; - } - if (vBool) { - pCmprsor->pBuf[pCmprsor->nBuf - 1] |= BOOL_CMPR_TABLE[mod4]; - } - pCmprsor->nVal++; - - return code; -} - -static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { - int32_t code = 0; - - if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { - code = tTwoStageComp(pCmprsor, nData); - if (code) return code; - *ppData = pCmprsor->aBuf[0]; - } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { - *ppData = pCmprsor->pBuf; - *nData = pCmprsor->nBuf; - } else { - ASSERT(0); - } - - return code; -} - -// SCompressor ===================================================== -int32_t tCompressorCreate(SCompressor **ppCmprsor) { - int32_t code = 0; - - *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); - if ((*ppCmprsor) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } - - return code; -} - -int32_t tCompressorDestroy(SCompressor *pCmprsor) { - int32_t code = 0; - - tFree(pCmprsor->pBuf); - - int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); - for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) { - tFree(pCmprsor->aBuf[iBuf]); - } - - taosMemoryFree(pCmprsor); - - return code; -} - -int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - - pCmprsor->type = type; - pCmprsor->cmprAlg = cmprAlg; - pCmprsor->autoAlloc = 1; - pCmprsor->nVal = 0; - - if (DATA_TYPE_INFO[type].startFn) { - DATA_TYPE_INFO[type].startFn(pCmprsor, type, cmprAlg); - } - - return code; -} - -int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin) { - int32_t code = 0; - - *ppOut = NULL; - *nOut = 0; - if (nOrigin) { - if (DATA_TYPE_INFO[pCmprsor->type].isVarLen) { - *nOrigin = pCmprsor->nBuf - 1; - } else { - *nOrigin = pCmprsor->nVal * DATA_TYPE_INFO[pCmprsor->type].bytes; - } - } - - if (pCmprsor->nVal == 0) return code; - - if (DATA_TYPE_INFO[pCmprsor->type].endFn) { - return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppOut, nOut); - } - - return code; -} - -int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) { - return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData); -} -#endif /************************************************************************* * REGULAR COMPRESSION *************************************************************************/ @@ -2448,7 +1305,6 @@ int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut, if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; return tsDecompressTimestampImp(pBuf, nEle, pOut); } else { - ASSERTS(0, "compress algo invalid"); return -1; } } @@ -2467,7 +1323,6 @@ int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_ int32_t len = tsCompressFloatImp(pIn, nEle, pBuf); return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2486,7 +1341,6 @@ int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; return tsDecompressFloatImp(pBuf, nEle, pOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2506,7 +1360,6 @@ int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 int32_t len = tsCompressDoubleImp(pIn, nEle, pBuf); return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2525,7 +1378,6 @@ int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int if (tsDecompressStringImp(pIn, nIn, pBuf, nBuf) < 0) return -1; return tsDecompressDoubleImp(pBuf, nEle, pOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2554,7 +1406,6 @@ int32_t tsCompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_THIRDPARTY_ERROR; } } @@ -2568,7 +1419,6 @@ int32_t tsDecompressBool(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressBoolImp(pBuf, nEle, pOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2585,7 +1435,6 @@ int32_t tsCompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2599,7 +1448,6 @@ int32_t tsDecompressTinyint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_TINYINT); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2616,7 +1464,6 @@ int32_t tsCompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2630,7 +1477,6 @@ int32_t tsDecompressSmallint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, i if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_SMALLINT); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2647,7 +1493,6 @@ int32_t tsCompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2661,7 +1506,6 @@ int32_t tsDecompressInt(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_ if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_INT); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2678,7 +1522,6 @@ int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 } return tsCompressStringImp(pBuf, len, pOut, nOut); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2692,7 +1535,6 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int if ((code = tsDecompressStringImp(pIn, nIn, pBuf, nBuf)) < 0) return code; return tsDecompressINTImp(pBuf, nEle, pOut, TSDB_DATA_TYPE_BIGINT); } else { - ASSERTS(0, "compress algo invalid"); return TSDB_CODE_INVALID_PARA; } } @@ -2734,8 +1576,6 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int tDataTypes[type].name); \ return compressL2Dict[l2].decomprFn(pIn, nIn, pOut, nOut, type); \ } \ - } else { \ - ASSERT(0); \ } \ return TSDB_CODE_INVALID_PARA; \ } while (1) diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index b14b0823a3..29d8b38146 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -121,7 +121,9 @@ int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); } static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) { cfgItemFreeVal(pItem); - ASSERT(pItem->str == NULL); + if (!(pItem->str == NULL)) { + return TSDB_CODE_INVALID_PARA; + } pItem->str = taosStrdup(conf); if (pItem->str == NULL) { diff --git a/source/util/src/tdigest.c b/source/util/src/tdigest.c index 51b68f8f6b..636c97dbaa 100644 --- a/source/util/src/tdigest.c +++ b/source/util/src/tdigest.c @@ -143,24 +143,20 @@ int32_t tdigestCompress(TDigest *t) { if (a->mean <= b->mean) { mergeCentroid(&args, a); - ASSERTS(args.idx < t->size, "idx over size"); i++; } else { mergeCentroid(&args, b); - ASSERTS(args.idx < t->size, "idx over size"); j++; } } while (i < num_unmerged) { mergeCentroid(&args, &unmerged_centroids[i++]); - ASSERTS(args.idx < t->size, "idx over size"); } taosMemoryFree((void *)unmerged_centroids); while (j < t->num_centroids) { mergeCentroid(&args, &t->centroids[j++]); - ASSERTS(args.idx < t->size, "idx over size"); } if (t->total_weight > 0) { diff --git a/source/util/src/tencode.c b/source/util/src/tencode.c index aa9f157f3a..99b0b2bded 100644 --- a/source/util/src/tencode.c +++ b/source/util/src/tencode.c @@ -104,7 +104,6 @@ void tEndEncode(SEncoder* pCoder) { if (pCoder->data) { pNode = pCoder->eStack; - ASSERT(pNode); pCoder->eStack = pNode->pNext; len = pCoder->pos; @@ -148,7 +147,6 @@ void tEndDecode(SDecoder* pCoder) { SDecoderNode* pNode; pNode = pCoder->dStack; - ASSERT(pNode); pCoder->dStack = pNode->pNext; pCoder->data = pNode->data; diff --git a/source/util/src/texception.c b/source/util/src/texception.c deleted file mode 100644 index 2c8ddf6a5f..0000000000 --- a/source/util/src/texception.c +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "texception.h" -#include "tlog.h" - -static threadlocal SExceptionNode* expList; - -void exceptionPushNode(SExceptionNode* node) { - node->prev = expList; - expList = node; -} - -int32_t exceptionPopNode() { - SExceptionNode* node = expList; - expList = node->prev; - return node->code; -} - -void exceptionThrow(int32_t code) { - expList->code = code; - longjmp(expList->jb, 1); -} - -static void cleanupWrapper_void_ptr_ptr(SCleanupAction* ca) { - void (*func)(void*, void*) = ca->func; - func(ca->arg1.Ptr, ca->arg2.Ptr); -} - -static void cleanupWrapper_void_ptr_bool(SCleanupAction* ca) { - void (*func)(void*, bool) = ca->func; - func(ca->arg1.Ptr, ca->arg2.Bool); -} - -static void cleanupWrapper_void_ptr(SCleanupAction* ca) { - void (*func)(void*) = ca->func; - func(ca->arg1.Ptr); -} - -static void cleanupWrapper_int_int(SCleanupAction* ca) { - int32_t (*func)(int32_t) = ca->func; - (void)func(ca->arg1.Int); -} - -static void cleanupWrapper_void(SCleanupAction* ca) { - void (*func)() = ca->func; - func(); -} - -static void cleanupWrapper_int_ptr(SCleanupAction* ca) { - int32_t (*func)(void*) = ca->func; - (void)func(ca->arg1.Ptr); -} - -typedef void (*wrapper)(SCleanupAction*); -static wrapper wrappers[] = { - cleanupWrapper_void_ptr_ptr, cleanupWrapper_void_ptr_bool, cleanupWrapper_void_ptr, - cleanupWrapper_int_int, cleanupWrapper_void, cleanupWrapper_int_ptr, -}; - -void cleanupPush_void_ptr_ptr(bool failOnly, void* func, void* arg1, void* arg2) { - ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction"); - - SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++; - ca->wrapper = 0; - ca->failOnly = failOnly; - ca->func = func; - ca->arg1.Ptr = arg1; - ca->arg2.Ptr = arg2; -} - -void cleanupPush_void_ptr_bool(bool failOnly, void* func, void* arg1, bool arg2) { - ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction"); - - SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++; - ca->wrapper = 1; - ca->failOnly = failOnly; - ca->func = func; - ca->arg1.Ptr = arg1; - ca->arg2.Bool = arg2; -} - -void cleanupPush_void_ptr(bool failOnly, void* func, void* arg) { - ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction"); - - SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++; - ca->wrapper = 2; - ca->failOnly = failOnly; - ca->func = func; - ca->arg1.Ptr = arg; -} - -void cleanupPush_int_int(bool failOnly, void* func, int32_t arg) { - ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction"); - - SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++; - ca->wrapper = 3; - ca->failOnly = failOnly; - ca->func = func; - ca->arg1.Int = arg; -} - -void cleanupPush_void(bool failOnly, void* func) { - ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction"); - - SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++; - ca->wrapper = 4; - ca->failOnly = failOnly; - ca->func = func; -} - -void cleanupPush_int_ptr(bool failOnly, void* func, void* arg) { - ASSERTS(expList->numCleanupAction < expList->maxCleanupAction, "numCleanupAction over maxCleanupAction"); - - SCleanupAction* ca = expList->cleanupActions + expList->numCleanupAction++; - ca->wrapper = 5; - ca->failOnly = failOnly; - ca->func = func; - ca->arg1.Ptr = arg; -} - -int32_t cleanupGetActionCount() { return expList->numCleanupAction; } - -static void doExecuteCleanup(SExceptionNode* node, int32_t anchor, bool failed) { - while (node->numCleanupAction > anchor) { - --node->numCleanupAction; - SCleanupAction* ca = node->cleanupActions + node->numCleanupAction; - if (failed || !(ca->failOnly)) { - wrappers[ca->wrapper](ca); - } - } -} - -void cleanupExecuteTo(int32_t anchor, bool failed) { doExecuteCleanup(expList, anchor, failed); } - -void cleanupExecute(SExceptionNode* node, bool failed) { doExecuteCleanup(node, 0, failed); } -bool cleanupExceedLimit() { return expList->numCleanupAction >= expList->maxCleanupAction; } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 5c1909b16e..aac66348e7 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -191,14 +191,12 @@ static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SH (void)atomic_sub_fetch_16(&pNode->refCount, 1); if (prev != NULL) { prev->next = pNewNode; - ASSERT(prev->next != prev); } else { pe->next = pNewNode; } if (pNode->refCount <= 0) { pNewNode->next = pNode->next; - ASSERT(pNewNode->next != pNewNode); FREE_HASH_NODE(pHashObj->freeFp, pNode); } else { @@ -508,7 +506,6 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { pe->next = pNode->next; } else { prevNode->next = pNode->next; - ASSERT(prevNode->next != prevNode); } pe->num--; @@ -759,12 +756,10 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { if (pOld->refCount <= 0) { if (prevNode) { prevNode->next = pOld->next; - ASSERT(prevNode->next != prevNode); } else { pe->next = pOld->next; SHashNode *x = pe->next; if (x != NULL) { - ASSERT(x->next != x); } } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index a1fbe3db2a..ef0a45233a 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -2,8 +2,8 @@ #include "tpagedbuf.h" #include "taoserror.h" #include "tcompression.h" -#include "tsimplehash.h" #include "tlog.h" +#include "tsimplehash.h" #define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) #define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL) @@ -27,24 +27,24 @@ struct SPageInfo { }; struct SDiskbasedBuf { - int32_t numOfPages; - int64_t totalBufSize; - uint64_t fileSize; // disk file size - TdFilePtr pFile; - int32_t allocateId; // allocated page id - char* path; // file path - char* prefix; // file name prefix - int32_t pageSize; // current used page size - int32_t inMemPages; // numOfPages that are allocated in memory - SList* freePgList; // free page list - SArray* pIdList; // page id list - SSHashObj*all; - SList* lruList; - void* emptyDummyIdList; // dummy id list - void* assistBuf; // assistant buffer for compress/decompress data - SArray* pFree; // free area in file - bool comp; // compressed before flushed to disk - uint64_t nextPos; // next page flush position + int32_t numOfPages; + int64_t totalBufSize; + uint64_t fileSize; // disk file size + TdFilePtr pFile; + int32_t allocateId; // allocated page id + char* path; // file path + char* prefix; // file name prefix + int32_t pageSize; // current used page size + int32_t inMemPages; // numOfPages that are allocated in memory + SList* freePgList; // free page list + SArray* pIdList; // page id list + SSHashObj* all; + SList* lruList; + void* emptyDummyIdList; // dummy id list + void* assistBuf; // assistant buffer for compress/decompress data + SArray* pFree; // free area in file + bool comp; // compressed before flushed to disk + uint64_t nextPos; // next page flush position char* id; // for debug purpose bool printStatis; // Print statistics info when closing this buffer. @@ -95,7 +95,8 @@ static int32_t doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDisk } else if (*dst < 0) { return terrno; } - return code;; + return code; + ; } static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) { @@ -300,7 +301,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SPageInfo* pageInfo = *(SPageInfo**)pn->data; SPageInfo* p = *(SPageInfo**)(pageInfo->pData); - ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo); if (!pageInfo->used) { break; @@ -435,14 +435,14 @@ static char* doExtractPage(SDiskbasedBuf* pBuf, bool* newPage) { void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->statis.getPages += 1; - bool newPage = false; + bool newPage = false; char* availablePage = doExtractPage(pBuf, &newPage); if (availablePage == NULL) { return NULL; } SPageInfo* pi = NULL; - int32_t code = 0; + int32_t code = 0; if (listNEles(pBuf->freePgList) != 0) { SListNode* pItem = tdListPopHead(pBuf->freePgList); pi = *(SPageInfo**)pItem->data; @@ -538,8 +538,6 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { #endif return (void*)(GET_PAYLOAD_DATA(*pi)); } else { // not in memory - ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL && - (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); bool newPage = false; (*pi)->pData = doExtractPage(pBuf, &newPage); @@ -700,7 +698,7 @@ void setBufPageDirty(void* pPage, bool dirty) { void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) { pBuf->comp = comp; - if (comp && (pBuf->assistBuf == NULL)) { + if (comp && (pBuf->assistBuf == NULL)) { pBuf->assistBuf = taosMemoryMalloc(pBuf->pageSize + 2); // EXTRA BYTES } } diff --git a/source/util/src/trbtree.c b/source/util/src/trbtree.c index dddc2aea4b..fa0862a155 100644 --- a/source/util/src/trbtree.c +++ b/source/util/src/trbtree.c @@ -263,7 +263,6 @@ static void rbtree_delete_fixup(rbtree_t *rbtree, rbnode_t *child, rbnode_t *chi child_parent->color = BLACK; return; } - ASSERTS(sibling != RBTREE_NULL, "sibling is NULL"); /* get a new sibling, by rotating at sibling. See which child of sibling is red */ @@ -293,11 +292,9 @@ static void rbtree_delete_fixup(rbtree_t *rbtree, rbnode_t *child, rbnode_t *chi sibling->color = child_parent->color; child_parent->color = BLACK; if (child_parent->right == child) { - ASSERTS(sibling->left->color == RED, "slibing->left->color=%d not equal RED", sibling->left->color); sibling->left->color = BLACK; rbtree_rotate_right(rbtree, child_parent); } else { - ASSERTS(sibling->right->color == RED, "slibing->right->color=%d not equal RED", sibling->right->color); sibling->right->color = BLACK; rbtree_rotate_left(rbtree, child_parent); } @@ -320,18 +317,15 @@ static void swap_np(rbnode_t **x, rbnode_t **y) { /** Update parent pointers of child trees of 'parent' */ static void change_parent_ptr(rbtree_t *rbtree, rbnode_t *parent, rbnode_t *old, rbnode_t *new) { if (parent == RBTREE_NULL) { - ASSERTS(rbtree->root == old, "root not equal old"); if (rbtree->root == old) rbtree->root = new; return; } - ASSERT(parent->left == old || parent->right == old || parent->left == new || parent->right == new); if (parent->left == old) parent->left = new; if (parent->right == old) parent->right = new; } /** Update parent pointer of a node 'child' */ static void change_child_ptr(rbtree_t *rbtree, rbnode_t *child, rbnode_t *old, rbnode_t *new) { if (child == RBTREE_NULL) return; - ASSERT(child->parent == old || child->parent == new); if (child->parent == old) child->parent = new; } @@ -376,7 +370,6 @@ rbnode_t *rbtree_delete(rbtree_t *rbtree, void *key) { /* now delete to_delete (which is at the location where the smright previously was) */ } - ASSERT(to_delete->left == RBTREE_NULL || to_delete->right == RBTREE_NULL); if (to_delete->left != RBTREE_NULL) child = to_delete->left; diff --git a/source/util/src/tsimplehash.c b/source/util/src/tsimplehash.c index e39c7364b7..d14e72822f 100644 --- a/source/util/src/tsimplehash.c +++ b/source/util/src/tsimplehash.c @@ -261,8 +261,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) { SHNode *pNode = pHashObj->hashList[index]; while (pNode) { - const char* p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); - ASSERT(keyLen > 0); + const char *p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); if (pNode->keyLen == keyLen && ((*(pHashObj->equalFp))(p, key, keyLen) == 0)) { break; diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index e5e52e44d9..ae01292e08 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -366,51 +366,6 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) { return NULL; } -#ifdef BUILD_NO_CALL -void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { - if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { - return; - } - - SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); - - int32_t id = 1; - char *prev = NULL; - - while (p != pSkipList->pTail) { - char *key = SL_GET_NODE_KEY(pSkipList, p); - if (prev != NULL) { - ASSERT(pSkipList->comparFn(prev, key) < 0); - } - - switch (pSkipList->type) { - case TSDB_DATA_TYPE_INT: - fprintf(stdout, "%d: %d\n", id++, *(int32_t *)key); - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_BIGINT: - fprintf(stdout, "%d: %" PRId64 " \n", id++, *(int64_t *)key); - break; - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_VARBINARY: - case TSDB_DATA_TYPE_GEOMETRY: - fprintf(stdout, "%d: %s \n", id++, key); - break; - case TSDB_DATA_TYPE_DOUBLE: - fprintf(stdout, "%d: %lf \n", id++, *(double *)key); - break; - default: - fprintf(stdout, "\n"); - } - - prev = SL_GET_NODE_KEY(pSkipList, p); - - p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1); - } -} -#endif - static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) { for (int32_t i = 0; i < pNode->level; ++i) { SSkipListNode *x = direction[i]; @@ -538,33 +493,6 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, return hasDupKey; } -#ifdef BUILD_NO_CALL -static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { - int32_t level = pNode->level; - uint8_t dupMode = SL_DUP_MODE(pSkipList); - ASSERT(dupMode != SL_DISCARD_DUP_KEY && dupMode != SL_UPDATE_DUP_KEY); - - for (int32_t j = level - 1; j >= 0; --j) { - SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(pNode, j); - SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(pNode, j); - - SL_NODE_GET_FORWARD_POINTER(prev, j) = next; - SL_NODE_GET_BACKWARD_POINTER(next, j) = prev; - } - - tSkipListFreeNode(pNode); - pSkipList->size--; -} - -// Function must be called after calling tSkipListRemoveNodeImpl() function -static void tSkipListCorrectLevel(SSkipList *pSkipList) { - while (pSkipList->level > 0 && - SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == pSkipList->pTail) { - pSkipList->level -= 1; - } -} -#endif - UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level #if SKIP_LIST_RECORD_PERFORMANCE diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 94f514e208..081acda20f 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -124,7 +124,6 @@ char **strsplit(char *z, const char *delim, int32_t *num) { if (split == NULL) { return NULL; } - ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t)POINTER_BYTES * size); } } From b2c341c273652eb151348eb216a103cbe0bcfffd Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 22 Aug 2024 15:27:16 +0800 Subject: [PATCH 4/7] fix(wal/assert): return error code instead of assert --- source/libs/wal/src/walMeta.c | 5 ++++- source/libs/wal/src/walWrite.c | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index db1d61a023..d0c7dea451 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -357,7 +357,10 @@ static int32_t walLogEntriesComplete(const SWal* pWal) { static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); - ASSERT(pFileInfo != NULL); + if (!pFileInfo) { + TAOS_RETURN(TSDB_CODE_FAILED); + } + char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 9979ddd0b0..52af3e8528 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -371,8 +371,11 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { int32_t code = 0; + if (logRetention < 0) { + TAOS_RETURN(TSDB_CODE_FAILED); + } + TAOS_UNUSED(taosThreadMutexLock(&pWal->mutex)); - ASSERT(logRetention >= 0); pWal->vers.verInSnapshotting = ver; pWal->vers.logRetention = logRetention; @@ -438,7 +441,10 @@ int32_t walEndSnapshot(SWal *pWal) { if (pInfo) { wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver, pInfo->firstVer, pInfo->lastVer); - ASSERT(ver <= pInfo->lastVer); + if (ver > pInfo->lastVer) { + TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit); + } + if (ver == pInfo->lastVer) { pInfo++; } From 94244081f8a603741232f518cb681522064daf2e Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Wed, 21 Aug 2024 13:34:34 +0800 Subject: [PATCH 5/7] TD-31315: uninstall taosx update --- packaging/tools/install.sh | 6 +++--- packaging/tools/makepkg.sh | 4 ++-- packaging/tools/remove.sh | 33 ++++++++++++++++++--------------- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 374d17ada6..8a6b159a22 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -229,8 +229,8 @@ function install_bin() { if [ -d ${script_dir}/${xname}/bin ]; then ${csudo}cp -r ${script_dir}/${xname}/bin/* ${install_main_dir}/bin fi - if [ -e ${script_dir}/${xname}/uninstall.sh ]; then - ${csudo}cp -r ${script_dir}/${xname}/uninstall.sh ${install_main_dir}/uninstall_${xname}.sh + if [ -e ${script_dir}/${xname}/uninstall_${xname}.sh ]; then + ${csudo}cp -r ${script_dir}/${xname}/uninstall_${xname}.sh ${install_main_dir}/uninstall_${xname}.sh fi fi @@ -254,7 +254,7 @@ function install_bin() { [ -x ${install_main_dir}/bin/${service} ] && ${csudo}ln -sf ${install_main_dir}/bin/${service} ${bin_link_dir}/${service} || : done - [ ${install_main_dir}/uninstall_${xname}.sh ] && ${csudo}ln -sf ${install_main_dir}/uninstall_${xname}.sh ${bin_link_dir}/uninstall_${xname}.sh || : + [ -x ${install_main_dir}/uninstall_${xname}.sh ] && ${csudo}ln -sf ${install_main_dir}/uninstall_${xname}.sh ${bin_link_dir}/uninstall_${xname}.sh || : } function install_lib() { diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 9e1cc73238..b614170fd8 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -363,8 +363,8 @@ if [ "$verMode" == "cluster" ]; then # copy taosx if [ -d ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ]; then cp -r ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ${install_dir} - cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx - sed -i 's/target=\"\"/target=\"taosx\"/g' ${install_dir}/taosx/uninstall.sh + cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx/uninstall_taosx.sh + sed -i "s/uninstall.sh/uninstall_taosx.sh/g" ${install_dir}/taosx/uninstall_taosx.sh fi fi fi diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 88dcbf41f3..58a17e2a50 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -226,29 +226,27 @@ function remove_data_and_config() { [ -d "${log_dir}" ] && ${csudo}rm -rf ${log_dir} } -function remove_taosx() { - if [ -e /usr/local/taos/taosx/uninstall.sh ]; then - bash /usr/local/taos/taosx/uninstall.sh - fi -} - echo echo "Do you want to remove all the data, log and configuration files? [y/n]" read answer +remove_flag=false if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then confirmMsg="I confirm that I would like to delete all data, log and configuration files" echo "Please enter '${confirmMsg}' to continue" read answer - if [ X"$answer" == X"${confirmMsg}" ]; then - remove_data_and_config - if [ -e /usr/bin/uninstall_${PREFIX}x.sh ]; then - bash /usr/bin/uninstall_${PREFIX}x.sh --clean-all true - fi + if [ X"$answer" == X"${confirmMsg}" ]; then + remove_flag=true else - echo "answer doesn't match, skip this step" - if [ -e /usr/bin/uninstall_${PREFIX}x.sh ]; then - bash /usr/bin/uninstall_${PREFIX}x.sh --clean-all false - fi + echo "answer doesn't match, skip this step" + fi +fi +echo + +if [ -e ${install_main_dir}/uninstall_${PREFIX}x.sh ]; then + if [ X$remove_flag == X"true" ]; then + bash ${install_main_dir}/uninstall_${PREFIX}x.sh --clean-all true + else + bash ${install_main_dir}/uninstall_${PREFIX}x.sh --clean-all false fi fi @@ -262,6 +260,11 @@ clean_log clean_config # Remove data link directory ${csudo}rm -rf ${data_link_dir} || : + +if [ X$remove_flag == X"true" ]; then + remove_data_and_config +fi + ${csudo}rm -rf ${install_main_dir} || : if [[ -e /etc/os-release ]]; then osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release) From 0ed15b5ceca95ef584bca5c12269aa8ac588e3b9 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Thu, 22 Aug 2024 18:17:12 +0800 Subject: [PATCH 6/7] Update 12-multi.md --- docs/zh/07-operation/12-multi.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/07-operation/12-multi.md b/docs/zh/07-operation/12-multi.md index 9b91a47e39..e156af55f8 100644 --- a/docs/zh/07-operation/12-multi.md +++ b/docs/zh/07-operation/12-multi.md @@ -16,7 +16,7 @@ toc_max_heading_level: 4 ### 配置方式 -多级存储支持 3 级,每级最多可配置 16 个挂载点。 +多级存储支持 3 级,每级最多可配置 128 个挂载点。 **Tips** 典型的配置方案有:0 级配置多个挂载点,每个挂载点对应单块 SAS 硬盘;1 级配置多个挂载点,每个挂载点对应单块或多块 SATA 硬盘;2 级可配置 S3 存储或其他廉价网络存储。 @@ -110,4 +110,4 @@ s3migrate database ; | :--- | :----------- | :----- | :----- | :------ | :----------------------------------------------------------- | | 1 | s3_keeplocal | 3650 | 1 | 365000 | 数据在本地保留的天数,即 data 文件在本地磁盘保留多长时间后可以上传到 S3。默认单位:天,支持 m(分钟)、h(小时)和 d(天)三个单位 | | 2 | s3_chunksize | 262144 | 131072 | 1048576 | 上传对象的大小阈值,与 TSDB_PAGESIZE 参数一样,不可修改,单位为 TSDB 页 | -| 3 | s3_compact | 0 | 0 | 1 | TSDB 文件组首次上传 S3 时,是否自动进行 compact 操作。 | \ No newline at end of file +| 3 | s3_compact | 0 | 0 | 1 | TSDB 文件组首次上传 S3 时,是否自动进行 compact 操作。 | From 77f98e01c13d1f7de67808b6bfb0edb8ec0a6477 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Thu, 22 Aug 2024 18:17:46 +0800 Subject: [PATCH 7/7] Update 01-taosd.md --- docs/zh/14-reference/01-components/01-taosd.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 3746a16c54..ba17126a92 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -53,7 +53,7 @@ taosd 命令行参数如下 | :--------------------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | | queryPolicy | 查询策略,1: 只使用 vnode,不使用 qnode; 2: 没有扫描算子的子任务在 qnode 执行,带扫描算子的子任务在 vnode 执行; 3: vnode 只运行扫描算子,其余算子均在 qnode 执行 ;缺省值:1 | | maxNumOfDistinctRes | 允许返回的 distinct 结果最大行数,默认值 10 万,最大允许值 1 亿 | -| countAlwaysReturnValue | ount/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值,0: 返回空行,1: 返回;该参数设置为 1 时,如果查询中含有 INTERVAL 子句或者该查询使用了TSMA时, 且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果. 注意此参数客户端和服务端值应保持一致. | +| countAlwaysReturnValue | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值,0: 返回空行,1: 返回;该参数设置为 1 时,如果查询中含有 INTERVAL 子句或者该查询使用了TSMA时, 且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果. 注意此参数客户端和服务端值应保持一致. | ### 区域相关