From b4bd6a4f1c3d72564f6a1dd308858d95a02506a3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 10 May 2023 10:49:10 +0000 Subject: [PATCH 01/34] refactor code --- source/libs/stream/src/streamBackendRocksdb.c | 123 ++++++++---------- 1 file changed, 52 insertions(+), 71 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index db4ec17b19..70aeed3cdd 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -649,17 +649,9 @@ const char* compactFilteFactoryName(void* arg) { void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { - // int64_t unixTime = taosGetTimestampMs(); if (streamStateValueIsStale((char*)val)) { return 1; } - // SStreamValue value; - // memset(&value, 0, sizeof(value)); - // streamValueDecode(&value, (char*)val); - // taosMemoryFree(value.data); - // if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) { - // return 1; - // } return 0; } const char* compactFilteName(void* arg) { return "stream_filte"; } @@ -703,7 +695,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { memcpy(cfNames[0], "default", strlen("default")); continue; } - qError("cf name %s", idstr); + qDebug("cf name %s", idstr); GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); if (i % cfLen == 0) { @@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); } } - for (int i = 0; i < nSize * cfLen + 1; i++) { - qError("cf name %s", cfNames[i]); - } rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); for (int i = 0; i < nSize * cfLen + 1; i++) { @@ -1012,53 +1001,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ - qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ - if (err != NULL) taosMemoryFree(err); \ - code = -1; \ - } else { \ - char * p = NULL, *end = NULL; \ - int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \ - if (len < 0) { \ - qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \ - code = -1; \ - } else { \ - qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \ - } \ - if (pVal != NULL) { \ - *pVal = p; \ - } else { \ - taosMemoryFree(p); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = len; \ - } \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL) { \ + if (err == NULL) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ + funcname); \ + } else { \ + qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (len < 0) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ + funcname); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ + len); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = len; \ + } \ + if (code == 0) \ + qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -1133,10 +1120,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, // eLen); if (err != NULL) { - qWarn( - "failed to delete range cf(state) err: %s, " - "start: %s, end:%s", - err, toStringStart, toStringEnd); + qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); taosMemoryFree(err); } @@ -1588,20 +1572,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { return -1; } - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + size_t klen, vlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen); winKeyDecode(&winKey, keyStr); - size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - char* dst = NULL; - int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); + // char* dst = NULL; + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); if (len < 0) { return -1; } - - if (pVal != NULL) *pVal = (char*)dst; - if (pVLen != NULL) *pVLen = vlen; + if (pVLen != NULL) *pVLen = len; *pKey = winKey; return 0; From c7602a4c1a11b45c8f8f0124900c1e83a3652f4e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 01:19:40 +0000 Subject: [PATCH 02/34] refactor code --- source/libs/stream/src/streamBackendRocksdb.c | 1 - source/libs/stream/test/CMakeLists.txt | 2 +- source/libs/stream/test/tstreamUpdateTest.cpp | 30 +++++++++++++++---- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 70aeed3cdd..e1c030b5c5 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -695,7 +695,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { memcpy(cfNames[0], "default", strlen("default")); continue; } - qDebug("cf name %s", idstr); GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); if (i % cfLen == 0) { diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 5a97ba45f6..a0c1717690 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") TARGET_LINK_LIBRARIES( streamUpdateTest - PUBLIC os util common gtest stream + PUBLIC os util common gtest gtest_main stream ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index c698187874..d34e70b734 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -6,6 +6,25 @@ using namespace std; #define MAX_NUM_SCALABLE_BF 100000 +class StreamStateEnv : public ::testing::Test { + protected: + virtual void SetUp() { + // initLog(); + // taosRemoveDir(path); + // SIndexOpts opts; + // opts.cacheSize = 1024 * 1024 * 4; + // int ret = indexOpen(&opts, path, &index); + // assert(ret == 0); + } + virtual void TearDown() { + // indexClose(index); + } + + const char *path = TD_TMP_DIR_PATH "stream"; + // SIndexOpts* opts; + // SIndex* index; +}; + bool equalSBF(SScalableBf *left, SScalableBf *right) { if (left->growth != right->growth) return false; if (left->numBits != right->numBits) return false; @@ -191,8 +210,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) { // updateInfoDestroy(pSU6); // updateInfoDestroy(pSU7); } - -int main(int argc, char *argv[]) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file +// TEST() +TEST_F(StreamStateEnv, test1) {} +// int main(int argc, char *argv[]) { +// testing::InitGoogleTest(&argc, argv); +// return RUN_ALL_TESTS(); +// } \ No newline at end of file From d473fcf6b0570692819f8f4594f7fe682e9777ba Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 01:19:54 +0000 Subject: [PATCH 03/34] refactor code --- source/libs/stream/test/tstreamUpdateTest.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index d34e70b734..4b3f76276c 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -21,6 +21,7 @@ class StreamStateEnv : public ::testing::Test { } const char *path = TD_TMP_DIR_PATH "stream"; + // SIndexOpts* opts; // SIndex* index; }; From f652f71d683dbe0cf39c9a5f8c99427f37b1afb3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 06:37:57 +0000 Subject: [PATCH 04/34] factor code --- include/libs/stream/tstream.h | 4 +++- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 3 +++ source/libs/stream/src/streamBackendRocksdb.c | 1 - source/libs/stream/src/streamMeta.c | 14 +++++++++----- source/libs/stream/test/tstreamUpdateTest.cpp | 17 +++++++---------- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 95b2f94f3f..8070067964 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -341,7 +341,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; - SArray* pTaskList; // SArray + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; @@ -569,6 +569,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); +void streamMetaInit(); +void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index d884120147..544512233e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -18,6 +18,7 @@ #include "dmNodes.h" #include "index.h" #include "qworker.h" +#include "tstream.h" static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) { } indexInit(tsNumOfCommitThreads); + streamMetaInit(); dmReportStartup("dnode-transport", "initialized"); dDebug("dnode is created, ptr:%p", pDnode); @@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupServer(pDnode); dmClearVars(pDnode); rpcCleanup(); + streamMetaCleanup(); indexCleanup(); taosConvDestroy(); dDebug("dnode is closed, ptr:%p", pDnode); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e1c030b5c5..eaf3175bfa 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -846,7 +846,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); - // return -1; } } pState->pTdbState->rocksdb = handle->db; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 49710b0934..8167a12f6d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -19,6 +19,13 @@ #include "tref.h" #include "ttimer.h" +static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; +static int32_t streamBackendId = 0; +static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } + +void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaCleanup() { taosCloseRef(streamBackendId); } + SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -85,8 +92,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackend = streamBackendInit(statePath); - pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup); - pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend); + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosMemoryFree(statePath); @@ -129,9 +135,7 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); - taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid); - // streamBackendCleanup(pMeta->streamBackend); - taosCloseRef(pMeta->streamBackendId); + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 4b3f76276c..18c60aff28 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -1,5 +1,7 @@ #include +#include "streamBackendRocksdb.h" +#include "tstream.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -9,21 +11,16 @@ using namespace std; class StreamStateEnv : public ::testing::Test { protected: virtual void SetUp() { - // initLog(); - // taosRemoveDir(path); - // SIndexOpts opts; - // opts.cacheSize = 1024 * 1024 * 4; - // int ret = indexOpen(&opts, path, &index); - // assert(ret == 0); + streamMetaInit(); + backend = streamBackendInit(path); } virtual void TearDown() { + streamMetaCleanup(); // indexClose(index); } const char *path = TD_TMP_DIR_PATH "stream"; - - // SIndexOpts* opts; - // SIndex* index; + void *backend; }; bool equalSBF(SScalableBf *left, SScalableBf *right) { @@ -212,7 +209,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) { // updateInfoDestroy(pSU7); } // TEST() -TEST_F(StreamStateEnv, test1) {} +TEST(StreamStateEnv, test1) {} // int main(int argc, char *argv[]) { // testing::InitGoogleTest(&argc, argv); // return RUN_ALL_TESTS(); From e6474caddf82a2bd3adf8b594005f2f2760b273b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 08:51:41 +0000 Subject: [PATCH 05/34] factor code --- source/libs/stream/src/streamBackendRocksdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index eaf3175bfa..46df662836 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -15,6 +15,7 @@ // #include "streamStateRocksdb.h" #include "streamBackendRocksdb.h" +#include "executor.h" #include "tcommon.h" typedef struct SCompactFilteFactory { From 0710fc39afe1fa8c691814b5ec2da139cc8091e3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 09:04:14 +0000 Subject: [PATCH 06/34] factor code --- source/libs/stream/src/streamBackendRocksdb.c | 1 - source/libs/stream/src/streamMeta.c | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 46df662836..93870006b3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -// #include "streamStateRocksdb.h" #include "streamBackendRocksdb.h" #include "executor.h" #include "tcommon.h" diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8167a12f6d..60d505b305 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -81,6 +81,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; + pMeta->streamBackendId = streamBackendId; char* statePath = taosMemoryCalloc(1, len); sprintf(statePath, "%s/%s", pMeta->path, "state"); From 04c7c99a67871d297088e053f799855db30a7bc3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 09:17:05 +0000 Subject: [PATCH 07/34] factor code --- source/libs/stream/inc/streamBackendRocksdb.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 5d2970a4b7..8ada79d971 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -16,8 +16,6 @@ #ifndef _STREAM_BACKEDN_ROCKSDB_H_ #define _STREAM_BACKEDN_ROCKSDB_H_ -#include "executor.h" - #include "rocksdb/c.h" // #include "streamInc.h" #include "streamState.h" From 59ff6bd3015584bef1b61426d2f7707b39ae2206 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 09:32:44 +0000 Subject: [PATCH 08/34] factor code --- source/libs/stream/src/streamBackendRocksdb.c | 1 + source/libs/stream/src/tstreamFileState.c | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 93870006b3..d8524dc5d9 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -15,6 +15,7 @@ #include "streamBackendRocksdb.h" #include "executor.h" +#include "query.h" #include "tcommon.h" typedef struct SCompactFilteFactory { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b7401ec5d9..ac2b869af3 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -15,6 +15,7 @@ #include "tstreamFileState.h" +#include "query.h" #include "streamBackendRocksdb.h" #include "taos.h" #include "tcommon.h" @@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) { clearExpiredRowBuff(pFileState, 0, true); } -bool needClearDiskBuff(SStreamFileState* pFileState) { - return pFileState->flushMark > 0; -} +bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; } void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; From 4715aacd41f20a63b5b7007520b144bc2cf9de4e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 09:44:40 +0000 Subject: [PATCH 09/34] factor code --- include/libs/stream/tstream.h | 1 - source/libs/stream/inc/streamInc.h | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8070067964..10d67e89b3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "executor.h" #include "os.h" #include "query.h" #include "streamState.h" diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 71fbe5e086..b17afeec98 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -16,7 +16,7 @@ #ifndef _STREAM_INC_H_ #define _STREAM_INC_H_ -//#include "executor.h" +#include "executor.h" #include "tstream.h" #ifdef __cplusplus From afaee9c3868dde52e6d772ac3b730e5dd3e235e9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 09:51:01 +0000 Subject: [PATCH 10/34] factor code --- include/libs/stream/tstream.h | 1 - source/libs/stream/inc/streamInc.h | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 10d67e89b3..8ab70f3866 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -14,7 +14,6 @@ */ #include "os.h" -#include "query.h" #include "streamState.h" #include "tdatablock.h" #include "tdbInt.h" diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index b17afeec98..90841a329c 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -17,6 +17,7 @@ #define _STREAM_INC_H_ #include "executor.h" +#include "query.h" #include "tstream.h" #ifdef __cplusplus From 4e8f5f5d32cb0a10a1b7f4dd70cadedd4948fdcd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 09:58:53 +0000 Subject: [PATCH 11/34] factor code --- include/libs/stream/tstream.h | 1 - source/libs/stream/inc/streamInc.h | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8ab70f3866..c5a7847803 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -20,7 +20,6 @@ #include "tmsg.h" #include "tmsgcb.h" #include "tqueue.h" -#include "trpc.h" #ifdef __cplusplus extern "C" { diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 90841a329c..c471bc2bd8 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -20,6 +20,8 @@ #include "query.h" #include "tstream.h" +#include "trpc.h" + #ifdef __cplusplus extern "C" { #endif From b91981af5951896da07afd59b2204b546bf56cec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 10:09:09 +0000 Subject: [PATCH 12/34] factor code --- include/libs/stream/streamState.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 63e9e3799a..5b125b42d4 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -20,13 +20,13 @@ #include "tsimplehash.h" #include "tstreamFileState.h" +#ifndef _STREAM_STATE_H_ +#define _STREAM_STATE_H_ + #ifdef __cplusplus extern "C" { #endif -#ifndef _STREAM_STATE_H_ -#define _STREAM_STATE_H_ - // void* streamBackendInit(const char* path); // void streamBackendCleanup(void* arg); // SListNode* streamBackendAddCompare(void* backend, void* arg); From 6cf1adf1b1d417f55ccfe2afba22d25c8dc1dc50 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 13:01:45 +0000 Subject: [PATCH 13/34] factor code --- source/libs/stream/src/streamMeta.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 60d505b305..a8af310bad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -39,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = taosStrdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { - taosMemoryFree(streamPath); goto _err; } + memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(streamPath); goto _err; } - taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; @@ -83,24 +81,24 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->streamBackendId = streamBackendId; - char* statePath = taosMemoryCalloc(1, len); - sprintf(statePath, "%s/%s", pMeta->path, "state"); - code = taosMulModeMkDir(statePath, 0755); + memset(streamPath, 0, len); + sprintf(streamPath, "%s/%s", pMeta->path, "state"); + code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(streamPath); goto _err; } - pMeta->streamBackend = streamBackendInit(statePath); + pMeta->streamBackend = streamBackendInit(streamPath); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - taosMemoryFree(statePath); + taosMemoryFree(streamPath); taosInitRWLatch(&pMeta->lock); return pMeta; _err: + taosMemoryFree(streamPath); taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -266,13 +264,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; - - // taosWLockLatch(&pMeta->lock); - taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - // atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); int32_t num = taosArrayGetSize(pMeta->pTaskList); From eb3ab4fcea87477deb3dfe1b5eb4a387e20479ee Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 13:34:17 +0000 Subject: [PATCH 14/34] factor code --- source/libs/stream/inc/streamBackendRocksdb.h | 10 +--------- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- source/libs/stream/src/streamState.c | 8 ++++---- source/libs/stream/src/tstreamFileState.c | 6 +++--- 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 8ada79d971..0f39cf817b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -110,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); -void* streamStateCreateBatch(); -int32_t streamStateGetBatchSize(void* pBatch); -void streamStateClearBatch(void* pBatch); -void streamStateDestroyBatch(void* pBatch); -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); -int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); - // default cf int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); @@ -136,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch); void streamStateClearBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch); int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); + void* val, int32_t vlen, int64_t ttl); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d8524dc5d9..e34fc69de0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1979,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) { void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen) { + void* val, int32_t vlen, int64_t ttl) { int i = streamGetInit(cfName); if (i < 0) { @@ -1990,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr int32_t klen = ginitDict[i].enFunc((void*)key, buf); char* ttlV = NULL; - int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fee2f2ce58..db530aa5a2 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -119,7 +119,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int pState->taskId = pTask->id.taskId; pState->streamId = pTask->id.streamId; #ifdef USE_ROCKSDB - qWarn("open stream state1"); + // qWarn("open stream state1"); taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); if (code == -1) { @@ -127,7 +127,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int taosMemoryFree(pState); pState = NULL; } - qWarn("open stream state2, %s", statePath); + // qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); @@ -225,6 +225,7 @@ void streamStateClose(SStreamState* pState, bool remove) { #ifdef USE_ROCKSDB // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); + taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -236,7 +237,6 @@ void streamStateClose(SStreamState* pState, bool remove) { tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); #endif - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); } int32_t streamStateBegin(SStreamState* pState) { @@ -404,7 +404,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo int32_t code = 0; void* batch = streamStateCreateBatch(); - code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); + code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0); if (code != 0) { return code; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ac2b869af3..133b72bbe7 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; - code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); + code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } if (streamStateGetBatchSize(batch) > 0) { @@ -371,7 +371,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); taosMemoryFree(valBuf); } { @@ -380,7 +380,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; memcpy(keyBuf, taskKey, strlen(taskKey)); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); } streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } From cfc6bd9f9ff7f1f8f5faa342fa2b4d0e22bfeaf5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 12 May 2023 02:10:40 +0000 Subject: [PATCH 15/34] factor code --- source/libs/stream/src/streamBackendRocksdb.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e34fc69de0..1cb2311944 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -111,6 +111,9 @@ void* streamBackendInit(const char* path) { taosMemoryFreeClear(err); } } else { + /* + list all cf and get prefix + */ int64_t streamId; int32_t taskId, dummpy = 0; SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -650,10 +653,7 @@ const char* compactFilteFactoryName(void* arg) { void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { - if (streamStateValueIsStale((char*)val)) { - return 1; - } - return 0; + return streamStateValueIsStale((char*)val) ? 1 : 0 } const char* compactFilteName(void* arg) { return "stream_filte"; } From 2b9f9add3a9337126a87f529b3c63579a6179b62 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 13 May 2023 20:10:02 +0800 Subject: [PATCH 16/34] feat: create dnode limited by grant of cpu cores --- source/dnode/mnode/impl/src/mndDnode.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 002407ce8a..d4cbcaaacd 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -751,7 +751,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { SDnodeObj *pDnode = NULL; SCreateDnodeReq createReq = {0}; - if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0) { + if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0 || (terrno = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) { code = terrno; goto _OVER; } From 41ebd07472595c36abc9bd932dda2d42e004ae06 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 13 May 2023 23:59:57 +0000 Subject: [PATCH 17/34] factor code --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1cb2311944..16ba81c74a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -653,7 +653,7 @@ const char* compactFilteFactoryName(void* arg) { void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { - return streamStateValueIsStale((char*)val) ? 1 : 0 + return streamStateValueIsStale((char*)val) ? 1 : 0; } const char* compactFilteName(void* arg) { return "stream_filte"; } From 7fff41c9c98943cf64cf43cfe78fdf404fd3bab0 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Mon, 15 May 2023 10:58:33 +0800 Subject: [PATCH 18/34] enh(taosAdapter): update taosAdapter --- cmake/taosadapter_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 4a8f4864b3..c67918351d 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG ae8d51c + GIT_TAG 565ca21 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE From 42562d388fa04d5737ae56b4907aa9c80380966a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 15 May 2023 05:29:56 +0000 Subject: [PATCH 19/34] refactor code --- source/libs/stream/src/streamExec.c | 16 ++++++++-------- source/libs/stream/src/tstreamFileState.c | 4 +++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0fb78fb589..9dd59ec112 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,12 +20,12 @@ #define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { - int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } bool streamTaskShouldPause(const SStreamStatus* pStatus) { - int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__PAUSE); } @@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, - pSubmit->submit.msgLen, pSubmit->submit.ver); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; SArray* pBlockList = pBlock->blocks; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; @@ -202,7 +202,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->blocks = pRes; code = streamTaskOutput(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosFreeQitem(pRes); + taosFreeQitem(qRes); return code; } @@ -332,12 +332,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated + if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); - pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; + pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 133b72bbe7..ac1cce5ba3 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -439,7 +439,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; - deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + deleteExpiredCheckPoint(pFileState, mark); void* pStVal = NULL; int32_t len = 0; From 4015f544dbf78bc66d08f7c69b3a5ae90a43efa8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 15 May 2023 05:45:17 +0000 Subject: [PATCH 20/34] refactor code --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9dd59ec112..5f9aec4db0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -202,6 +202,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->blocks = pRes; code = streamTaskOutput(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes); return code; } From a001e3c424ea2867c30bce074e88c3c5dd08f909 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 15 May 2023 07:22:29 +0000 Subject: [PATCH 21/34] refactor code --- source/libs/stream/src/tstreamFileState.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ac1cce5ba3..67835e77b8 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -324,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; } SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { - clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + clearExpiredRowBuff(pFileState, mark, false); return pFileState->usedBuffs; } From 5583f69109307b3036244085c841a46a4f943fa1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 15 May 2023 08:48:49 +0000 Subject: [PATCH 22/34] refactor code --- cmake/cmake.define | 12 +++++++++--- contrib/CMakeLists.txt | 1 - 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 1500858d9f..bb0bd6500b 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE OFF) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory @@ -120,8 +120,14 @@ ELSE () SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0") MESSAGE(STATUS "Compile with Address Sanitizer!") ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + MESSAGE(STATUS "XXXXXXXXXXXXXX Clang/AppleClang" ${TD_DARWIN}) + IF (${TD_DARWIN}) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k") + ELSE () + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + ENDIF () ENDIF () # disable all assert diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 536d4eae8e..70389e0732 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -262,7 +262,6 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_TESTS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_TOOLS "" OFF) - option(WITH_LIBURING "" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) add_subdirectory(rocksdb EXCLUDE_FROM_ALL) From b0c1e6f59ba68c28f0d5ecd5de5bf1748866cd2a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 15 May 2023 08:57:35 +0000 Subject: [PATCH 23/34] refactor code --- cmake/cmake.define | 12 +++--------- contrib/CMakeLists.txt | 1 + 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index bb0bd6500b..1500858d9f 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE OFF) +set(CMAKE_VERBOSE_MAKEFILE ON) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory @@ -120,14 +120,8 @@ ELSE () SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0") MESSAGE(STATUS "Compile with Address Sanitizer!") ELSE () - MESSAGE(STATUS "XXXXXXXXXXXXXX Clang/AppleClang" ${TD_DARWIN}) - IF (${TD_DARWIN}) - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k") - ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - ENDIF () + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () # disable all assert diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 70389e0732..536d4eae8e 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -262,6 +262,7 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_TESTS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_TOOLS "" OFF) + option(WITH_LIBURING "" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) add_subdirectory(rocksdb EXCLUDE_FROM_ALL) From 23c1aa2db094765b2118d9cdf43e7f9446868666 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 15 May 2023 17:55:37 +0800 Subject: [PATCH 24/34] fix: fix multithread issue in destroyTimesliceOperator --- source/libs/executor/src/timesliceoperator.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index d56595dae9..c59669fc53 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -894,8 +894,10 @@ void destroyTimeSliceOperatorInfo(void* param) { } taosArrayDestroy(pInfo->pLinearInfo); - taosMemoryFree(pInfo->pPrevGroupKey->pData); - taosMemoryFree(pInfo->pPrevGroupKey); + if (pInfo->pPrevGroupKey) { + taosMemoryFree(pInfo->pPrevGroupKey->pData); + taosMemoryFree(pInfo->pPrevGroupKey); + } cleanupExprSupp(&pInfo->scalarSup); From 941e00541750b4ab286e4ee30538ec5e455061c3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 15 May 2023 09:56:02 +0000 Subject: [PATCH 25/34] refactor code --- cmake/cmake.define | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 1500858d9f..f55a9bdabc 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE OFF) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory From de0cc463e160b0a25d7a3d01093fc1465bdce785 Mon Sep 17 00:00:00 2001 From: meeki007 <5952964+meeki007@users.noreply.github.com> Date: Mon, 15 May 2023 19:21:24 -0400 Subject: [PATCH 26/34] =?UTF-8?q?Correction/=E7=BA=A0=E6=AD=A3=2001-data-t?= =?UTF-8?q?ype.md=20(#21313)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # 10 Change/改变 INT UNSIGNED ---> SMALLINT UNSIGNED --- docs/en/12-taos-sql/01-data-type.md | 36 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/en/12-taos-sql/01-data-type.md b/docs/en/12-taos-sql/01-data-type.md index 204713f971..cca256139d 100644 --- a/docs/en/12-taos-sql/01-data-type.md +++ b/docs/en/12-taos-sql/01-data-type.md @@ -24,24 +24,24 @@ CREATE DATABASE db_name PRECISION 'ns'; In TDengine, the data types below can be used when specifying a column or tag. -| # | **type** | **Bytes** | **Description** | -| --- | :--------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. | -| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. | -| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. | -| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. | -| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. | -| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. | -| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. | -| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. | -| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. | -| 10 | INT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. | -| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. | -| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. | -| 13 | BOOL | 1 | Bool, the value range is {true, false}. | -| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. | -| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. | -| 16 | VARCHAR | User-defined | Alias of BINARY | +| # | **type** | **Bytes** | **Description** | +| --- | :---------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. | +| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. | +| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. | +| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. | +| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. | +| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. | +| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. | +| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. | +| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. | +| 10 | SMALLINT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. | +| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. | +| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. | +| 13 | BOOL | 1 | Bool, the value range is {true, false}. | +| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. | +| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. | +| 16 | VARCHAR | User-defined | Alias of BINARY | :::note From 33c48fe981a2c6bfd326aa24005ce9576849d0ee Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 16 May 2023 08:57:57 +0800 Subject: [PATCH 27/34] fix: client hb logic fix and optimization --- source/client/src/clientHb.c | 89 +++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 37 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 10c42bb67d..dad6627c87 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -24,6 +24,8 @@ typedef struct { struct { int64_t clusterId; int32_t passKeyCnt; + int32_t passVer; + int32_t reqCnt; }; }; } SHbParam; @@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } -static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { +static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); if (!pTscObj) { tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); return TSDB_CODE_APP_ERROR; } - int32_t code = 0; + int32_t code = 0; + + if ((param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { + tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver); + goto _return; + } + SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); if (!user) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -570,6 +578,9 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { goto _return; } + // assign the passVer + param->passVer = pTscObj->passInfo.ver; + _return: releaseTscObj(connKey->tscRid); if (code) { @@ -714,13 +725,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { } int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { - SHbParam *hbParam = (SHbParam *)param; - struct SCatalog *pCatalog = NULL; + int32_t code = 0; + SHbParam *hbParam = (SHbParam *)param; + SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); - return code; + if (hbParam->reqCnt == 0) { + code = catalogGetHandle(hbParam->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } } hbGetAppInfo(hbParam->clusterId, req); @@ -728,23 +742,27 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); if (hbParam->passKeyCnt > 0) { - hbGetUserBasicInfo(connKey, req); + hbGetUserBasicInfo(connKey, hbParam, req); } - code = hbGetExpiredUserInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; + if (hbParam->reqCnt == 0) { + code = hbGetExpiredUserInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = hbGetExpiredDBInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = hbGetExpiredStbInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } - code = hbGetExpiredDBInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - code = hbGetExpiredStbInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + ++hbParam->reqCnt; // success to get catalog info return TSDB_CODE_SUCCESS; } @@ -766,6 +784,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { } int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + if (!pBatchReq->reqs) { + tFreeClientHbBatchReq(pBatchReq); + return NULL; + } int64_t rid = -1; int32_t code = 0; @@ -782,12 +804,18 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return NULL; } + SHbParam param = {0}; + while (pIter != NULL) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - SHbParam param; + switch (pOneReq->connKey.connType) { case CONN_TYPE__QUERY: { - param.clusterId = pOneReq->clusterId; + if (param.clusterId == 0) { + // init + param.clusterId = pOneReq->clusterId; + param.passVer = INT32_MIN; + } param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); break; } @@ -801,9 +829,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pOneReq->connKey.tscRid, pOneReq->connKey.connType); } } - break; -#if 0 if (code) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; @@ -812,7 +838,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; -#endif } releaseTscObj(rid); @@ -885,7 +910,6 @@ static void *hbThreadFunc(void *param) { hbGatherAppInfo(); } - SArray *mgr = taosArrayInit(sz, sizeof(void *)); for (int i = 0; i < sz; i++) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) { @@ -894,7 +918,6 @@ static void *hbThreadFunc(void *param) { int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); if (connCnt == 0) { - taosArrayPush(mgr, &pAppHbMgr); continue; } SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); @@ -908,7 +931,6 @@ static void *hbThreadFunc(void *param) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); - taosArrayPush(mgr, &pAppHbMgr); break; } @@ -920,7 +942,6 @@ static void *hbThreadFunc(void *param) { tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); taosMemoryFree(buf); - taosArrayPush(mgr, &pAppHbMgr); break; } pInfo->fp = hbAsyncCallBack; @@ -941,12 +962,8 @@ static void *hbThreadFunc(void *param) { // hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); - taosArrayPush(mgr, &pAppHbMgr); } - taosArrayDestroy(clientHbMgr.appHbMgrs); - clientHbMgr.appHbMgrs = mgr; - taosThreadMutexUnlock(&clientHbMgr.lock); taosMsleep(HEARTBEAT_INTERVAL); @@ -1179,6 +1196,4 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner -void taos_set_hb_quit(int8_t quitByKill) { - clientHbMgr.quitByKill = quitByKill; -} +void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; } From 573a86ed99669f04d1a87d52e3a573a4eefd896a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 09:51:00 +0800 Subject: [PATCH 28/34] feature(tmq): add new API to extract offset from result set. --- include/client/taos.h | 1 + source/client/src/clientTmq.c | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/include/client/taos.h b/include/client/taos.h index d9fd1ca1b8..8811c4ab64 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); +DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); /* ------------------------------ TAOSX -----------------------------------*/ // note: following apis are unstable diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 87aee4a8a3..63e8b3097c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { } } +int64_t tmq_get_vgroup_offset(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*) res; + STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset; + if (pOffset->type == TMQ_OFFSET__LOG) { + return pRspObj->rsp.rspOffset.version; + } + } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res; + if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) { + return pRspObj->metaRsp.rspOffset.version; + } + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res; + if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) { + return pRspObj->rsp.rspOffset.version; + } + } + + // data from tsdb, no valid offset info + return -1; +} + const char* tmq_get_table_name(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; From b125d214f2e80745db4724f870bff39a0a5b146c Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 16 May 2023 09:58:53 +0800 Subject: [PATCH 29/34] chore: code optimization --- source/client/src/clientHb.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index dad6627c87..0c60592fc5 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -547,7 +547,7 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien int32_t code = 0; - if ((param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { + if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver); goto _return; } @@ -579,7 +579,9 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien } // assign the passVer - param->passVer = pTscObj->passInfo.ver; + if (param) { + param->passVer = pTscObj->passInfo.ver; + } _return: releaseTscObj(connKey->tscRid); @@ -1196,4 +1198,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner -void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; } +void taos_set_hb_quit(int8_t quitByKill) { + clientHbMgr.quitByKill = quitByKill; +} From dc67223deb2144a51929f553dee42b1c2dc6c43e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 16 May 2023 10:47:49 +0800 Subject: [PATCH 30/34] fix: file fd not clear issue --- source/client/src/clientEnv.c | 2 ++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 2 ++ 2 files changed, 4 insertions(+) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c8f3feb2d4..cae5c8715d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) { tscError("failed to send crash report"); if (pFile) { taosReleaseCrashLogFile(pFile, false); + pFile = NULL; continue; } } else { @@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) { if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); + pFile = NULL; truncateFile = false; } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 06b6221940..89c394fdd0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) { dError("failed to send crash report"); if (pFile) { taosReleaseCrashLogFile(pFile, false); + pFile = NULL; continue; } } else { @@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) { if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); + pFile = NULL; truncateFile = false; } From 3c81137ffe2303add9cdfb84683b59d6d5e5903d Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 16 May 2023 11:08:58 +0800 Subject: [PATCH 31/34] pause source task --- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- source/libs/stream/src/streamExec.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index df7955771d..68b8dd7201 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pTrans, pTask) < 0) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } } @@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5f9aec4db0..f4d8522f31 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, atomic_load_8(&pTask->status.taskStatus)); taosMsleep(2); @@ -408,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } } From 24acee6e3f2a6b92379f5d4772b4db110b2daa09 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 16 May 2023 14:45:06 +0800 Subject: [PATCH 32/34] chore: restore the test cases --- tests/script/tsim/alter/table.sim | 57 +++++++++++------------ tests/script/tsim/parser/alter_column.sim | 2 +- 2 files changed, 28 insertions(+), 31 deletions(-) diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index db2a22205f..0cf291523a 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,36 +657,33 @@ if $data20 != null then return -1 endi -#print =============== error for normal table -#sql create table tb2023(ts timestamp, f int); -#sql_error alter table tb2023 add column v varchar(65535); -#sql_error alter table tb2023 add column v varchar(65535); -#sql_error alter table tb2023 add column v varchar(65530); -#sql alter table tb2023 add column v varchar(16374); -#sql_error alter table tb2023 modify column v varchar(65536); -#sql desc tb2023 -#sql alter table tb2023 drop column v -#sql_error alter table tb2023 add column v nchar(16384); -#sql alter table tb2023 add column v nchar(4093); -#sql_error alter table tb2023 modify column v nchar(16384); -#sql_error alter table tb2023 add column v nchar(16384); -#sql alter table tb2023 drop column v -#sql alter table tb2023 add column v nchar(16374); -#sql desc tb2023 -# -#print =============== error for super table -#sql create table stb2023(ts timestamp, f int) tags(t1 int); -#sql_error alter table stb2023 add column v varchar(65535); -#sql_error alter table stb2023 add column v varchar(65536); -#sql_error alter table stb2023 add column v varchar(33100); -#sql alter table stb2023 add column v varchar(16374); -#sql_error alter table stb2023 modify column v varchar(16375); -#sql desc stb2023 -#sql alter table stb2023 drop column v -#sql_error alter table stb2023 add column v nchar(4094); -#sql alter table stb2023 add column v nchar(4093); -#sql_error alter table stb2023 modify column v nchar(4094); -#sql desc stb2023 +print =============== error for normal table +sql create table tb2023(ts timestamp, f int); +sql_error alter table tb2023 add column v varchar(65518); +sql_error alter table tb2023 add column v varchar(65531); +sql_error alter table tb2023 add column v varchar(65535); +sql alter table tb2023 add column v varchar(65517); +sql_error alter table tb2023 modify column v varchar(65518); +sql desc tb2023 +sql alter table tb2023 drop column v +sql_error alter table tb2023 add column v nchar(16380); +sql alter table tb2023 add column v nchar(16379); +sql_error alter table tb2023 modify column v nchar(16380); +sql desc tb2023 + +print =============== error for super table +sql create table stb2023(ts timestamp, f int) tags(t1 int); +sql_error alter table stb2023 add column v varchar(65518); +sql_error alter table stb2023 add column v varchar(65531); +sql_error alter table stb2023 add column v varchar(65535); +sql alter table stb2023 add column v varchar(65517); +sql_error alter table stb2023 modify column v varchar(65518); +sql desc stb2023 +sql alter table stb2023 drop column v +sql_error alter table stb2023 add column v nchar(16380); +sql alter table stb2023 add column v nchar(16379); +sql_error alter table stb2023 modify column v nchar(16380); +sql desc stb2023 print ======= over sql drop database d1 diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index f892115735..2bf369b910 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql_error alter table tb modify column c2 binary(65600); +sql_error alter table tb modify column c2 binary(65436); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10); From ae3aa6535fb5ee5aef2bd7652f171f6ede1b14d8 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 17 May 2023 10:17:30 +0800 Subject: [PATCH 33/34] fix: client HB logic --- source/client/src/clientHb.c | 43 ++++++++++++------------------------ 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0c60592fc5..7c05b2f50c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -791,27 +791,20 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return NULL; } - int64_t rid = -1; - int32_t code = 0; - - void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); - - SClientHbReq *pOneReq = pIter; - SClientHbKey *connKey = pOneReq ? &pOneReq->connKey : NULL; - if (connKey != NULL) rid = connKey->tscRid; - - STscObj *pTscObj = (STscObj *)acquireTscObj(rid); - if (pTscObj == NULL) { - tFreeClientHbBatchReq(pBatchReq); - return NULL; - } - + void *pIter = NULL; SHbParam param = {0}; + while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) { + SClientHbReq *pOneReq = pIter; + SClientHbKey *connKey = &pOneReq->connKey; + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + + if (!pTscObj) { + continue; + } - while (pIter != NULL) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - switch (pOneReq->connKey.connType) { + switch (connKey->connType) { case CONN_TYPE__QUERY: { if (param.clusterId == 0) { // init @@ -824,24 +817,16 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { default: break; } - if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) { - code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq); + if (clientHbMgr.reqHandle[connKey->connType]) { + int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, ¶m, pOneReq); if (code) { tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code), - pOneReq->connKey.tscRid, pOneReq->connKey.connType); + connKey->tscRid, connKey->connType); } } - if (code) { - pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); - pOneReq = pIter; - continue; - } - - pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); - pOneReq = pIter; + releaseTscObj(connKey->tscRid); } - releaseTscObj(rid); return pBatchReq; } From d57b426dbd248de78c8c67d03e98a4064a27930f Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 17 May 2023 15:34:12 +0800 Subject: [PATCH 34/34] chore: change rocksdb repo to upstream (#21318) --- cmake/rocksdb_CMakeLists.txt.in | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index 7d9f49d3fa..ba4a404af6 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,8 +1,8 @@ # rocksdb ExternalProject_Add(rocksdb - GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git - GIT_TAG v6.23.3 + GIT_REPOSITORY https://github.com/facebook/rocksdb.git + GIT_TAG v8.1.1 SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" CONFIGURE_COMMAND "" BUILD_COMMAND ""