diff --git a/cmake/cmake.define b/cmake/cmake.define
index 1500858d9f..f55a9bdabc 100644
--- a/cmake/cmake.define
+++ b/cmake/cmake.define
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.0)
-set(CMAKE_VERBOSE_MAKEFILE ON)
+set(CMAKE_VERBOSE_MAKEFILE OFF)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in
index 7d9f49d3fa..ba4a404af6 100644
--- a/cmake/rocksdb_CMakeLists.txt.in
+++ b/cmake/rocksdb_CMakeLists.txt.in
@@ -1,8 +1,8 @@
# rocksdb
ExternalProject_Add(rocksdb
- GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git
- GIT_TAG v6.23.3
+ GIT_REPOSITORY https://github.com/facebook/rocksdb.git
+ GIT_TAG v8.1.1
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in
index 4a8f4864b3..c67918351d 100644
--- a/cmake/taosadapter_CMakeLists.txt.in
+++ b/cmake/taosadapter_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
- GIT_TAG ae8d51c
+ GIT_TAG 565ca21
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/en/12-taos-sql/01-data-type.md b/docs/en/12-taos-sql/01-data-type.md
index 204713f971..cca256139d 100644
--- a/docs/en/12-taos-sql/01-data-type.md
+++ b/docs/en/12-taos-sql/01-data-type.md
@@ -24,24 +24,24 @@ CREATE DATABASE db_name PRECISION 'ns';
In TDengine, the data types below can be used when specifying a column or tag.
-| # | **type** | **Bytes** | **Description** |
-| --- | :--------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. |
-| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. |
-| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. |
-| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. |
-| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. |
-| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. |
-| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. |
-| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. |
-| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. |
-| 10 | INT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. |
-| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. |
-| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. |
-| 13 | BOOL | 1 | Bool, the value range is {true, false}. |
-| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
-| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
-| 16 | VARCHAR | User-defined | Alias of BINARY |
+| # | **type** | **Bytes** | **Description** |
+| --- | :---------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. |
+| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. |
+| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. |
+| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. |
+| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. |
+| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. |
+| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. |
+| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. |
+| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. |
+| 10 | SMALLINT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. |
+| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. |
+| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. |
+| 13 | BOOL | 1 | Bool, the value range is {true, false}. |
+| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
+| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
+| 16 | VARCHAR | User-defined | Alias of BINARY |
:::note
diff --git a/include/client/taos.h b/include/client/taos.h
index d9fd1ca1b8..8811c4ab64 100644
--- a/include/client/taos.h
+++ b/include/client/taos.h
@@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
+DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index 63e9e3799a..5b125b42d4 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -20,13 +20,13 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
+#ifndef _STREAM_STATE_H_
+#define _STREAM_STATE_H_
+
#ifdef __cplusplus
extern "C" {
#endif
-#ifndef _STREAM_STATE_H_
-#define _STREAM_STATE_H_
-
// void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 865977d62b..c7e55650cd 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,16 +13,13 @@
* along with this program. If not, see .
*/
-#include "executor.h"
#include "os.h"
-#include "query.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
-#include "trpc.h"
#ifdef __cplusplus
extern "C" {
@@ -340,7 +337,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
- SArray* pTaskList; // SArray
+ SArray* pTaskList; // SArray
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
+void streamMetaInit();
+void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index c8f3feb2d4..cae5c8715d 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) {
tscError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
+ pFile = NULL;
continue;
}
} else {
@@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) {
if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile);
+ pFile = NULL;
truncateFile = false;
}
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 10c42bb67d..7c05b2f50c 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -24,6 +24,8 @@ typedef struct {
struct {
int64_t clusterId;
int32_t passKeyCnt;
+ int32_t passVer;
+ int32_t reqCnt;
};
};
} SHbParam;
@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS;
}
-static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
+static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
}
- int32_t code = 0;
+ int32_t code = 0;
+
+ if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
+ tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
+ goto _return;
+ }
+
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY;
@@ -570,6 +578,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
goto _return;
}
+ // assign the passVer
+ if (param) {
+ param->passVer = pTscObj->passInfo.ver;
+ }
+
_return:
releaseTscObj(connKey->tscRid);
if (code) {
@@ -714,13 +727,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
- SHbParam *hbParam = (SHbParam *)param;
- struct SCatalog *pCatalog = NULL;
+ int32_t code = 0;
+ SHbParam *hbParam = (SHbParam *)param;
+ SCatalog *pCatalog = NULL;
- int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog);
- if (code != TSDB_CODE_SUCCESS) {
- tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
- return code;
+ if (hbParam->reqCnt == 0) {
+ code = catalogGetHandle(hbParam->clusterId, &pCatalog);
+ if (code != TSDB_CODE_SUCCESS) {
+ tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
+ return code;
+ }
}
hbGetAppInfo(hbParam->clusterId, req);
@@ -728,23 +744,27 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) {
- hbGetUserBasicInfo(connKey, req);
+ hbGetUserBasicInfo(connKey, hbParam, req);
}
- code = hbGetExpiredUserInfo(connKey, pCatalog, req);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
+ if (hbParam->reqCnt == 0) {
+ code = hbGetExpiredUserInfo(connKey, pCatalog, req);
+ if (TSDB_CODE_SUCCESS != code) {
+ return code;
+ }
+
+ code = hbGetExpiredDBInfo(connKey, pCatalog, req);
+ if (TSDB_CODE_SUCCESS != code) {
+ return code;
+ }
+
+ code = hbGetExpiredStbInfo(connKey, pCatalog, req);
+ if (TSDB_CODE_SUCCESS != code) {
+ return code;
+ }
}
- code = hbGetExpiredDBInfo(connKey, pCatalog, req);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
- }
-
- code = hbGetExpiredStbInfo(connKey, pCatalog, req);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
- }
+ ++hbParam->reqCnt; // success to get catalog info
return TSDB_CODE_SUCCESS;
}
@@ -766,55 +786,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
}
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
-
- int64_t rid = -1;
- int32_t code = 0;
-
- void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
-
- SClientHbReq *pOneReq = pIter;
- SClientHbKey *connKey = pOneReq ? &pOneReq->connKey : NULL;
- if (connKey != NULL) rid = connKey->tscRid;
-
- STscObj *pTscObj = (STscObj *)acquireTscObj(rid);
- if (pTscObj == NULL) {
+ if (!pBatchReq->reqs) {
tFreeClientHbBatchReq(pBatchReq);
return NULL;
}
- while (pIter != NULL) {
+ void *pIter = NULL;
+ SHbParam param = {0};
+ while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
+ SClientHbReq *pOneReq = pIter;
+ SClientHbKey *connKey = &pOneReq->connKey;
+ STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
+
+ if (!pTscObj) {
+ continue;
+ }
+
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
- SHbParam param;
- switch (pOneReq->connKey.connType) {
+
+ switch (connKey->connType) {
case CONN_TYPE__QUERY: {
- param.clusterId = pOneReq->clusterId;
+ if (param.clusterId == 0) {
+ // init
+ param.clusterId = pOneReq->clusterId;
+ param.passVer = INT32_MIN;
+ }
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break;
}
default:
break;
}
- if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) {
- code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq);
+ if (clientHbMgr.reqHandle[connKey->connType]) {
+ int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, ¶m, pOneReq);
if (code) {
tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
- pOneReq->connKey.tscRid, pOneReq->connKey.connType);
+ connKey->tscRid, connKey->connType);
}
}
- break;
-#if 0
- if (code) {
- pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
- pOneReq = pIter;
- continue;
- }
-
- pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
- pOneReq = pIter;
-#endif
+ releaseTscObj(connKey->tscRid);
}
- releaseTscObj(rid);
return pBatchReq;
}
@@ -885,7 +897,6 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo();
}
- SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) {
@@ -894,7 +905,6 @@ static void *hbThreadFunc(void *param) {
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) {
- taosArrayPush(mgr, &pAppHbMgr);
continue;
}
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
@@ -908,7 +918,6 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
- taosArrayPush(mgr, &pAppHbMgr);
break;
}
@@ -920,7 +929,6 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf);
- taosArrayPush(mgr, &pAppHbMgr);
break;
}
pInfo->fp = hbAsyncCallBack;
@@ -941,12 +949,8 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
- taosArrayPush(mgr, &pAppHbMgr);
}
- taosArrayDestroy(clientHbMgr.appHbMgrs);
- clientHbMgr.appHbMgrs = mgr;
-
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL);
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index 87aee4a8a3..63e8b3097c 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
}
}
+int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
+ if (TD_RES_TMQ(res)) {
+ SMqRspObj* pRspObj = (SMqRspObj*) res;
+ STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
+ if (pOffset->type == TMQ_OFFSET__LOG) {
+ return pRspObj->rsp.rspOffset.version;
+ }
+ } else if (TD_RES_TMQ_META(res)) {
+ SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
+ if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
+ return pRspObj->metaRsp.rspOffset.version;
+ }
+ } else if (TD_RES_TMQ_METADATA(res)) {
+ SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
+ if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
+ return pRspObj->rsp.rspOffset.version;
+ }
+ }
+
+ // data from tsdb, no valid offset info
+ return -1;
+}
+
const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
index 06b6221940..89c394fdd0 100644
--- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
@@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) {
dError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
+ pFile = NULL;
continue;
}
} else {
@@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) {
if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile);
+ pFile = NULL;
truncateFile = false;
}
diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
index d884120147..544512233e 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
@@ -18,6 +18,7 @@
#include "dmNodes.h"
#include "index.h"
#include "qworker.h"
+#include "tstream.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
}
indexInit(tsNumOfCommitThreads);
+ streamMetaInit();
dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode);
@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer(pDnode);
dmClearVars(pDnode);
rpcCleanup();
+ streamMetaCleanup();
indexCleanup();
taosConvDestroy();
dDebug("dnode is closed, ptr:%p", pDnode);
diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c
index 002407ce8a..d4cbcaaacd 100644
--- a/source/dnode/mnode/impl/src/mndDnode.c
+++ b/source/dnode/mnode/impl/src/mndDnode.c
@@ -751,7 +751,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0};
- if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0) {
+ if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0 || (terrno = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
code = terrno;
goto _OVER;
}
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index df7955771d..68b8dd7201 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
- if (mndPauseStreamTask(pTrans, pTask) < 0) {
+ if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
}
@@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
- if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
+ if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
}
diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c
index d56595dae9..c59669fc53 100644
--- a/source/libs/executor/src/timesliceoperator.c
+++ b/source/libs/executor/src/timesliceoperator.c
@@ -894,8 +894,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
}
taosArrayDestroy(pInfo->pLinearInfo);
- taosMemoryFree(pInfo->pPrevGroupKey->pData);
- taosMemoryFree(pInfo->pPrevGroupKey);
+ if (pInfo->pPrevGroupKey) {
+ taosMemoryFree(pInfo->pPrevGroupKey->pData);
+ taosMemoryFree(pInfo->pPrevGroupKey);
+ }
cleanupExprSupp(&pInfo->scalarSup);
diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h
index 5d2970a4b7..0f39cf817b 100644
--- a/source/libs/stream/inc/streamBackendRocksdb.h
+++ b/source/libs/stream/inc/streamBackendRocksdb.h
@@ -16,8 +16,6 @@
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
-#include "executor.h"
-
#include "rocksdb/c.h"
// #include "streamInc.h"
#include "streamState.h"
@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
-void* streamStateCreateBatch();
-int32_t streamStateGetBatchSize(void* pBatch);
-void streamStateClearBatch(void* pBatch);
-void streamStateDestroyBatch(void* pBatch);
-int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen);
-int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
-
// default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen);
+ void* val, int32_t vlen, int64_t ttl);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h
index 71fbe5e086..c471bc2bd8 100644
--- a/source/libs/stream/inc/streamInc.h
+++ b/source/libs/stream/inc/streamInc.h
@@ -16,9 +16,12 @@
#ifndef _STREAM_INC_H_
#define _STREAM_INC_H_
-//#include "executor.h"
+#include "executor.h"
+#include "query.h"
#include "tstream.h"
+#include "trpc.h"
+
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index db4ec17b19..16ba81c74a 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -13,8 +13,9 @@
* along with this program. If not, see .
*/
-// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h"
+#include "executor.h"
+#include "query.h"
#include "tcommon.h"
typedef struct SCompactFilteFactory {
@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
taosMemoryFreeClear(err);
}
} else {
+ /*
+ list all cf and get prefix
+ */
int64_t streamId;
int32_t taskId, dummpy = 0;
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
- // int64_t unixTime = taosGetTimestampMs();
- if (streamStateValueIsStale((char*)val)) {
- return 1;
- }
- // SStreamValue value;
- // memset(&value, 0, sizeof(value));
- // streamValueDecode(&value, (char*)val);
- // taosMemoryFree(value.data);
- // if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
- // return 1;
- // }
- return 0;
+ return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteName(void* arg) { return "stream_filte"; }
@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy(cfNames[0], "default", strlen("default"));
continue;
}
- qError("cf name %s", idstr);
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
if (i % cfLen == 0) {
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
}
}
- for (int i = 0; i < nSize * cfLen + 1; i++) {
- qError("cf name %s", cfNames[i]);
- }
rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
for (int i = 0; i < nSize * cfLen + 1; i++) {
@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
- // return -1;
}
}
pState->pTdbState->rocksdb = handle->db;
@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
} while (0);
-#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
- do { \
- code = 0; \
- char buf[128] = {0}; \
- char* err = NULL; \
- int i = streamGetInit(funcname); \
- if (i < 0) { \
- qWarn("streamState failed to get cf name: %s", funcname); \
- code = -1; \
- break; \
- } \
- char toString[128] = {0}; \
- if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
- int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
- rocksdb_t* db = pState->pTdbState->rocksdb; \
- rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
- size_t len = 0; \
- char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
- if (val == NULL) { \
- qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
- if (err != NULL) taosMemoryFree(err); \
- code = -1; \
- } else { \
- char * p = NULL, *end = NULL; \
- int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
- if (len < 0) { \
- qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
- code = -1; \
- } else { \
- qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
- } \
- if (pVal != NULL) { \
- *pVal = p; \
- } else { \
- taosMemoryFree(p); \
- } \
- taosMemoryFree(val); \
- if (vLen != NULL) *vLen = len; \
- } \
- if (err != NULL) { \
- taosMemoryFree(err); \
- qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
- code = -1; \
- } else { \
- if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
- } \
+#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
+ do { \
+ code = 0; \
+ char buf[128] = {0}; \
+ char* err = NULL; \
+ int i = streamGetInit(funcname); \
+ if (i < 0) { \
+ qWarn("streamState failed to get cf name: %s", funcname); \
+ code = -1; \
+ break; \
+ } \
+ char toString[128] = {0}; \
+ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
+ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
+ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
+ rocksdb_t* db = pState->pTdbState->rocksdb; \
+ rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
+ size_t len = 0; \
+ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
+ if (val == NULL) { \
+ if (err == NULL) { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
+ funcname); \
+ } else { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
+ err); \
+ taosMemoryFreeClear(err); \
+ } \
+ code = -1; \
+ } else { \
+ char* p = NULL; \
+ int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
+ if (len < 0) { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
+ funcname); \
+ code = -1; \
+ } else { \
+ qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
+ len); \
+ } \
+ taosMemoryFree(val); \
+ if (vLen != NULL) *vLen = len; \
+ } \
+ if (code == 0) \
+ qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if (err != NULL) {
- qWarn(
- "failed to delete range cf(state) err: %s, "
- "start: %s, end:%s",
- err, toStringStart, toStringEnd);
+ qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err);
}
@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1;
}
- size_t tlen;
- char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
+ size_t klen, vlen;
+ char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
winKeyDecode(&winKey, keyStr);
- size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
- char* dst = NULL;
- int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
+ // char* dst = NULL;
+ int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
if (len < 0) {
return -1;
}
-
- if (pVal != NULL) *pVal = (char*)dst;
- if (pVLen != NULL) *pVLen = vlen;
+ if (pVLen != NULL) *pVLen = len;
*pKey = winKey;
return 0;
@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen) {
+ void* val, int32_t vlen, int64_t ttl) {
int i = streamGetInit(cfName);
if (i < 0) {
@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
- int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
+ int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 0fb78fb589..f4d8522f31 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
- int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
+ int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
- int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
+ int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
}
@@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
- if (status != TASK_STATUS__NORMAL) {
+ if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2);
@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
- qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
- pSubmit->submit.msgLen, pSubmit->submit.ver);
+ qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
+ pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
- qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
+ qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->blocks = pRes;
code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
- taosFreeQitem(pRes);
+ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
+ taosFreeQitem(qRes);
return code;
}
@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t ckId = 0;
int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
- if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
+ if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
- pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
+ pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock);
@@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);
- if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
+ if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index de56cf24ca..682ce08c7f 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -19,6 +19,13 @@
#include "tref.h"
#include "ttimer.h"
+static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
+static int32_t streamBackendId = 0;
+static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
+
+void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
+void streamMetaCleanup() { taosCloseRef(streamBackendId); }
+
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
- taosMemoryFree(streamPath);
goto _err;
}
+ memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
- taosMemoryFree(streamPath);
goto _err;
}
- taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
+ pMeta->streamBackendId = streamBackendId;
- char* statePath = taosMemoryCalloc(1, len);
- sprintf(statePath, "%s/%s", pMeta->path, "state");
- code = taosMulModeMkDir(statePath, 0755);
+ memset(streamPath, 0, len);
+ sprintf(streamPath, "%s/%s", pMeta->path, "state");
+ code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
- taosMemoryFree(streamPath);
goto _err;
}
- pMeta->streamBackend = streamBackendInit(statePath);
- pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup);
- pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);
+ pMeta->streamBackend = streamBackendInit(streamPath);
+ pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
- taosMemoryFree(statePath);
+ taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock);
return pMeta;
_err:
+ taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup(pMeta->pTasks);
- taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid);
- // streamBackendCleanup(pMeta->streamBackend);
- taosCloseRef(pMeta->streamBackendId);
+ taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
-
- // taosWLockLatch(&pMeta->lock);
-
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
- //
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index 1cca4d55cf..373cb27941 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState->taskId = pTask->id.taskId;
pState->streamId = pTask->id.streamId;
#ifdef USE_ROCKSDB
- qWarn("open stream state1");
+ // qWarn("open stream state1");
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
if (code == -1) {
@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove);
+ taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose(pState->pTdbState->pParTagDb);
tdbClose(pState->pTdbState->db);
#endif
- taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
}
int32_t streamStateBegin(SStreamState* pState) {
@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t code = 0;
void* batch = streamStateCreateBatch();
- code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
+ code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
if (code != 0) {
return code;
}
diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c
index b7401ec5d9..67835e77b8 100644
--- a/source/libs/stream/src/tstreamFileState.c
+++ b/source/libs/stream/src/tstreamFileState.c
@@ -15,6 +15,7 @@
#include "tstreamFileState.h"
+#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true);
}
-bool needClearDiskBuff(SStreamFileState* pFileState) {
- return pFileState->flushMark > 0;
-}
+bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
- clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
+ int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
+ : pFileState->maxTs - pFileState->deleteMark;
+ clearExpiredRowBuff(pFileState, mark, false);
return pFileState->usedBuffs;
}
@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
- code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize);
+ code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
}
if (streamStateGetBatchSize(batch) > 0) {
@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
- code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
+ code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf);
}
{
@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
- code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
+ code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
}
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
- deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
+ int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
+ : pFileState->maxTs - pFileState->deleteMark;
+ deleteExpiredCheckPoint(pFileState, mark);
void* pStVal = NULL;
int32_t len = 0;
diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt
index 5a97ba45f6..a0c1717690 100644
--- a/source/libs/stream/test/CMakeLists.txt
+++ b/source/libs/stream/test/CMakeLists.txt
@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES(
streamUpdateTest
- PUBLIC os util common gtest stream
+ PUBLIC os util common gtest gtest_main stream
)
TARGET_INCLUDE_DIRECTORIES(
diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp
index c698187874..18c60aff28 100644
--- a/source/libs/stream/test/tstreamUpdateTest.cpp
+++ b/source/libs/stream/test/tstreamUpdateTest.cpp
@@ -1,11 +1,28 @@
#include
+#include "streamBackendRocksdb.h"
+#include "tstream.h"
#include "tstreamUpdate.h"
#include "ttime.h"
using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
+class StreamStateEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ streamMetaInit();
+ backend = streamBackendInit(path);
+ }
+ virtual void TearDown() {
+ streamMetaCleanup();
+ // indexClose(index);
+ }
+
+ const char *path = TD_TMP_DIR_PATH "stream";
+ void *backend;
+};
+
bool equalSBF(SScalableBf *left, SScalableBf *right) {
if (left->growth != right->growth) return false;
if (left->numBits != right->numBits) return false;
@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU7);
}
-
-int main(int argc, char *argv[]) {
- testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-}
\ No newline at end of file
+// TEST()
+TEST(StreamStateEnv, test1) {}
+// int main(int argc, char *argv[]) {
+// testing::InitGoogleTest(&argc, argv);
+// return RUN_ALL_TESTS();
+// }
\ No newline at end of file
diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim
index db2a22205f..0cf291523a 100644
--- a/tests/script/tsim/alter/table.sim
+++ b/tests/script/tsim/alter/table.sim
@@ -657,36 +657,33 @@ if $data20 != null then
return -1
endi
-#print =============== error for normal table
-#sql create table tb2023(ts timestamp, f int);
-#sql_error alter table tb2023 add column v varchar(65535);
-#sql_error alter table tb2023 add column v varchar(65535);
-#sql_error alter table tb2023 add column v varchar(65530);
-#sql alter table tb2023 add column v varchar(16374);
-#sql_error alter table tb2023 modify column v varchar(65536);
-#sql desc tb2023
-#sql alter table tb2023 drop column v
-#sql_error alter table tb2023 add column v nchar(16384);
-#sql alter table tb2023 add column v nchar(4093);
-#sql_error alter table tb2023 modify column v nchar(16384);
-#sql_error alter table tb2023 add column v nchar(16384);
-#sql alter table tb2023 drop column v
-#sql alter table tb2023 add column v nchar(16374);
-#sql desc tb2023
-#
-#print =============== error for super table
-#sql create table stb2023(ts timestamp, f int) tags(t1 int);
-#sql_error alter table stb2023 add column v varchar(65535);
-#sql_error alter table stb2023 add column v varchar(65536);
-#sql_error alter table stb2023 add column v varchar(33100);
-#sql alter table stb2023 add column v varchar(16374);
-#sql_error alter table stb2023 modify column v varchar(16375);
-#sql desc stb2023
-#sql alter table stb2023 drop column v
-#sql_error alter table stb2023 add column v nchar(4094);
-#sql alter table stb2023 add column v nchar(4093);
-#sql_error alter table stb2023 modify column v nchar(4094);
-#sql desc stb2023
+print =============== error for normal table
+sql create table tb2023(ts timestamp, f int);
+sql_error alter table tb2023 add column v varchar(65518);
+sql_error alter table tb2023 add column v varchar(65531);
+sql_error alter table tb2023 add column v varchar(65535);
+sql alter table tb2023 add column v varchar(65517);
+sql_error alter table tb2023 modify column v varchar(65518);
+sql desc tb2023
+sql alter table tb2023 drop column v
+sql_error alter table tb2023 add column v nchar(16380);
+sql alter table tb2023 add column v nchar(16379);
+sql_error alter table tb2023 modify column v nchar(16380);
+sql desc tb2023
+
+print =============== error for super table
+sql create table stb2023(ts timestamp, f int) tags(t1 int);
+sql_error alter table stb2023 add column v varchar(65518);
+sql_error alter table stb2023 add column v varchar(65531);
+sql_error alter table stb2023 add column v varchar(65535);
+sql alter table stb2023 add column v varchar(65517);
+sql_error alter table stb2023 modify column v varchar(65518);
+sql desc stb2023
+sql alter table stb2023 drop column v
+sql_error alter table stb2023 add column v nchar(16380);
+sql alter table stb2023 add column v nchar(16379);
+sql_error alter table stb2023 modify column v nchar(16380);
+sql desc stb2023
print ======= over
sql drop database d1
diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim
index f892115735..2bf369b910 100644
--- a/tests/script/tsim/parser/alter_column.sim
+++ b/tests/script/tsim/parser/alter_column.sim
@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0);
-sql_error alter table tb modify column c2 binary(65600);
+sql_error alter table tb modify column c2 binary(65436);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10);