Merge branch '3.0' into fix/TD-24097
This commit is contained in:
commit
5778fa676f
|
@ -1,6 +1,6 @@
|
|||
cmake_minimum_required(VERSION 3.0)
|
||||
|
||||
set(CMAKE_VERBOSE_MAKEFILE ON)
|
||||
set(CMAKE_VERBOSE_MAKEFILE OFF)
|
||||
set(TD_BUILD_TAOSA_INTERNAL FALSE)
|
||||
|
||||
#set output directory
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
|
||||
# rocksdb
|
||||
ExternalProject_Add(rocksdb
|
||||
GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git
|
||||
GIT_TAG v6.23.3
|
||||
GIT_REPOSITORY https://github.com/facebook/rocksdb.git
|
||||
GIT_TAG v8.1.1
|
||||
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
|
||||
CONFIGURE_COMMAND ""
|
||||
BUILD_COMMAND ""
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
# taosadapter
|
||||
ExternalProject_Add(taosadapter
|
||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||
GIT_TAG ae8d51c
|
||||
GIT_TAG 565ca21
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -24,24 +24,24 @@ CREATE DATABASE db_name PRECISION 'ns';
|
|||
|
||||
In TDengine, the data types below can be used when specifying a column or tag.
|
||||
|
||||
| # | **type** | **Bytes** | **Description** |
|
||||
| --- | :--------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. |
|
||||
| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. |
|
||||
| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. |
|
||||
| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. |
|
||||
| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. |
|
||||
| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. |
|
||||
| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. |
|
||||
| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. |
|
||||
| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. |
|
||||
| 10 | INT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. |
|
||||
| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. |
|
||||
| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. |
|
||||
| 13 | BOOL | 1 | Bool, the value range is {true, false}. |
|
||||
| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
|
||||
| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
|
||||
| 16 | VARCHAR | User-defined | Alias of BINARY |
|
||||
| # | **type** | **Bytes** | **Description** |
|
||||
| --- | :---------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. |
|
||||
| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. |
|
||||
| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. |
|
||||
| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. |
|
||||
| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. |
|
||||
| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. |
|
||||
| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. |
|
||||
| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. |
|
||||
| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. |
|
||||
| 10 | SMALLINT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. |
|
||||
| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. |
|
||||
| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. |
|
||||
| 13 | BOOL | 1 | Bool, the value range is {true, false}. |
|
||||
| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
|
||||
| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
|
||||
| 16 | VARCHAR | User-defined | Alias of BINARY |
|
||||
|
||||
:::note
|
||||
|
||||
|
|
|
@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
|
|||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
|
||||
|
||||
/* ------------------------------ TAOSX -----------------------------------*/
|
||||
// note: following apis are unstable
|
||||
|
|
|
@ -20,13 +20,13 @@
|
|||
#include "tsimplehash.h"
|
||||
#include "tstreamFileState.h"
|
||||
|
||||
#ifndef _STREAM_STATE_H_
|
||||
#define _STREAM_STATE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#ifndef _STREAM_STATE_H_
|
||||
#define _STREAM_STATE_H_
|
||||
|
||||
// void* streamBackendInit(const char* path);
|
||||
// void streamBackendCleanup(void* arg);
|
||||
// SListNode* streamBackendAddCompare(void* backend, void* arg);
|
||||
|
|
|
@ -13,16 +13,13 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "executor.h"
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "streamState.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tdbInt.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -340,7 +337,7 @@ typedef struct SStreamMeta {
|
|||
TTB* pTaskDb;
|
||||
TTB* pCheckpointDb;
|
||||
SHashObj* pTasks;
|
||||
SArray* pTaskList; // SArray<task_id*>
|
||||
SArray* pTaskList; // SArray<task_id*>
|
||||
void* ahandle;
|
||||
TXN* txn;
|
||||
FTaskExpand* expandFunc;
|
||||
|
@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
|
|||
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
|
||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
|
||||
|
||||
void streamMetaInit();
|
||||
void streamMetaCleanup();
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
|
||||
void streamMetaClose(SStreamMeta* streamMeta);
|
||||
|
||||
|
|
|
@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) {
|
|||
tscError("failed to send crash report");
|
||||
if (pFile) {
|
||||
taosReleaseCrashLogFile(pFile, false);
|
||||
pFile = NULL;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
|
@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) {
|
|||
|
||||
if (pFile) {
|
||||
taosReleaseCrashLogFile(pFile, truncateFile);
|
||||
pFile = NULL;
|
||||
truncateFile = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ typedef struct {
|
|||
struct {
|
||||
int64_t clusterId;
|
||||
int32_t passKeyCnt;
|
||||
int32_t passVer;
|
||||
int32_t reqCnt;
|
||||
};
|
||||
};
|
||||
} SHbParam;
|
||||
|
@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
||||
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
|
||||
if (!pTscObj) {
|
||||
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
|
||||
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
|
||||
if (!user) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -570,6 +578,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
// assign the passVer
|
||||
if (param) {
|
||||
param->passVer = pTscObj->passInfo.ver;
|
||||
}
|
||||
|
||||
_return:
|
||||
releaseTscObj(connKey->tscRid);
|
||||
if (code) {
|
||||
|
@ -714,13 +727,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
|
|||
}
|
||||
|
||||
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
|
||||
SHbParam *hbParam = (SHbParam *)param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
int32_t code = 0;
|
||||
SHbParam *hbParam = (SHbParam *)param;
|
||||
SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
|
||||
return code;
|
||||
if (hbParam->reqCnt == 0) {
|
||||
code = catalogGetHandle(hbParam->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
hbGetAppInfo(hbParam->clusterId, req);
|
||||
|
@ -728,23 +744,27 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
|||
hbGetQueryBasicInfo(connKey, req);
|
||||
|
||||
if (hbParam->passKeyCnt > 0) {
|
||||
hbGetUserBasicInfo(connKey, req);
|
||||
hbGetUserBasicInfo(connKey, hbParam, req);
|
||||
}
|
||||
|
||||
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
if (hbParam->reqCnt == 0) {
|
||||
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
++hbParam->reqCnt; // success to get catalog info
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -766,55 +786,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
}
|
||||
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
||||
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
|
||||
|
||||
int64_t rid = -1;
|
||||
int32_t code = 0;
|
||||
|
||||
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
|
||||
|
||||
SClientHbReq *pOneReq = pIter;
|
||||
SClientHbKey *connKey = pOneReq ? &pOneReq->connKey : NULL;
|
||||
if (connKey != NULL) rid = connKey->tscRid;
|
||||
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(rid);
|
||||
if (pTscObj == NULL) {
|
||||
if (!pBatchReq->reqs) {
|
||||
tFreeClientHbBatchReq(pBatchReq);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while (pIter != NULL) {
|
||||
void *pIter = NULL;
|
||||
SHbParam param = {0};
|
||||
while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
|
||||
SClientHbReq *pOneReq = pIter;
|
||||
SClientHbKey *connKey = &pOneReq->connKey;
|
||||
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
|
||||
|
||||
if (!pTscObj) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
|
||||
SHbParam param;
|
||||
switch (pOneReq->connKey.connType) {
|
||||
|
||||
switch (connKey->connType) {
|
||||
case CONN_TYPE__QUERY: {
|
||||
param.clusterId = pOneReq->clusterId;
|
||||
if (param.clusterId == 0) {
|
||||
// init
|
||||
param.clusterId = pOneReq->clusterId;
|
||||
param.passVer = INT32_MIN;
|
||||
}
|
||||
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) {
|
||||
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq);
|
||||
if (clientHbMgr.reqHandle[connKey->connType]) {
|
||||
int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, ¶m, pOneReq);
|
||||
if (code) {
|
||||
tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
|
||||
pOneReq->connKey.tscRid, pOneReq->connKey.connType);
|
||||
connKey->tscRid, connKey->connType);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
#if 0
|
||||
if (code) {
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
pOneReq = pIter;
|
||||
continue;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
pOneReq = pIter;
|
||||
#endif
|
||||
releaseTscObj(connKey->tscRid);
|
||||
}
|
||||
releaseTscObj(rid);
|
||||
|
||||
return pBatchReq;
|
||||
}
|
||||
|
@ -885,7 +897,6 @@ static void *hbThreadFunc(void *param) {
|
|||
hbGatherAppInfo();
|
||||
}
|
||||
|
||||
SArray *mgr = taosArrayInit(sz, sizeof(void *));
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||
if (pAppHbMgr == NULL) {
|
||||
|
@ -894,7 +905,6 @@ static void *hbThreadFunc(void *param) {
|
|||
|
||||
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
||||
if (connCnt == 0) {
|
||||
taosArrayPush(mgr, &pAppHbMgr);
|
||||
continue;
|
||||
}
|
||||
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
|
||||
|
@ -908,7 +918,6 @@ static void *hbThreadFunc(void *param) {
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tFreeClientHbBatchReq(pReq);
|
||||
// hbClearReqInfo(pAppHbMgr);
|
||||
taosArrayPush(mgr, &pAppHbMgr);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -920,7 +929,6 @@ static void *hbThreadFunc(void *param) {
|
|||
tFreeClientHbBatchReq(pReq);
|
||||
// hbClearReqInfo(pAppHbMgr);
|
||||
taosMemoryFree(buf);
|
||||
taosArrayPush(mgr, &pAppHbMgr);
|
||||
break;
|
||||
}
|
||||
pInfo->fp = hbAsyncCallBack;
|
||||
|
@ -941,12 +949,8 @@ static void *hbThreadFunc(void *param) {
|
|||
// hbClearReqInfo(pAppHbMgr);
|
||||
|
||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||
taosArrayPush(mgr, &pAppHbMgr);
|
||||
}
|
||||
|
||||
taosArrayDestroy(clientHbMgr.appHbMgrs);
|
||||
clientHbMgr.appHbMgrs = mgr;
|
||||
|
||||
taosThreadMutexUnlock(&clientHbMgr.lock);
|
||||
|
||||
taosMsleep(HEARTBEAT_INTERVAL);
|
||||
|
|
|
@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
|||
}
|
||||
}
|
||||
|
||||
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*) res;
|
||||
STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
return pRspObj->rsp.rspOffset.version;
|
||||
}
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
|
||||
if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
|
||||
return pRspObj->metaRsp.rspOffset.version;
|
||||
}
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
|
||||
if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
|
||||
return pRspObj->rsp.rspOffset.version;
|
||||
}
|
||||
}
|
||||
|
||||
// data from tsdb, no valid offset info
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char* tmq_get_table_name(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
||||
|
|
|
@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) {
|
|||
dError("failed to send crash report");
|
||||
if (pFile) {
|
||||
taosReleaseCrashLogFile(pFile, false);
|
||||
pFile = NULL;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
|
@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) {
|
|||
|
||||
if (pFile) {
|
||||
taosReleaseCrashLogFile(pFile, truncateFile);
|
||||
pFile = NULL;
|
||||
truncateFile = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "dmNodes.h"
|
||||
#include "index.h"
|
||||
#include "qworker.h"
|
||||
#include "tstream.h"
|
||||
|
||||
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
|
|||
}
|
||||
|
||||
indexInit(tsNumOfCommitThreads);
|
||||
streamMetaInit();
|
||||
|
||||
dmReportStartup("dnode-transport", "initialized");
|
||||
dDebug("dnode is created, ptr:%p", pDnode);
|
||||
|
@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
|||
dmCleanupServer(pDnode);
|
||||
dmClearVars(pDnode);
|
||||
rpcCleanup();
|
||||
streamMetaCleanup();
|
||||
indexCleanup();
|
||||
taosConvDestroy();
|
||||
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||
|
|
|
@ -751,7 +751,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
|||
SDnodeObj *pDnode = NULL;
|
||||
SCreateDnodeReq createReq = {0};
|
||||
|
||||
if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0) {
|
||||
if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0 || (terrno = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndPauseStreamTask(pTrans, pTask) < 0) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -894,8 +894,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
|
|||
}
|
||||
taosArrayDestroy(pInfo->pLinearInfo);
|
||||
|
||||
taosMemoryFree(pInfo->pPrevGroupKey->pData);
|
||||
taosMemoryFree(pInfo->pPrevGroupKey);
|
||||
if (pInfo->pPrevGroupKey) {
|
||||
taosMemoryFree(pInfo->pPrevGroupKey->pData);
|
||||
taosMemoryFree(pInfo->pPrevGroupKey);
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
|
||||
#define _STREAM_BACKEDN_ROCKSDB_H_
|
||||
|
||||
#include "executor.h"
|
||||
|
||||
#include "rocksdb/c.h"
|
||||
// #include "streamInc.h"
|
||||
#include "streamState.h"
|
||||
|
@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
|
|||
|
||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
|
||||
|
||||
void* streamStateCreateBatch();
|
||||
int32_t streamStateGetBatchSize(void* pBatch);
|
||||
void streamStateClearBatch(void* pBatch);
|
||||
void streamStateDestroyBatch(void* pBatch);
|
||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
||||
void* val, int32_t vlen);
|
||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
|
||||
|
||||
// default cf
|
||||
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
|
||||
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
|
||||
|
@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
|
|||
void streamStateClearBatch(void* pBatch);
|
||||
void streamStateDestroyBatch(void* pBatch);
|
||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
||||
void* val, int32_t vlen);
|
||||
void* val, int32_t vlen, int64_t ttl);
|
||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
|
||||
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
|
||||
#endif
|
|
@ -16,9 +16,12 @@
|
|||
#ifndef _STREAM_INC_H_
|
||||
#define _STREAM_INC_H_
|
||||
|
||||
//#include "executor.h"
|
||||
#include "executor.h"
|
||||
#include "query.h"
|
||||
#include "tstream.h"
|
||||
|
||||
#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
|
|
@ -13,8 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// #include "streamStateRocksdb.h"
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "executor.h"
|
||||
#include "query.h"
|
||||
#include "tcommon.h"
|
||||
|
||||
typedef struct SCompactFilteFactory {
|
||||
|
@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
|
|||
taosMemoryFreeClear(err);
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
list all cf and get prefix
|
||||
*/
|
||||
int64_t streamId;
|
||||
int32_t taskId, dummpy = 0;
|
||||
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
|
@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
|
|||
void destroyCompactFilte(void* arg) { (void)arg; }
|
||||
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
|
||||
char** newval, size_t* newvlen, unsigned char* value_changed) {
|
||||
// int64_t unixTime = taosGetTimestampMs();
|
||||
if (streamStateValueIsStale((char*)val)) {
|
||||
return 1;
|
||||
}
|
||||
// SStreamValue value;
|
||||
// memset(&value, 0, sizeof(value));
|
||||
// streamValueDecode(&value, (char*)val);
|
||||
// taosMemoryFree(value.data);
|
||||
// if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
|
||||
// return 1;
|
||||
// }
|
||||
return 0;
|
||||
return streamStateValueIsStale((char*)val) ? 1 : 0;
|
||||
}
|
||||
const char* compactFilteName(void* arg) { return "stream_filte"; }
|
||||
|
||||
|
@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
|
|||
memcpy(cfNames[0], "default", strlen("default"));
|
||||
continue;
|
||||
}
|
||||
qError("cf name %s", idstr);
|
||||
|
||||
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
|
||||
if (i % cfLen == 0) {
|
||||
|
@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
|
|||
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < nSize * cfLen + 1; i++) {
|
||||
qError("cf name %s", cfNames[i]);
|
||||
}
|
||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
|
||||
RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
|
||||
for (int i = 0; i < nSize * cfLen + 1; i++) {
|
||||
|
@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
|||
if (err != NULL) {
|
||||
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
||||
taosMemoryFreeClear(err);
|
||||
// return -1;
|
||||
}
|
||||
}
|
||||
pState->pTdbState->rocksdb = handle->db;
|
||||
|
@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
|||
taosMemoryFree(ttlV); \
|
||||
} while (0);
|
||||
|
||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||
do { \
|
||||
code = 0; \
|
||||
char buf[128] = {0}; \
|
||||
char* err = NULL; \
|
||||
int i = streamGetInit(funcname); \
|
||||
if (i < 0) { \
|
||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||
code = -1; \
|
||||
break; \
|
||||
} \
|
||||
char toString[128] = {0}; \
|
||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
|
||||
size_t len = 0; \
|
||||
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
||||
if (val == NULL) { \
|
||||
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
|
||||
if (err != NULL) taosMemoryFree(err); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
char * p = NULL, *end = NULL; \
|
||||
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
|
||||
if (len < 0) { \
|
||||
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
|
||||
} \
|
||||
if (pVal != NULL) { \
|
||||
*pVal = p; \
|
||||
} else { \
|
||||
taosMemoryFree(p); \
|
||||
} \
|
||||
taosMemoryFree(val); \
|
||||
if (vLen != NULL) *vLen = len; \
|
||||
} \
|
||||
if (err != NULL) { \
|
||||
taosMemoryFree(err); \
|
||||
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
|
||||
} \
|
||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||
do { \
|
||||
code = 0; \
|
||||
char buf[128] = {0}; \
|
||||
char* err = NULL; \
|
||||
int i = streamGetInit(funcname); \
|
||||
if (i < 0) { \
|
||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||
code = -1; \
|
||||
break; \
|
||||
} \
|
||||
char toString[128] = {0}; \
|
||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
|
||||
size_t len = 0; \
|
||||
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
||||
if (val == NULL) { \
|
||||
if (err == NULL) { \
|
||||
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
|
||||
funcname); \
|
||||
} else { \
|
||||
qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
|
||||
err); \
|
||||
taosMemoryFreeClear(err); \
|
||||
} \
|
||||
code = -1; \
|
||||
} else { \
|
||||
char* p = NULL; \
|
||||
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
|
||||
if (len < 0) { \
|
||||
qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
|
||||
funcname); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
|
||||
len); \
|
||||
} \
|
||||
taosMemoryFree(val); \
|
||||
if (vLen != NULL) *vLen = len; \
|
||||
} \
|
||||
if (code == 0) \
|
||||
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
|
||||
} while (0);
|
||||
|
||||
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
|
||||
|
@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
|||
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
|
||||
// eLen);
|
||||
if (err != NULL) {
|
||||
qWarn(
|
||||
"failed to delete range cf(state) err: %s, "
|
||||
"start: %s, end:%s",
|
||||
err, toStringStart, toStringEnd);
|
||||
qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
|
||||
taosMemoryFree(err);
|
||||
}
|
||||
|
||||
|
@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
|
|||
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
|
||||
return -1;
|
||||
}
|
||||
size_t tlen;
|
||||
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
|
||||
size_t klen, vlen;
|
||||
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
|
||||
winKeyDecode(&winKey, keyStr);
|
||||
|
||||
size_t vlen = 0;
|
||||
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
|
||||
char* dst = NULL;
|
||||
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
|
||||
// char* dst = NULL;
|
||||
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
|
||||
if (len < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pVal != NULL) *pVal = (char*)dst;
|
||||
if (pVLen != NULL) *pVLen = vlen;
|
||||
if (pVLen != NULL) *pVLen = len;
|
||||
|
||||
*pKey = winKey;
|
||||
return 0;
|
||||
|
@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
|
|||
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
|
||||
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
|
||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
||||
void* val, int32_t vlen) {
|
||||
void* val, int32_t vlen, int64_t ttl) {
|
||||
int i = streamGetInit(cfName);
|
||||
|
||||
if (i < 0) {
|
||||
|
@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
|
|||
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
|
||||
|
||||
char* ttlV = NULL;
|
||||
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
|
||||
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
|
||||
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
|
||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||
taosMemoryFree(ttlV);
|
||||
|
|
|
@ -20,12 +20,12 @@
|
|||
#define MIN_STREAM_EXEC_BATCH_NUM 16
|
||||
|
||||
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
||||
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
|
||||
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
|
||||
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
|
||||
}
|
||||
|
||||
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
|
||||
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
|
||||
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
|
||||
return (status == TASK_STATUS__PAUSE);
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
|||
|
||||
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
||||
if (status != TASK_STATUS__NORMAL) {
|
||||
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
|
||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
|
||||
atomic_load_8(&pTask->status.taskStatus));
|
||||
taosMsleep(2);
|
||||
|
@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
|||
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
||||
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
|
||||
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
||||
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
|
||||
pSubmit->submit.msgLen, pSubmit->submit.ver);
|
||||
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
|
||||
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
|
||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
|
||||
|
||||
SArray* pBlockList = pBlock->blocks;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
||||
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
|
||||
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
|
||||
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
|
||||
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
|
||||
|
@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
qRes->blocks = pRes;
|
||||
code = streamTaskOutput(pTask, qRes);
|
||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
||||
taosFreeQitem(pRes);
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(qRes);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
int64_t ckId = 0;
|
||||
int64_t dataVer = 0;
|
||||
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
|
||||
if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
|
||||
if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
|
||||
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
|
||||
", checkPoint id:%" PRId64 " -> %" PRId64,
|
||||
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
|
||||
|
||||
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
|
||||
pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
|
||||
|
||||
taosWLockLatch(&pTask->pMeta->lock);
|
||||
|
||||
|
@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
||||
|
||||
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
|
||||
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,13 @@
|
|||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
||||
static int32_t streamBackendId = 0;
|
||||
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
|
||||
|
||||
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
||||
void streamMetaCleanup() { taosCloseRef(streamBackendId); }
|
||||
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
|
||||
int32_t code = -1;
|
||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||
|
@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
sprintf(streamPath, "%s/%s", path, "stream");
|
||||
pMeta->path = taosStrdup(streamPath);
|
||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
|
||||
taosMemoryFree(streamPath);
|
||||
goto _err;
|
||||
}
|
||||
memset(streamPath, 0, len);
|
||||
|
||||
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
|
||||
code = taosMulModeMkDir(streamPath, 0755);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
taosMemoryFree(streamPath);
|
||||
goto _err;
|
||||
}
|
||||
taosMemoryFree(streamPath);
|
||||
|
||||
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
||||
goto _err;
|
||||
|
@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->vgId = vgId;
|
||||
pMeta->ahandle = ahandle;
|
||||
pMeta->expandFunc = expandFunc;
|
||||
pMeta->streamBackendId = streamBackendId;
|
||||
|
||||
char* statePath = taosMemoryCalloc(1, len);
|
||||
sprintf(statePath, "%s/%s", pMeta->path, "state");
|
||||
code = taosMulModeMkDir(statePath, 0755);
|
||||
memset(streamPath, 0, len);
|
||||
sprintf(streamPath, "%s/%s", pMeta->path, "state");
|
||||
code = taosMulModeMkDir(streamPath, 0755);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
taosMemoryFree(streamPath);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->streamBackend = streamBackendInit(statePath);
|
||||
pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup);
|
||||
pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);
|
||||
pMeta->streamBackend = streamBackendInit(streamPath);
|
||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||
|
||||
taosMemoryFree(statePath);
|
||||
taosMemoryFree(streamPath);
|
||||
|
||||
taosInitRWLatch(&pMeta->lock);
|
||||
return pMeta;
|
||||
|
||||
_err:
|
||||
taosMemoryFree(streamPath);
|
||||
taosMemoryFree(pMeta->path);
|
||||
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
|
||||
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
|
||||
|
@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
taosHashCleanup(pMeta->pTasks);
|
||||
taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid);
|
||||
// streamBackendCleanup(pMeta->streamBackend);
|
||||
taosCloseRef(pMeta->streamBackendId);
|
||||
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
|
||||
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
|
||||
taosMemoryFree(pMeta->path);
|
||||
taosMemoryFree(pMeta);
|
||||
|
@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
|||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
SStreamTask* pTask = *ppTask;
|
||||
|
||||
// taosWLockLatch(&pMeta->lock);
|
||||
|
||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
|
||||
|
||||
//
|
||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
|
||||
|
||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||
|
|
|
@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
|||
pState->taskId = pTask->id.taskId;
|
||||
pState->streamId = pTask->id.streamId;
|
||||
#ifdef USE_ROCKSDB
|
||||
qWarn("open stream state1");
|
||||
// qWarn("open stream state1");
|
||||
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
|
||||
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
|
||||
if (code == -1) {
|
||||
|
@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
|
|||
#ifdef USE_ROCKSDB
|
||||
// streamStateCloseBackend(pState);
|
||||
streamStateDestroy(pState, remove);
|
||||
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
|
||||
#else
|
||||
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
|
||||
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
|
||||
|
@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
|
|||
tdbTbClose(pState->pTdbState->pParTagDb);
|
||||
tdbClose(pState->pTdbState->db);
|
||||
#endif
|
||||
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
|
||||
}
|
||||
|
||||
int32_t streamStateBegin(SStreamState* pState) {
|
||||
|
@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
|
|||
int32_t code = 0;
|
||||
void* batch = streamStateCreateBatch();
|
||||
|
||||
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
|
||||
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "tstreamFileState.h"
|
||||
|
||||
#include "query.h"
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "taos.h"
|
||||
#include "tcommon.h"
|
||||
|
@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
|
|||
clearExpiredRowBuff(pFileState, 0, true);
|
||||
}
|
||||
|
||||
bool needClearDiskBuff(SStreamFileState* pFileState) {
|
||||
return pFileState->flushMark > 0;
|
||||
}
|
||||
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
|
||||
|
||||
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
|
||||
uint64_t i = 0;
|
||||
|
@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
|
|||
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
|
||||
|
||||
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
|
||||
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
|
||||
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
|
||||
: pFileState->maxTs - pFileState->deleteMark;
|
||||
clearExpiredRowBuff(pFileState, mark, false);
|
||||
return pFileState->usedBuffs;
|
||||
}
|
||||
|
||||
|
@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
|||
}
|
||||
|
||||
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
|
||||
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize);
|
||||
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
|
||||
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
|
||||
}
|
||||
if (streamStateGetBatchSize(batch) > 0) {
|
||||
|
@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
|||
int32_t len = 0;
|
||||
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
||||
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
||||
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
|
||||
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
|
||||
taosMemoryFree(valBuf);
|
||||
}
|
||||
{
|
||||
|
@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
|||
int32_t len = 0;
|
||||
memcpy(keyBuf, taskKey, strlen(taskKey));
|
||||
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
||||
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
|
||||
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
|
||||
}
|
||||
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||
}
|
||||
|
@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
|
|||
|
||||
int32_t recoverSnapshot(SStreamFileState* pFileState) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
|
||||
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
|
||||
: pFileState->maxTs - pFileState->deleteMark;
|
||||
deleteExpiredCheckPoint(pFileState, mark);
|
||||
void* pStVal = NULL;
|
||||
int32_t len = 0;
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
|
|||
|
||||
TARGET_LINK_LIBRARIES(
|
||||
streamUpdateTest
|
||||
PUBLIC os util common gtest stream
|
||||
PUBLIC os util common gtest gtest_main stream
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
|
|
|
@ -1,11 +1,28 @@
|
|||
#include <gtest/gtest.h>
|
||||
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "tstream.h"
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
||||
using namespace std;
|
||||
#define MAX_NUM_SCALABLE_BF 100000
|
||||
|
||||
class StreamStateEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
streamMetaInit();
|
||||
backend = streamBackendInit(path);
|
||||
}
|
||||
virtual void TearDown() {
|
||||
streamMetaCleanup();
|
||||
// indexClose(index);
|
||||
}
|
||||
|
||||
const char *path = TD_TMP_DIR_PATH "stream";
|
||||
void *backend;
|
||||
};
|
||||
|
||||
bool equalSBF(SScalableBf *left, SScalableBf *right) {
|
||||
if (left->growth != right->growth) return false;
|
||||
if (left->numBits != right->numBits) return false;
|
||||
|
@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
|
|||
// updateInfoDestroy(pSU6);
|
||||
// updateInfoDestroy(pSU7);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
// TEST()
|
||||
TEST(StreamStateEnv, test1) {}
|
||||
// int main(int argc, char *argv[]) {
|
||||
// testing::InitGoogleTest(&argc, argv);
|
||||
// return RUN_ALL_TESTS();
|
||||
// }
|
|
@ -657,36 +657,33 @@ if $data20 != null then
|
|||
return -1
|
||||
endi
|
||||
|
||||
#print =============== error for normal table
|
||||
#sql create table tb2023(ts timestamp, f int);
|
||||
#sql_error alter table tb2023 add column v varchar(65535);
|
||||
#sql_error alter table tb2023 add column v varchar(65535);
|
||||
#sql_error alter table tb2023 add column v varchar(65530);
|
||||
#sql alter table tb2023 add column v varchar(16374);
|
||||
#sql_error alter table tb2023 modify column v varchar(65536);
|
||||
#sql desc tb2023
|
||||
#sql alter table tb2023 drop column v
|
||||
#sql_error alter table tb2023 add column v nchar(16384);
|
||||
#sql alter table tb2023 add column v nchar(4093);
|
||||
#sql_error alter table tb2023 modify column v nchar(16384);
|
||||
#sql_error alter table tb2023 add column v nchar(16384);
|
||||
#sql alter table tb2023 drop column v
|
||||
#sql alter table tb2023 add column v nchar(16374);
|
||||
#sql desc tb2023
|
||||
#
|
||||
#print =============== error for super table
|
||||
#sql create table stb2023(ts timestamp, f int) tags(t1 int);
|
||||
#sql_error alter table stb2023 add column v varchar(65535);
|
||||
#sql_error alter table stb2023 add column v varchar(65536);
|
||||
#sql_error alter table stb2023 add column v varchar(33100);
|
||||
#sql alter table stb2023 add column v varchar(16374);
|
||||
#sql_error alter table stb2023 modify column v varchar(16375);
|
||||
#sql desc stb2023
|
||||
#sql alter table stb2023 drop column v
|
||||
#sql_error alter table stb2023 add column v nchar(4094);
|
||||
#sql alter table stb2023 add column v nchar(4093);
|
||||
#sql_error alter table stb2023 modify column v nchar(4094);
|
||||
#sql desc stb2023
|
||||
print =============== error for normal table
|
||||
sql create table tb2023(ts timestamp, f int);
|
||||
sql_error alter table tb2023 add column v varchar(65518);
|
||||
sql_error alter table tb2023 add column v varchar(65531);
|
||||
sql_error alter table tb2023 add column v varchar(65535);
|
||||
sql alter table tb2023 add column v varchar(65517);
|
||||
sql_error alter table tb2023 modify column v varchar(65518);
|
||||
sql desc tb2023
|
||||
sql alter table tb2023 drop column v
|
||||
sql_error alter table tb2023 add column v nchar(16380);
|
||||
sql alter table tb2023 add column v nchar(16379);
|
||||
sql_error alter table tb2023 modify column v nchar(16380);
|
||||
sql desc tb2023
|
||||
|
||||
print =============== error for super table
|
||||
sql create table stb2023(ts timestamp, f int) tags(t1 int);
|
||||
sql_error alter table stb2023 add column v varchar(65518);
|
||||
sql_error alter table stb2023 add column v varchar(65531);
|
||||
sql_error alter table stb2023 add column v varchar(65535);
|
||||
sql alter table stb2023 add column v varchar(65517);
|
||||
sql_error alter table stb2023 modify column v varchar(65518);
|
||||
sql desc stb2023
|
||||
sql alter table stb2023 drop column v
|
||||
sql_error alter table stb2023 add column v nchar(16380);
|
||||
sql alter table stb2023 add column v nchar(16379);
|
||||
sql_error alter table stb2023 modify column v nchar(16380);
|
||||
sql desc stb2023
|
||||
|
||||
print ======= over
|
||||
sql drop database d1
|
||||
|
|
|
@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
|
|||
sql_error alter table tb modify column c2 binary(9);
|
||||
sql_error alter table tb modify column c2 binary(-9);
|
||||
sql_error alter table tb modify column c2 binary(0);
|
||||
sql_error alter table tb modify column c2 binary(65600);
|
||||
sql_error alter table tb modify column c2 binary(65436);
|
||||
sql_error alter table tb modify column c2 nchar(30);
|
||||
sql_error alter table tb modify column c3 double;
|
||||
sql_error alter table tb modify column c3 nchar(10);
|
||||
|
|
Loading…
Reference in New Issue