Merge branch '3.0' into feature/TD-11274-3.0

This commit is contained in:
Cary Xu 2022-08-03 10:58:15 +08:00
commit f887dd01b4
32 changed files with 2287 additions and 347 deletions

View File

@ -176,18 +176,6 @@ Note: 由于 SHOW 语句已经被开发者熟悉和广泛使用,所以它们
| 5 | tag_type | BINARY(64) | tag 的类型 | | 5 | tag_type | BINARY(64) | tag 的类型 |
| 6 | tag_value | BINARY(16384) | tag 的值 | | 6 | tag_value | BINARY(16384) | tag 的值 |
## USER_STREAMS
提供用户创建的流计算的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | --------------------------- |
| 1 | stream_name | BINARY(192) | 流计算名称 |
| 2 | user_name | BINARY(23) | 创建流计算的用户 |
| 3 | dest_table | BINARY(192) | 流计算写入的目标表 |
| 4 | create_time | TIMESTAMP | 创建时间 |
| 5 | sql | BLOB | 创建流计算时提供的 SQL 语句 |
## INS_USERS ## INS_USERS
提供系统中创建的用户的相关信息。 提供系统中创建的用户的相关信息。
@ -200,27 +188,44 @@ Note: 由于 SHOW 语句已经被开发者熟悉和广泛使用,所以它们
## INS_GRANTS ## INS_GRANTS
TODO 提供企业版授权的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------------------------------------------------- |
| 1 | version | BINARY(9) | 企业版授权说明official(官方授权的)/trial(试用的) |
| 2 | cpu_cores | BINARY(9) | 授权使用的 CPU 核心数量 |
| 3 | dnodes | BINARY(10) | 授权使用的 dnode 节点数量 |
| 4 | streams | BINARY(10) | 授权创建的流数量 |
| 5 | users | BINARY(10) | 授权创建的用户数量 |
| 6 | accounts | BINARY(10) | 授权创建的帐户数量 |
| 7 | storage | BINARY(21) | 授权使用的存储空间大小 |
| 8 | connections | BINARY(21) | 授权使用的客户端连接数量 |
| 9 | databases | BINARY(11) | 授权使用的数据库数量 |
| 10 | speed | BINARY(9) | 授权使用的数据点每秒写入数量 |
| 11 | querytime | BINARY(9) | 授权使用的查询总时长 |
| 12 | timeseries | BINARY(21) | 授权使用的测点数量 |
| 13 | expired | BINARY(5) | 是否到期true到期false未到期 |
| 14 | expire_time | BINARY(19) | 试用期到期时间 |
## INS_VGROUPS ## INS_VGROUPS
系统中所有 vgroups 的信息。 系统中所有 vgroups 的信息。
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :-------: | ------------ | ---------------------------- | | --- | :-------: | ------------ | ------------------------------------------------------ |
| 1 | vgroup_id | INT | vgroup id | | 1 | vgroup_id | INT | vgroup id |
| 2 | db_name | BINARY(32) | 数据库名 | | 2 | db_name | BINARY(32) | 数据库名 |
| 3 | tables | INT | 此 vgroup 内有多少表 | | 3 | tables | INT | 此 vgroup 内有多少表 |
| 4 | status | BINARY(10) | 此 vgroup 的状态 | | 4 | status | BINARY(10) | 此 vgroup 的状态 |
| 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id | | 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id |
| 6 | v1_status | BINARY(10) | 第一个成员的状态 | | 6 | v1_status | BINARY(10) | 第一个成员的状态 |
| 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id | | 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id |
| 8 | v2_status | BINARY(10) | 第二个成员的状态 | | 8 | v2_status | BINARY(10) | 第二个成员的状态 |
| 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id | | 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id |
| 10 | v3_status | BINARY(10) | 第三个成员的状态 | | 10 | v3_status | BINARY(10) | 第三个成员的状态 |
| 11 | nfiles | INT | TODO | | 11 | nfiles | INT | 此 vgroup 中数据/元数据文件的数量 |
| 12 | file_size | INT | TODO | | 12 | file_size | INT | 此 vgroup 中数据/元数据文件的大小 |
| 13 | tsma | TINYINT | TODO | | 13 | tsma | TINYINT | 此 vgroup 是否专用于 Time-range-wise SMA1: 是, 0: 否 |
## INS_CONFIGS ## INS_CONFIGS

View File

@ -49,9 +49,9 @@ int32_t grantCheck(EGrantType grant);
#ifndef GRANTS_CFG #ifndef GRANTS_CFG
#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \ #define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
@ -59,8 +59,8 @@ int32_t grantCheck(EGrantType grant);
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
} }
#define GRANT_CFG_ADD #define GRANT_CFG_ADD

View File

@ -479,7 +479,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
@ -487,6 +487,18 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
typedef struct SStreamMeta SStreamMeta;
SStreamMeta* streamMetaOpen();
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

0
packaging/docker/bin/entrypoint.sh Normal file → Executable file
View File

0
packaging/docker/bin/env-to-cfg Normal file → Executable file
View File

0
packaging/docker/bin/taos-check Normal file → Executable file
View File

0
packaging/docker/dockerManifest.sh Normal file → Executable file
View File

0
packaging/docker/dockerbuild.sh Normal file → Executable file
View File

0
packaging/docker/dockerbuildi.sh Normal file → Executable file
View File

View File

@ -165,7 +165,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
break; break;
case STREAM_QUEUE: case STREAM_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pStreamQ, pMsg); if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg);
} else {
taosWriteQitem(pVnode->pStreamQ, pMsg);
}
break; break;
case FETCH_QUEUE: case FETCH_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);

View File

@ -1011,6 +1011,11 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
code = -1;
goto _OVER;
}
if (isAlter) { if (isAlter) {
bool needRsp = false; bool needRsp = false;
SStbObj pDst = {0}; SStbObj pDst = {0};

View File

@ -30,15 +30,15 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SSnode {
SMsgCb msgCb;
} SSnode;
#if 0
typedef struct { typedef struct {
SHashObj* pHash; // taskId -> SStreamTask SHashObj* pHash; // taskId -> SStreamTask
} SStreamMeta; } SStreamMeta;
typedef struct SSnode {
SStreamMeta* pMeta;
SMsgCb msgCb;
} SSnode;
SStreamMeta* sndMetaNew(); SStreamMeta* sndMetaNew();
void sndMetaDelete(SStreamMeta* pMeta); void sndMetaDelete(SStreamMeta* pMeta);
@ -49,6 +49,7 @@ int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId); int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -16,6 +16,10 @@
#include "executor.h" #include "executor.h"
#include "sndInt.h" #include "sndInt.h"
#include "tuuid.h" #include "tuuid.h"
/*SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; }*/
/*void sndClose(SSnode *pSnode) {}*/
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
@ -23,21 +27,24 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
return NULL; return NULL;
} }
pSnode->msgCb = pOption->msgCb; pSnode->msgCb = pOption->msgCb;
#if 0
pSnode->pMeta = sndMetaNew(); pSnode->pMeta = sndMetaNew();
if (pSnode->pMeta == NULL) { if (pSnode->pMeta == NULL) {
taosMemoryFree(pSnode); taosMemoryFree(pSnode);
return NULL; return NULL;
} }
#endif
return pSnode; return pSnode;
} }
void sndClose(SSnode *pSnode) { void sndClose(SSnode *pSnode) {
sndMetaDelete(pSnode->pMeta); /*sndMetaDelete(pSnode->pMeta);*/
taosMemoryFree(pSnode); taosMemoryFree(pSnode);
} }
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
#if 0
SStreamMeta *sndMetaNew() { SStreamMeta *sndMetaNew() {
SStreamMeta *pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta *pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
@ -151,7 +158,7 @@ static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp); streamProcessDispatchReq(pTask, &req, &rsp, true);
return 0; return 0;
} }
@ -263,3 +270,4 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
} }
return 0; return 0;
} }
#endif

View File

@ -188,6 +188,8 @@ bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
// sma // sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);

View File

@ -115,15 +115,23 @@ typedef struct {
} STqHandle; } STqHandle;
struct STQ { struct STQ {
char* path; SVnode* pVnode;
SHashObj* pushMgr; // consumerId -> STqHandle* char* path;
SHashObj* handles; // subKey -> STqHandle SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* pStreamTasks; // taksId -> SStreamTask SHashObj* handles; // subKey -> STqHandle
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo SHashObj* pStreamTasks; // taksId -> SStreamTask
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
SVnode* pVnode;
TDB* pMetaStore; TDB* pMetaStore;
TTB* pExecStore; TTB* pExecStore;
TTB* pAlterInfoStore;
TDB* pStreamStore;
TTB* pTaskDb;
TTB* pTaskState;
}; };
typedef struct { typedef struct {

View File

@ -163,7 +163,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -280,8 +280,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return 0; return 0;
} }
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
@ -386,6 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
ASSERT(fetchOffsetNew.type == TMQ_OFFSET__LOG);
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
@ -461,22 +460,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
goto OVER; goto OVER;
} }
} }
taosMemoryFree(pCkHead);
#if 0
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0);
}
// 4. send rsp
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
#endif
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);
} }
OVER: OVER:
@ -614,17 +597,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); int32_t code = 0;
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
}
tDecoderClear(&decoder);
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1); ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) { if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
@ -634,11 +608,15 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
code = -1;
goto FAIL;
}
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMsgCb = &pTq->pVnode->msgCb;
// exec // exec
@ -683,15 +661,35 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId); pTask->selfChildId);
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
/*SMeta* pMeta = pTq->pVnode->pMeta;*/
/*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/
return 0;
FAIL: FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
// TODO free executor
return code;
}
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
tDecoderClear(&decoder);
if (tqExpandTask(pTq, pTask) < 0) {
goto FAIL;
}
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask); if (pTask) taosMemoryFree(pTask);
return -1; return -1;
} }
@ -752,7 +750,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -767,7 +765,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(*ppTask, &req, &rsp); streamProcessDispatchReq(*ppTask, &req, &rsp, exec);
return 0; return 0;
} else { } else {
return -1; return -1;
@ -825,16 +823,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
// launch exec to free memory // launch exec to free memory
// remove from hash // remove from hash
return 0; return 0;
#if 0
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
// set status dropping
ASSERT(code == 0);
if (code == 0) {
// sendrsp
}
return code;
#endif
} }
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
@ -863,3 +851,37 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
// //
return 0; return 0;
} }
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
STQ* pTq = pVnode->pTq;
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamDispatchReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
goto FAIL;
}
int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(*ppTask, &req, &rsp, false);
return;
}
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
};
tmsgSendRsp(&rsp);
}

View File

@ -0,0 +1,209 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqSnapReader ========================================
struct STqSnapReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
int32_t code = 0;
STqSnapReader* pReader = NULL;
// alloc
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
// impl
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code;
}
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
SDecoder decoder;
STqHandle handle;
*ppData = NULL;
for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur);
break;
} else {
tdbTbcMoveToNext(pReader->pCur);
}
}
ASSERT(pVal && vLen);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_HANDLE;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
handle.snapshotVer, handle.subKey, vLen);
_exit:
return code;
_err:
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
}
// STqSnapWriter ========================================
struct STqSnapWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
int32_t code = 0;
STqSnapWriter* pWriter;
// alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
}
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqSnapWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
ASSERT(0);
} else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
// restore from metastore
if (tqMetaRestoreHandle(pTq) < 0) {
goto _err;
}
return code;
_err:
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
return code;
}
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};
SDecoder* pDecoder = &decoder;
STqHandle handle;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err;
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
if (code < 0) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}

View File

@ -0,0 +1,209 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqSnapReader ========================================
struct STqSnapReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
int32_t code = 0;
STqSnapReader* pReader = NULL;
// alloc
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
// impl
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code;
}
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
SDecoder decoder;
STqHandle handle;
*ppData = NULL;
for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur);
break;
} else {
tdbTbcMoveToNext(pReader->pCur);
}
}
ASSERT(pVal && vLen);
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_HANDLE;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
handle.snapshotVer, handle.subKey, vLen);
_exit:
return code;
_err:
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
}
// STqSnapWriter ========================================
struct STqSnapWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
int32_t code = 0;
STqSnapWriter* pWriter;
// alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
}
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqSnapWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
ASSERT(0);
} else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
// restore from metastore
if (tqMetaRestoreHandle(pTq) < 0) {
goto _err;
}
return code;
_err:
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
return code;
}
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};
SDecoder* pDecoder = &decoder;
STqHandle handle;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err;
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
if (code < 0) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}

View File

@ -330,7 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_RECOVER: case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE: case TDMT_STREAM_RETRIEVE:
@ -490,6 +490,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
goto _exit; goto _exit;
} }
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
rcode = -1;
goto _exit;
}
// validate hash // validate hash
sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
if (vnodeValidateTableHash(pVnode, tbName) < 0) { if (vnodeValidateTableHash(pVnode, tbName) < 0) {
@ -840,6 +845,13 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit; goto _exit;
} }
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
pRsp->code = terrno;
tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno; submitBlkRsp.code = terrno;

View File

@ -325,16 +325,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return -1; return -1;
} }
#if 1 vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
do {
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
}
taosMemoryFree(syncNodeStr);
} while (0);
#endif
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
@ -458,6 +449,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
} }
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), code);
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
if (code != 0 && terrno == 0) { if (code != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;

View File

@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
stream stream
PRIVATE os util transport qcom executor PRIVATE os util transport qcom executor tdb
) )
if(${BUILD_TEST}) if(${BUILD_TEST})

View File

@ -175,40 +175,21 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
pReq->upstreamTaskId); pReq->upstreamTaskId);
// 1. handle input
streamTaskEnqueue(pTask, pReq, pRsp); streamTaskEnqueue(pTask, pReq, pRsp);
// 2. try exec if (exec) {
// 2.1. idle: exec streamExec(pTask);
// 2.2. executing: return
// 2.3. closing: keep trying
#if 0
if (pTask->execType != TASK_EXEC__NONE) {
#endif
streamExec(pTask);
#if 0
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
while (1) {
void* data = streamQueueNextItem(pTask->inputQueue);
if (data == NULL) return 0;
if (streamTaskOutput(pTask, data) < 0) {
ASSERT(0);
}
}
}
#endif
// 3. handle output if (pTask->dispatchType != TASK_DISPATCH__NONE) {
// 3.1 check and set status ASSERT(pTask->sinkType == TASK_SINK__NONE);
// 3.2 dispatch / sink streamDispatch(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) { }
ASSERT(pTask->sinkType == TASK_SINK__NONE); } else {
streamDispatch(pTask); streamLaunchByWrite(pTask, pTask->nodeId);
} }
return 0; return 0;

View File

@ -0,0 +1,159 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "tdbInt.h"
#include "tstream.h"
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pStateDb;
SHashObj* pTasks;
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
} SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pMeta->path = strdup(path);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err;
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
goto _err;
}
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
_err:
return NULL;
}
void streamMetaClose(SStreamMeta* pMeta) {
//
return;
}
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
return -1;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
int32_t len;
int32_t code;
tEncodeSize(tEncodeSStreamTask, pTask, len, code);
if (code < 0) {
return -1;
}
buf = taosMemoryCalloc(1, sizeof(len));
if (buf == NULL) {
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, len);
tEncodeSStreamTask(&encoder, pTask);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
ASSERT(0);
return -1;
}
return 0;
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
/*return -1;*/
}
return 0;
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamMetaRollBack(SStreamMeta* pMeta) {
// TODO tdb rollback
return 0;
}
int32_t streamRestoreTask(SStreamMeta* pMeta) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
}
return 0;
}

View File

@ -730,7 +730,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
for (int i = 0; i < arrSize; ++i) { for (int i = 0; i < arrSize; ++i) {
do { do {
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose type:%s, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize); snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
} while (0); } while (0);
@ -790,7 +790,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
do { do {
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose type:%s", TMSG_INFO(pMsg->msgType)); snprintf(eventLog, sizeof(eventLog), "propose message, type:%s", TMSG_INFO(pMsg->msgType));
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
} while (0); } while (0);
@ -1894,7 +1894,9 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode); syncNodeAppendNoop(pSyncNode);
#if 0 // simon
syncNodeReplicate(pSyncNode); syncNodeReplicate(pSyncNode);
#endif
syncMaybeAdvanceCommitIndex(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode);
} else { } else {
@ -2070,7 +2072,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode); syncNodeAppendNoop(pSyncNode);
#if 0 // simon
syncNodeReplicate(pSyncNode); syncNodeReplicate(pSyncNode);
#endif
syncMaybeAdvanceCommitIndex(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode);
} }

View File

@ -50,8 +50,8 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "save response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p", snprintf(eventLog, sizeof(eventLog), "save message handle, type:%s seq:%" PRIu64 " handle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
@ -76,8 +76,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "get response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p", snprintf(eventLog, sizeof(eventLog), "get message handle, type:%s seq:%" PRIu64 " handle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
@ -96,8 +96,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "get-and-del response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p", snprintf(eventLog, sizeof(eventLog), "get-and-del message handle, type:%s seq:%" PRIu64 " handle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosHashRemove(pObj->pRespHash, &index, sizeof(index));

View File

@ -410,6 +410,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SPEED_LIMITED, "Write speed limited b
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STORAGE_LIMITED, "Storage capacity limited by license") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STORAGE_LIMITED, "Storage capacity limited by license")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_QUERYTIME_LIMITED, "Query time limited by license") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_QUERYTIME_LIMITED, "Query time limited by license")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, "CPU cores limited by license") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, "CPU cores limited by license")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STABLE_LIMITED, "STable creation limited by license")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_TABLE_LIMITED, "Table creation limited by license")
// sync // sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")

View File

@ -35,7 +35,7 @@ class TDTestCase:
else: else:
return True return True
def getBuildPath(self): def getPath(self, tool="taosdump"):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath): if ("community" in selfPath):
@ -43,25 +43,33 @@ class TDTestCase:
else: else:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosdump" in files): if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] paths.append(os.path.join(root, tool))
break break
return buildPath if (len(paths) == 0):
return ""
return paths[0]
def run(self): def run(self):
if not os.path.exists("./taosdumptest/tmp1"): if not os.path.exists("./taosdumptest/tmp1"):
os.makedirs("./taosdumptest/tmp1") os.makedirs("./taosdumptest/tmp1")
else: else:
print("目录存在") os.system("rm -rf ./taosdumptest/tmp1")
os.makedirs("./taosdumptest/tmp1")
if not os.path.exists("./taosdumptest/tmp2"): if not os.path.exists("./taosdumptest/tmp2"):
os.makedirs("./taosdumptest/tmp2") os.makedirs("./taosdumptest/tmp2")
else:
os.system("rm -rf ./taosdumptest/tmp2")
os.makedirs("./taosdumptest/tmp2")
tdSql.execute("drop database if exists db") tdSql.execute("drop database if exists db")
tdSql.execute("create database db duration 11 keep 3649 blocks 8 ") tdSql.execute("create database db duration 11 keep 3649")
tdSql.execute("create database db1 duration 12 keep 3640 blocks 7 ") tdSql.execute("create database db1 duration 12 keep 3640")
tdSql.execute("use db") tdSql.execute("use db")
tdSql.execute( tdSql.execute(
"create table st(ts timestamp, c1 int, c2 nchar(10)) tags(t1 int, t2 binary(10))") "create table st(ts timestamp, c1 int, c2 nchar(10)) tags(t1 int, t2 binary(10))")
@ -78,31 +86,30 @@ class TDTestCase:
sql += "(%d, %d, 'nchar%d')" % (currts + i, i % 100, i % 100) sql += "(%d, %d, 'nchar%d')" % (currts + i, i % 100, i % 100)
tdSql.execute(sql) tdSql.execute(sql)
buildPath = self.getBuildPath() binPath = self.getPath()
if (buildPath == ""): if (binPath == ""):
tdLog.exit("taosdump not found!") tdLog.exit("taosdump not found!")
else: else:
tdLog.info("taosdump found in %s" % buildPath) tdLog.info("taosdump found: %s" % binPath)
binPath = buildPath + "/build/bin/"
os.system("%staosdump --databases db -o ./taosdumptest/tmp1" % binPath) os.system("%s -y --databases db -o ./taosdumptest/tmp1" % binPath)
os.system( os.system(
"%staosdump --databases db1 -o ./taosdumptest/tmp2" % "%s -y --databases db1 -o ./taosdumptest/tmp2" %
binPath) binPath)
tdSql.execute("drop database db") tdSql.execute("drop database db")
tdSql.execute("drop database db1") tdSql.execute("drop database db1")
tdSql.query("show databases") tdSql.query("show databases")
tdSql.checkRows(0) tdSql.checkRows(2)
os.system("%staosdump -i ./taosdumptest/tmp1" % binPath) os.system("%s -i ./taosdumptest/tmp1" % binPath)
os.system("%staosdump -i ./taosdumptest/tmp2" % binPath) os.system("%s -i ./taosdumptest/tmp2" % binPath)
tdSql.execute("use db") tdSql.execute("use db")
tdSql.query("show databases") tdSql.query("show databases")
tdSql.checkRows(2) tdSql.checkRows(4)
dbresult = tdSql.queryResult dbresult = tdSql.queryResult
# 6--duration,7--keep0,keep1,keep, 12--block, # 6--duration,7--keep0,keep1,keep
isCommunity = self.checkCommunity() isCommunity = self.checkCommunity()
print("iscommunity: %d" % isCommunity) print("iscommunity: %d" % isCommunity)
@ -111,20 +118,15 @@ class TDTestCase:
print(dbresult[i]) print(dbresult[i])
print(type(dbresult[i][6])) print(type(dbresult[i][6]))
print(type(dbresult[i][7])) print(type(dbresult[i][7]))
print(type(dbresult[i][9])) print((dbresult[i][6]))
assert dbresult[i][6] == 11 assert dbresult[i][6] == "15840m"
if isCommunity: print((dbresult[i][7]))
assert dbresult[i][7] == "3649" assert dbresult[i][7] == "5254560m,5254560m,5254560m"
else:
assert dbresult[i][7] == "3649,3649,3649"
assert dbresult[i][9] == 8
if dbresult[i][0] == 'db1': if dbresult[i][0] == 'db1':
assert dbresult[i][6] == 12 print((dbresult[i][6]))
if isCommunity: assert dbresult[i][6] == "17280m"
assert dbresult[i][7] == "3640" print((dbresult[i][7]))
else: assert dbresult[i][7] == "5241600m,5241600m,5241600m"
assert dbresult[i][7] == "3640,3640,3640"
assert dbresult[i][9] == 7
tdSql.query("show stables") tdSql.query("show stables")
tdSql.checkRows(1) tdSql.checkRows(1)
@ -132,8 +134,10 @@ class TDTestCase:
tdSql.query("show tables") tdSql.query("show tables")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 0, 't2') dbresult = tdSql.queryResult
tdSql.checkData(1, 0, 't1') print(dbresult)
for i in range(len(dbresult)):
assert ((dbresult[i][0] == "t1") or (dbresult[i][0] == "t2"))
tdSql.query("select * from t1") tdSql.query("select * from t1")
tdSql.checkRows(100) tdSql.checkRows(100)
@ -155,7 +159,7 @@ class TDTestCase:
os.system("rm -rf ./taosdumptest/tmp2") os.system("rm -rf ./taosdumptest/tmp2")
os.makedirs("./taosdumptest/tmp1") os.makedirs("./taosdumptest/tmp1")
tdSql.execute("create database db12312313231231321312312312_323") tdSql.execute("create database db12312313231231321312312312_323")
tdSql.error("create database db12312313231231321312312312_3231") tdSql.error("create database db012345678911234567892234567893323456789423456789523456789bcdefe")
tdSql.execute("use db12312313231231321312312312_323") tdSql.execute("use db12312313231231321312312312_323")
tdSql.execute("create stable st12345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678_9(ts timestamp, c1 int, c2 nchar(10)) tags(t1 int, t2 binary(10))") tdSql.execute("create stable st12345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678_9(ts timestamp, c1 int, c2 nchar(10)) tags(t1 int, t2 binary(10))")
tdSql.error("create stable st_12345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678_9(ts timestamp, c1 int, c2 nchar(10)) tags(t1 int, t2 binary(10))") tdSql.error("create stable st_12345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678912345678_9(ts timestamp, c1 int, c2 nchar(10)) tags(t1 int, t2 binary(10))")
@ -168,9 +172,10 @@ class TDTestCase:
tdSql.query("show stables") tdSql.query("show stables")
tdSql.checkRows(2) tdSql.checkRows(2)
os.system( os.system(
"%staosdump --databases db12312313231231321312312312_323 -o ./taosdumptest/tmp1" % binPath) "%s -y --databases db12312313231231321312312312_323 -o ./taosdumptest/tmp1" %
binPath)
tdSql.execute("drop database db12312313231231321312312312_323") tdSql.execute("drop database db12312313231231321312312312_323")
os.system("%staosdump -i ./taosdumptest/tmp1" % binPath) os.system("%s -i ./taosdumptest/tmp1" % binPath)
tdSql.execute("use db12312313231231321312312312_323") tdSql.execute("use db12312313231231321312312312_323")
tdSql.query("show stables") tdSql.query("show stables")
tdSql.checkRows(2) tdSql.checkRows(2)

View File

@ -26,9 +26,9 @@ class TDTestCase:
self.ts = 1601481600000 self.ts = 1601481600000
self.numberOfTables = 1 self.numberOfTables = 1
self.numberOfRecords = 15000 self.numberOfRecords = 150
def getBuildPath(self): def getPath(self, tool="taosdump"):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath): if ("community" in selfPath):
@ -36,15 +36,24 @@ class TDTestCase:
else: else:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] paths.append(os.path.join(root, tool))
break break
return buildPath if (len(paths) == 0):
return ""
return paths[0]
def run(self): def run(self):
if not os.path.exists("./taosdumptest/tmp"):
os.makedirs("./taosdumptest/tmp")
else:
os.system("rm -rf ./taosdumptest/tmp")
os.makedirs("./taosdumptest/tmp")
tdSql.prepare() tdSql.prepare()
tdSql.execute("create table st(ts timestamp, c1 timestamp, c2 int, c3 bigint, c4 float, c5 double, c6 binary(8), c7 smallint, c8 tinyint, c9 bool, c10 nchar(8)) tags(t1 int)") tdSql.execute("create table st(ts timestamp, c1 timestamp, c2 int, c3 bigint, c4 float, c5 double, c6 binary(8), c7 smallint, c8 tinyint, c9 bool, c10 nchar(8)) tags(t1 int)")
@ -60,27 +69,26 @@ class TDTestCase:
break break
tdSql.execute(sql) tdSql.execute(sql)
buildPath = self.getBuildPath() binPath = self.getPath()
if (buildPath == ""): if (binPath == ""):
tdLog.exit("taosdump not found!") tdLog.exit("taosdump not found!")
else: else:
tdLog.info("taosdump found in %s" % buildPath) tdLog.info("taosdump found in %s" % binPath)
binPath = buildPath + "/build/bin/"
os.system("rm /tmp/*.sql") os.system("rm ./taosdumptest/tmp/*.sql")
os.system( os.system(
"%staosdump --databases db -o /tmp -B 32766 -L 1048576" % "%s --databases db -o ./taosdumptest/tmp -B 32766 -L 1048576" %
binPath) binPath)
tdSql.execute("drop database db") tdSql.execute("drop database db")
tdSql.query("show databases") tdSql.query("show databases")
tdSql.checkRows(0) tdSql.checkRows(2)
os.system("%staosdump -i /tmp" % binPath) os.system("%s -i ./taosdumptest/tmp" % binPath)
tdSql.query("show databases") tdSql.query("show databases")
tdSql.checkRows(1) tdSql.checkRows(3)
tdSql.checkData(0, 0, 'db') tdSql.checkData(2, 0, 'db')
tdSql.execute("use db") tdSql.execute("use db")
tdSql.query("show stables") tdSql.query("show stables")
@ -90,6 +98,38 @@ class TDTestCase:
tdSql.query("select count(*) from t1") tdSql.query("select count(*) from t1")
tdSql.checkData(0, 0, self.numberOfRecords) tdSql.checkData(0, 0, self.numberOfRecords)
# test case for TS-1225
tdSql.execute("create database test")
tdSql.execute("use test")
tdSql.execute(
"create table stb(ts timestamp, c1 binary(16374), c2 binary(16374), c3 binary(16374)) tags(t1 nchar(256))")
tdSql.execute(
"insert into t1 using stb tags('t1') values(now, '%s', '%s', '%s')" %
("16374",
"16374",
"16374"))
# sys.exit(0)
os.system("rm ./taosdumptest/tmp/*.sql")
os.system("rm ./taosdumptest/tmp/*.avro*")
os.system("%s -D test -o ./taosdumptest/tmp -y" % binPath)
tdSql.execute("drop database test")
tdSql.query("show databases")
tdSql.checkRows(3)
os.system("%s -i ./taosdumptest/tmp -y" % binPath)
tdSql.execute("use test")
tdSql.error("show vnodes '' ")
tdSql.query("show stables")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 'stb')
tdSql.query("select * from stb")
tdSql.checkRows(1)
os.system("rm -rf dump_result.txt")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)

View File

@ -35,7 +35,7 @@ class TDTestCase:
else: else:
return True return True
def getBuildPath(self): def getPath(self, tool="taosdump"):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath): if ("community" in selfPath):
@ -43,15 +43,16 @@ class TDTestCase:
else: else:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] paths.append(os.path.join(root, tool))
break break
return buildPath if (len(paths) == 0):
return ""
return paths[0]
def createdb(self, precision="ns"): def createdb(self, precision="ns"):
tb_nums = self.numberOfTables tb_nums = self.numberOfTables
@ -60,13 +61,16 @@ class TDTestCase:
def build_db(precision, start_time): def build_db(precision, start_time):
tdSql.execute("drop database if exists timedb1") tdSql.execute("drop database if exists timedb1")
tdSql.execute( tdSql.execute(
"create database timedb1 duration 10 keep 36500 blocks 8 precision "+"\""+precision+"\"") "create database timedb1 duration 10 keep 36500 precision " +
"\"" +
precision +
"\"")
tdSql.execute("use timedb1") tdSql.execute("use timedb1")
tdSql.execute( tdSql.execute(
"create stable st(ts timestamp, c1 int, c2 nchar(10),c3 timestamp) tags(t1 int, t2 binary(10))") "create stable st(ts timestamp, c1 int, c2 nchar(10),c3 timestamp) tags(t1 int, t2 binary(10))")
for tb in range(tb_nums): for tb in range(tb_nums):
tbname = "t"+str(tb) tbname = "t" + str(tb)
tdSql.execute("create table " + tbname + tdSql.execute("create table " + tbname +
" using st tags(1, 'beijing')") " using st tags(1, 'beijing')")
sql = "insert into " + tbname + " values" sql = "insert into " + tbname + " values"
@ -79,8 +83,8 @@ class TDTestCase:
ts_seed = 1000 ts_seed = 1000
for i in range(per_tb_rows): for i in range(per_tb_rows):
sql += "(%d, %d, 'nchar%d',%d)" % (currts + i*ts_seed, i % sql += "(%d, %d, 'nchar%d',%d)" % (currts + i * ts_seed, i %
100, i % 100, currts + i*100) # currts +1000ms (1000000000ns) 100, i % 100, currts + i * 100) # currts +1000ms (1000000000ns)
tdSql.execute(sql) tdSql.execute(sql)
if precision == "ns": if precision == "ns":
@ -97,7 +101,6 @@ class TDTestCase:
else: else:
print("other time precision not valid , please check! ") print("other time precision not valid , please check! ")
def run(self): def run(self):
@ -118,12 +121,11 @@ class TDTestCase:
if not os.path.exists("./taosdumptest/dumptmp3"): if not os.path.exists("./taosdumptest/dumptmp3"):
os.makedirs("./taosdumptest/dumptmp3") os.makedirs("./taosdumptest/dumptmp3")
buildPath = self.getBuildPath() binPath = self.getPath("taosdump")
if (buildPath == ""): if (binPath == ""):
tdLog.exit("taosdump not found!") tdLog.exit("taosdump not found!")
else: else:
tdLog.info("taosdump found in %s" % buildPath) tdLog.info("taosdump found: %s" % binPath)
binPath = buildPath + "/build/bin/"
# create nano second database # create nano second database
@ -132,67 +134,51 @@ class TDTestCase:
# dump all data # dump all data
os.system( os.system(
"%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath) "%s -y -g --databases timedb1 -o ./taosdumptest/dumptmp1" %
binPath)
# dump part data with -S -E # dump part data with -S -E
os.system( os.system(
'%staosdump --databases timedb1 -S 1625068810000000000 -E 1625068860000000000 -o ./taosdumptest/dumptmp2 ' % '%s -y -g --databases timedb1 -S 1625068810000000000 -E 1625068860000000000 -o ./taosdumptest/dumptmp2 ' %
binPath) binPath)
os.system( os.system(
'%staosdump --databases timedb1 -S 1625068810000000000 -o ./taosdumptest/dumptmp3 ' % '%s -y -g --databases timedb1 -S 1625068810000000000 -o ./taosdumptest/dumptmp3 ' %
binPath) binPath)
# replace strings to dump in databases tdSql.execute("drop database timedb1")
os.system( os.system("%s -i ./taosdumptest/dumptmp2" % binPath)
"sed -i \"s/timedb1/dumptmp1/g\" `grep timedb1 -rl ./taosdumptest/dumptmp1`")
os.system(
"sed -i \"s/timedb1/dumptmp2/g\" `grep timedb1 -rl ./taosdumptest/dumptmp2`")
os.system(
"sed -i \"s/timedb1/dumptmp3/g\" `grep timedb1 -rl ./taosdumptest/dumptmp3`")
os.system( "%staosdump -i ./taosdumptest/dumptmp1" %binPath)
os.system( "%staosdump -i ./taosdumptest/dumptmp2" %binPath)
os.system( "%staosdump -i ./taosdumptest/dumptmp3" %binPath)
# dump data and check for taosdump # dump data and check for taosdump
tdSql.query("select count(*) from dumptmp1.st") tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0,0,1000) tdSql.checkData(0, 0, 510)
tdSql.query("select count(*) from dumptmp2.st") tdSql.execute("drop database timedb1")
tdSql.checkData(0,0,510) os.system("%s -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 900)
tdSql.query("select count(*) from dumptmp3.st") tdSql.execute("drop database timedb1")
tdSql.checkData(0,0,900) os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 1000)
# check data # check data
origin_res = tdSql.getResult("select * from timedb1.st") origin_res = tdSql.getResult("select * from timedb1.st")
dump_res = tdSql.getResult("select * from dumptmp1.st") tdSql.execute("drop database timedb1")
os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
dump_res = tdSql.getResult("select * from timedb1.st")
if origin_res == dump_res: if origin_res == dump_res:
tdLog.info("test nano second : dump check data pass for all data!" ) tdLog.info("test nano second : dump check data pass for all data!")
else: else:
tdLog.info("test nano second : dump check data failed for all data!" ) tdLog.info(
"test nano second : dump check data failed for all data!")
origin_res = tdSql.getResult("select * from timedb1.st where ts >=1625068810000000000 and ts <= 1625068860000000000")
dump_res = tdSql.getResult("select * from dumptmp2.st")
if origin_res == dump_res:
tdLog.info(" test nano second : dump check data pass for data! " )
else:
tdLog.info(" test nano second : dump check data failed for data !" )
origin_res = tdSql.getResult("select * from timedb1.st where ts >=1625068810000000000 ")
dump_res = tdSql.getResult("select * from dumptmp3.st")
if origin_res == dump_res:
tdLog.info(" test nano second : dump check data pass for data! " )
else:
tdLog.info(" test nano second : dump check data failed for data !" )
# us second support test case # us second support test case
os.system("rm -rf ./taosdumptest/") os.system("rm -rf ./taosdumptest/")
tdSql.execute("drop database if exists dumptmp1") tdSql.execute("drop database if exists timedb1")
tdSql.execute("drop database if exists dumptmp2")
tdSql.execute("drop database if exists dumptmp3")
if not os.path.exists("./taosdumptest/tmp1"): if not os.path.exists("./taosdumptest/tmp1"):
os.makedirs("./taosdumptest/dumptmp1") os.makedirs("./taosdumptest/dumptmp1")
@ -205,75 +191,63 @@ class TDTestCase:
if not os.path.exists("./taosdumptest/dumptmp3"): if not os.path.exists("./taosdumptest/dumptmp3"):
os.makedirs("./taosdumptest/dumptmp3") os.makedirs("./taosdumptest/dumptmp3")
buildPath = self.getBuildPath() binPath = self.getPath()
if (buildPath == ""): if (binPath == ""):
tdLog.exit("taosdump not found!") tdLog.exit("taosdump not found!")
else: else:
tdLog.info("taosdump found in %s" % buildPath) tdLog.info("taosdump found: %s" % binPath)
binPath = buildPath + "/build/bin/"
self.createdb(precision="us") self.createdb(precision="us")
os.system( os.system(
"%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath) "%s -y -g --databases timedb1 -o ./taosdumptest/dumptmp1" %
os.system(
'%staosdump --databases timedb1 -S 1625068810000000 -E 1625068860000000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000000 -o ./taosdumptest/dumptmp3 ' %
binPath) binPath)
os.system( os.system(
"sed -i \"s/timedb1/dumptmp1/g\" `grep timedb1 -rl ./taosdumptest/dumptmp1`") '%s -y -g --databases timedb1 -S 1625068810000000 -E 1625068860000000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system( os.system(
"sed -i \"s/timedb1/dumptmp2/g\" `grep timedb1 -rl ./taosdumptest/dumptmp2`") '%s -y -g --databases timedb1 -S 1625068810000000 -o ./taosdumptest/dumptmp3 ' %
os.system( binPath)
"sed -i \"s/timedb1/dumptmp3/g\" `grep timedb1 -rl ./taosdumptest/dumptmp3`")
os.system( "%staosdump -i ./taosdumptest/dumptmp1" %binPath) os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
os.system( "%staosdump -i ./taosdumptest/dumptmp2" %binPath) os.system("%s -i ./taosdumptest/dumptmp2" % binPath)
os.system( "%staosdump -i ./taosdumptest/dumptmp3" %binPath) os.system("%s -i ./taosdumptest/dumptmp3" % binPath)
tdSql.execute("drop database timedb1")
tdSql.query("select count(*) from dumptmp1.st") os.system("%s -i ./taosdumptest/dumptmp2" % binPath)
tdSql.checkData(0,0,1000) # dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 510)
tdSql.query("select count(*) from dumptmp2.st") tdSql.execute("drop database timedb1")
tdSql.checkData(0,0,510) os.system("%s -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 900)
tdSql.query("select count(*) from dumptmp3.st") tdSql.execute("drop database timedb1")
tdSql.checkData(0,0,900) os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 1000)
# check data
origin_res = tdSql.getResult("select * from timedb1.st") origin_res = tdSql.getResult("select * from timedb1.st")
dump_res = tdSql.getResult("select * from dumptmp1.st") tdSql.execute("drop database timedb1")
os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
dump_res = tdSql.getResult("select * from timedb1.st")
if origin_res == dump_res: if origin_res == dump_res:
tdLog.info("test us second : dump check data pass for all data!" ) tdLog.info("test micro second : dump check data pass for all data!")
else: else:
tdLog.info("test us second : dump check data failed for all data!" ) tdLog.info(
"test micro second : dump check data failed for all data!")
origin_res = tdSql.getResult("select * from timedb1.st where ts >=1625068810000000 and ts <= 1625068860000000")
dump_res = tdSql.getResult("select * from dumptmp2.st")
if origin_res == dump_res:
tdLog.info(" test us second : dump check data pass for data! " )
else:
tdLog.info(" test us second : dump check data failed for data!" )
origin_res = tdSql.getResult("select * from timedb1.st where ts >=1625068810000000 ")
dump_res = tdSql.getResult("select * from dumptmp3.st")
if origin_res == dump_res:
tdLog.info(" test us second : dump check data pass for data! " )
else:
tdLog.info(" test us second : dump check data failed for data! " )
# ms second support test case # ms second support test case
os.system("rm -rf ./taosdumptest/") os.system("rm -rf ./taosdumptest/")
tdSql.execute("drop database if exists dumptmp1") tdSql.execute("drop database if exists timedb1")
tdSql.execute("drop database if exists dumptmp2")
tdSql.execute("drop database if exists dumptmp3")
if not os.path.exists("./taosdumptest/tmp1"): if not os.path.exists("./taosdumptest/tmp1"):
os.makedirs("./taosdumptest/dumptmp1") os.makedirs("./taosdumptest/dumptmp1")
@ -286,69 +260,60 @@ class TDTestCase:
if not os.path.exists("./taosdumptest/dumptmp3"): if not os.path.exists("./taosdumptest/dumptmp3"):
os.makedirs("./taosdumptest/dumptmp3") os.makedirs("./taosdumptest/dumptmp3")
buildPath = self.getBuildPath() binPath = self.getPath()
if (buildPath == ""): if (binPath == ""):
tdLog.exit("taosdump not found!") tdLog.exit("taosdump not found!")
else: else:
tdLog.info("taosdump found in %s" % buildPath) tdLog.info("taosdump found: %s" % binPath)
binPath = buildPath + "/build/bin/"
self.createdb(precision="ms") self.createdb(precision="ms")
os.system( os.system(
"%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath) "%s -y -g --databases timedb1 -o ./taosdumptest/dumptmp1" %
os.system(
'%staosdump --databases timedb1 -S 1625068810000 -E 1625068860000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000 -o ./taosdumptest/dumptmp3 ' %
binPath) binPath)
os.system( os.system(
"sed -i \"s/timedb1/dumptmp1/g\" `grep timedb1 -rl ./taosdumptest/dumptmp1`") '%s -y -g --databases timedb1 -S 1625068810000 -E 1625068860000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system( os.system(
"sed -i \"s/timedb1/dumptmp2/g\" `grep timedb1 -rl ./taosdumptest/dumptmp2`") '%s -y -g --databases timedb1 -S 1625068810000 -o ./taosdumptest/dumptmp3 ' %
os.system( binPath)
"sed -i \"s/timedb1/dumptmp3/g\" `grep timedb1 -rl ./taosdumptest/dumptmp3`")
os.system( "%staosdump -i ./taosdumptest/dumptmp1" %binPath) os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
os.system( "%staosdump -i ./taosdumptest/dumptmp2" %binPath) os.system("%s -i ./taosdumptest/dumptmp2" % binPath)
os.system( "%staosdump -i ./taosdumptest/dumptmp3" %binPath) os.system("%s -i ./taosdumptest/dumptmp3" % binPath)
tdSql.execute("drop database timedb1")
tdSql.query("select count(*) from dumptmp1.st") os.system("%s -i ./taosdumptest/dumptmp2" % binPath)
tdSql.checkData(0,0,1000) # dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 510)
tdSql.query("select count(*) from dumptmp2.st") tdSql.execute("drop database timedb1")
tdSql.checkData(0,0,510) os.system("%s -i ./taosdumptest/dumptmp3" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 900)
tdSql.query("select count(*) from dumptmp3.st") tdSql.execute("drop database timedb1")
tdSql.checkData(0,0,900) os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
tdSql.query("select count(*) from timedb1.st")
tdSql.checkData(0, 0, 1000)
# check data
origin_res = tdSql.getResult("select * from timedb1.st") origin_res = tdSql.getResult("select * from timedb1.st")
dump_res = tdSql.getResult("select * from dumptmp1.st") tdSql.execute("drop database timedb1")
os.system("%s -i ./taosdumptest/dumptmp1" % binPath)
# dump data and check for taosdump
dump_res = tdSql.getResult("select * from timedb1.st")
if origin_res == dump_res: if origin_res == dump_res:
tdLog.info("test ms second : dump check data pass for all data!" ) tdLog.info(
"test million second : dump check data pass for all data!")
else: else:
tdLog.info("test ms second : dump check data failed for all data!" ) tdLog.info(
"test million second : dump check data failed for all data!")
origin_res = tdSql.getResult("select * from timedb1.st where ts >=1625068810000 and ts <= 1625068860000")
dump_res = tdSql.getResult("select * from dumptmp2.st")
if origin_res == dump_res:
tdLog.info(" test ms second : dump check data pass for data! " )
else:
tdLog.info(" test ms second : dump check data failed for data!" )
origin_res = tdSql.getResult("select * from timedb1.st where ts >=1625068810000 ")
dump_res = tdSql.getResult("select * from dumptmp3.st")
if origin_res == dump_res:
tdLog.info(" test ms second : dump check data pass for data! " )
else:
tdLog.info(" test ms second : dump check data failed for data! " )
os.system("rm -rf ./taosdumptest/") os.system("rm -rf ./taosdumptest/")
os.system("rm -rf ./dump_result.txt") os.system("rm -rf ./dump_result.txt")
os.system("rm -rf *.py.sql") os.system("rm -rf *.py.sql")

File diff suppressed because it is too large Load Diff

View File

@ -525,7 +525,11 @@ int32_t shellReadCommand(char *command) {
switch (c) { switch (c) {
case 'A': // Up arrow case 'A': // Up arrow
hist_counter = (hist_counter + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE; hist_counter = (hist_counter + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE;
shellResetCommand(&cmd, (pHistory->hist[hist_counter] == NULL) ? "" : pHistory->hist[hist_counter]); if (pHistory->hist[hist_counter] == NULL) {
hist_counter = (hist_counter + SHELL_MAX_HISTORY_SIZE + 1) % SHELL_MAX_HISTORY_SIZE;
} else {
shellResetCommand(&cmd, pHistory->hist[hist_counter]);
}
break; break;
case 'B': // Down arrow case 'B': // Down arrow
if (hist_counter != pHistory->hend) { if (hist_counter != pHistory->hend) {